class Controller
A resource pool controller.
Nested
Definitions
def self.wrap(**options, &block)
Create a new resource pool, using the given block to create new resources.
Implementation
def self.wrap(**options, &block)
self.new(block, **options)
end
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil)
Create a new resource pool.
Signature
-
parameter
constructor
Proc
A block which creates a new resource.
-
parameter
limit
Integer | Nil
The maximum number of resources that this pool can have at any given time. If nil, the pool can have an unlimited number of resources.
-
parameter
concurrency
Integer
The maximum number of concurrent tasks that can be creating a new resource.
-
parameter
policy
Policy
The pool policy.
Implementation
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil)
@constructor = constructor
@limit = limit
# This semaphore is used to limit the number of concurrent tasks which are creating new resources.
@guard = Async::Semaphore.new(concurrency)
@policy = policy
@gardener = nil
@tags = tags
# All available resources:
@resources = {}
# Resources which may be available to be acquired:
# This list may contain false positives, or resources which were okay but have since entered a state which is unusuable.
@available = []
# Used to signal when a resource has been released:
@notification = Async::Notification.new
end
attr :constructor
Signature
-
attribute
Proc
The constructor used to create new resources.
attr_accessor :limit
Signature
-
attribute
Integer
The maximum number of resources that this pool can have at any given time.
def to_s
Generate a human-readable representation of the pool.
Implementation
def to_s
if @resources.empty?
"\#<#{self.class}(#{usage_string})>"
else
"\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>"
end
end
def as_json(...)
Generate a JSON representation of the pool.
Implementation
def as_json(...)
{
limit: @limit,
concurrency: @guard.limit,
usage: @resources.size,
availability_summary: self.availability_summary,
}
end
def to_json(...)
Generate a JSON representation of the pool.
Implementation
def to_json(...)
as_json.to_json(...)
end
def concurrency
Signature
-
attribute
Integer
The maximum number of concurrent tasks that can be creating a new resource.
Implementation
def concurrency
@guard.limit
end
def concurrency= value
Set the maximum number of concurrent tasks that can be creating a new resource.
Implementation
def concurrency= value
@guard.limit = value
end
attr_accessor :policy
Signature
-
attribute
Policy
The pool policy.
attr :resources
Signature
-
attribute
Hash(Resource, Integer)
all allocated resources, and their associated usage.
def size
The number of resources in the pool.
Implementation
def size
@resources.size
end
def active?
Whether the pool has any active resources.
Implementation
def active?
!@resources.empty?
end
def busy?
Whether there are resources which are currently in use.
Implementation
def busy?
@resources.collect do |_, usage|
return true if usage > 0
end
return false
end
def available?
Whether there are available resources, i.e. whether #acquire
can reuse an existing resource.
Implementation
def available?
@available.any?
end
def wait
Wait until a pool resource has been freed.
Implementation
def wait
@notification.wait
end
def empty?
Whether the pool is empty.
Implementation
def empty?
@resources.empty?
end
def acquire
Acquire a resource from the pool. If a block is provided, the resource will be released after the block has been executed.
Implementation
def acquire
resource = wait_for_resource
return resource unless block_given?
begin
yield resource
ensure
release(resource)
end
end
def release(resource)
Make the resource resources and let waiting tasks know that there is something resources.
Implementation
def release(resource)
processed = false
# A resource that is not good should also not be reusable.
if resource.reusable?
processed = reuse(resource)
end
# @policy.released(self, resource)
ensure
retire(resource) unless processed
end
def close
Close all resources in the pool.
Implementation
def close
self.drain
@available.clear
@gardener&.stop
end
def prune(retain = 0)
Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.
Signature
-
parameter
retain
Integer
the minimum number of resources to retain.
-
yields
{|resource| ...}
Any unused resource.
Implementation
def prune(retain = 0)
unused = []
# This code must not context switch:
@resources.each do |resource, usage|
if usage.zero?
unused << resource
end
end
# It's okay for this to context switch:
unused.each do |resource|
if block_given?
yield resource
else
retire(resource)
end
break if @resources.size <= retain
end
# Update availability list:
@available.clear
@resources.each do |resource, usage|
if usage < resource.concurrency and resource.reusable?
@available << resource
end
end
return unused.size
end
def retire(resource)
Retire a specific resource.
Implementation
def retire(resource)
Console.debug(self) {"Retire #{resource}"}
@resources.delete(resource)
resource.close
@notification.signal
return true
end
def create_resource
Signature
-
returns
Object
A new resource in a "used" state.
Implementation
def create_resource
self.start_gardener
# This might return nil, which means creating the resource failed.
if resource = @constructor.call
@resources[resource] = 1
# Make the resource available if it can be used multiple times:
if resource.concurrency > 1
@available.push(resource)
end
end
# @policy.created(self, resource)
return resource
end
def available_resource
Signature
-
returns
Object
An existing resource in a "used" state.
Implementation
def available_resource
resource = nil
@guard.acquire do
resource = acquire_or_create_resource
end
return resource
rescue Exception => error
reuse(resource) if resource
raise
end
def acquire_existing_resource
Acquire an existing resource with zero usage. If there are resources that are in use, wait until they are released.
Implementation
def acquire_existing_resource
while @resources.any?
@resources.each do |resource, usage|
if usage == 0
return resource
end
end
@notification.wait
end
# Only when the pool has been completely drained, return nil:
return nil
end