Async::Pool › Source › Async › Pool › Controller

class Controller

A resource pool controller.

Nested

Definitions

def self.wrap(**options, &block)

Create a new resource pool, using the given block to create new resources.

Implementation

# Build a pool whose resources are created by the given block.
#
# The block becomes the pool's constructor; every keyword option is forwarded unchanged to {initialize}.
#
# @parameter options [Hash] Keyword options accepted by {initialize}.
# @returns [Controller] The newly created pool.
def self.wrap(**options, &block)
	new(block, **options)
end

def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil)

Create a new resource pool.

Signature

parameter constructor Proc

A block which creates a new resource.

parameter limit Integer | Nil

The maximum number of resources that this pool can have at any given time. If nil, the pool can have an unlimited number of resources.

parameter concurrency Integer

The maximum number of concurrent tasks that can be creating a new resource.

parameter policy Policy

The pool policy.

Implementation

# Create a new resource pool.
#
# @parameter constructor [Proc] A callable which creates a new resource.
# @parameter limit [Integer | Nil] The maximum number of resources the pool may hold at once; `nil` means unlimited.
# @parameter concurrency [Integer] The maximum number of tasks that may be creating a resource at the same time. Defaults to `limit`, or 1 when no limit is given.
# @parameter policy [Policy] The pool policy.
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil)
	# Plain configuration:
	@constructor = constructor
	@limit = limit
	@policy = policy
	
	# No gardener exists until one is assigned elsewhere:
	@gardener = nil
	
	# Bounds how many tasks may be creating new resources at the same time:
	@guard = Async::Semaphore.new(concurrency)
	
	# Signalled whenever a resource has been released:
	@notification = Async::Notification.new
	
	# Every allocated resource, mapped to its current usage count:
	@resources = {}
	
	# Candidates for acquisition. This list may contain false positives, i.e. resources which were okay when added but have since become unusable.
	@available = []
end

attr :constructor

Signature

attribute Proc

The constructor used to create new resources.

attr_accessor :limit

Signature

attribute Integer

The maximum number of resources that this pool can have at any given time.

def to_s

Generate a human-readable representation of the pool.

Implementation

# Generate a human-readable representation of the pool.
#
# An empty pool shows only its usage string; otherwise the per-resource availability summary is appended, joined by `;`.
#
# @returns [String]
def to_s
	return "\#<#{self.class}(#{usage_string})>" if @resources.empty?
	
	"\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>"
end

def as_json(...)

Generate a JSON representation of the pool.

Implementation

# Generate a JSON-compatible representation of the pool.
#
# Any arguments are accepted and ignored.
#
# @returns [Hash] The limit, the guard's concurrency limit, the number of allocated resources, and the availability summary.
def as_json(...)
	representation = {
		limit: @limit,
		concurrency: @guard.limit,
		usage: @resources.size,
	}
	
	representation[:availability_summary] = availability_summary
	
	representation
end

def to_json(...)

Generate a JSON representation of the pool.

Implementation

# Generate a JSON representation of the pool.
#
# Serializes the result of {as_json}, forwarding all arguments to its `to_json`.
def to_json(...)
	representation = as_json
	
	representation.to_json(...)
end

def concurrency

Signature

attribute Integer

The maximum number of concurrent tasks that can be creating a new resource.

Implementation

# @attribute [Integer] The maximum number of concurrent tasks that can be creating a new resource.
#
# This is read directly from the guard semaphore's limit.
def concurrency
	@guard.limit
end

def concurrency= value

Set the maximum number of concurrent tasks that can be creating a new resource.

Implementation

# Set the maximum number of concurrent tasks that can be creating a new resource.
#
# The value is written straight through to the guard semaphore's limit.
#
# @parameter value [Integer] The new concurrency limit.
def concurrency=(value)
	@guard.limit = value
end

attr_accessor :policy

Signature

attribute Policy

The pool policy.

attr :resources

Signature

attribute Hash(Resource, Integer)

All allocated resources, and their associated usage.

def size

The number of resources in the pool.

Implementation

# The number of resources in the pool.
#
# Counts every allocated resource, whether or not it is currently in use.
#
# @returns [Integer]
def size
	@resources.size
end

def active?

Whether the pool has any active resources.

Implementation

# Whether the pool has any active resources.
#
# @returns [Boolean] True when at least one resource is currently allocated.
def active?
	@resources.any?
end

def busy?

Whether there are resources which are currently in use.

Implementation

# Whether there are resources which are currently in use.
#
# @returns [Boolean] True when at least one allocated resource has a usage count above zero.
def busy?
	# Only the usage counts matter here, and `any?` stops at the first match without building an intermediate array.
	@resources.each_value.any? {|usage| usage > 0}
end

def available?

Whether there are available resources, i.e. whether #acquire can reuse an existing resource.

Implementation

# Whether there are available resources, i.e. whether {acquire} can reuse an existing resource.
#
# This only checks that the availability list has a truthy entry; that list may contain resources which have since become unusable, so a true result is a hint rather than a guarantee.
#
# @returns [Boolean]
def available?
	@available.any?
end

def wait

Wait until a pool resource has been freed.

Implementation

# Wait until a pool resource has been freed.
#
# Blocks the caller on the pool's release notification.
def wait
	@notification.wait
end

def empty?

Whether the pool is empty.

Implementation

# Whether the pool is empty.
#
# @returns [Boolean] True when no resources are currently allocated.
def empty?
	@resources.empty?
end

def acquire

Acquire a resource from the pool. If a block is provided, the resource will be released after the block has been executed.

Implementation

# Acquire a resource from the pool.
#
# Without a block, the resource is returned and the caller is responsible for releasing it. With a block, the resource is yielded and always released afterwards, even if the block raises.
#
# @yields {|resource| ...} The acquired resource.
# @returns [Object] The resource, or the result of the block when one is given.
def acquire
	resource = wait_for_resource
	
	if block_given?
		begin
			yield resource
		ensure
			release(resource)
		end
	else
		resource
	end
end

def release(resource)

Make the resource available and let waiting tasks know that there is something available.

Implementation

# Hand a resource back to the pool.
#
# A reusable resource is passed to `reuse`; if that does not succeed (falsy result or an exception), or the resource is not reusable in the first place, it is retired instead.
#
# @parameter resource [Object] The resource being released.
def release(resource)
	reused = false
	
	# Only resources that report themselves as reusable are candidates for going back into the pool:
	reused = reuse(resource) if resource.reusable?
	
	# @policy.released(self, resource)
ensure
	# Runs on every exit path, including exceptions raised above:
	retire(resource) unless reused
end

def close

Close all resources in the pool.

Implementation

# Close all resources in the pool.
#
# Drains the pool, empties the availability list and stops the gardener if there is one.
def close
	drain
	
	@available.clear
	
	# The gardener may never have been assigned, hence the safe navigation:
	@gardener&.stop
end

def prune(retain = 0)

Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.

Signature

parameter retain Integer

The minimum number of resources to retain.

yields {|resource| ...}

Any unused resource.

Implementation

# Retire (and close) unused resources until at most `retain` resources remain in the pool.
#
# If a block is provided, each unused resource is yielded to it instead of being retired, and the block should implement the desired handling. Afterwards the availability list is rebuilt from the resources still in the pool.
#
# @parameter retain [Integer] The minimum number of resources to retain.
# @yields {|resource| ...} Any unused resource.
# @returns [Integer] The number of unused resources that were found (not necessarily the number actually retired).
def prune(retain = 0)
	# Snapshot the idle resources first. This code must not context switch:
	unused = @resources.select {|_, usage| usage.zero?}.keys
	
	# It's okay for this to context switch:
	unused.each do |resource|
		# Check the floor before touching each resource, so that a pool which is already at or below `retain` is left untouched:
		break if @resources.size <= retain
		
		if block_given?
			yield resource
		else
			retire(resource)
		end
	end
	
	# Update availability list:
	@available.clear
	@resources.each do |resource, usage|
		if usage < resource.concurrency && resource.reusable?
			@available << resource
		end
	end
	
	return unused.size
end

def retire(resource)

Retire a specific resource.

Implementation

# Retire a specific resource.
#
# The resource is removed from the pool's bookkeeping before it is closed, and then the release notification is signalled.
#
# @parameter resource [Object] The resource to retire.
# @returns [Boolean] Always true.
def retire(resource)
	Console.debug(self) {"Retire #{resource}"}
	
	# Forget the resource first, then close it:
	@resources.delete(resource)
	resource.close
	
	# Signal the notification that a resource has gone:
	@notification.signal
	
	true
end

def create_resource

Signature

returns Object

A new resource in a "used" state.

Implementation

# Create a brand new resource using the pool's constructor.
#
# A successfully created resource is registered with a usage count of 1, i.e. it is already "used" by the caller.
#
# @returns [Object] A new resource in a "used" state, or the falsy value returned by the constructor when creation failed.
def create_resource
	start_gardener
	
	# The constructor may return nil, which means creating the resource failed.
	resource = @constructor.call
	
	if resource
		@resources[resource] = 1
		
		# A resource that supports more than one concurrent user can be shared straight away:
		@available.push(resource) if resource.concurrency > 1
	end
	
	# @policy.created(self, resource)
	
	resource
end

def available_resource

Signature

returns Object

An existing resource in a "used" state.

Implementation

# Obtain a resource while holding the concurrency guard.
#
# The guard limits how many tasks may be acquiring or creating resources at the same time.
#
# @returns [Object] An existing resource in a "used" state.
def available_resource
	acquired = nil
	
	Console.debug(self, "Acquiring concurrency guard...", blocking: @guard.blocking?)
	
	@guard.acquire do
		Console.debug(self, "Acquired concurrency guard.")
		
		acquired = acquire_or_create_resource
	end
	
	acquired
rescue Exception
	# If something fails after a resource was obtained, hand it back to the pool before propagating the failure. Everything is re-raised, so nothing is swallowed here.
	reuse(acquired) if acquired
	raise
end

def acquire_existing_resource

Acquire an existing resource with zero usage. If there are resources that are in use, wait until they are released.

Implementation

# Acquire an existing resource with zero usage.
#
# While the pool still holds resources but all of them are in use, this waits on the release notification and then looks again.
#
# @returns [Object | Nil] The first resource with zero usage, or nil once the pool holds no resources at all.
def acquire_existing_resource
	until @resources.empty?
		idle = @resources.find {|_, usage| usage == 0}
		
		# `find` yields the `[resource, usage]` pair, so unwrap the resource:
		return idle.first if idle
		
		@notification.wait
	end
	
	# Only when the pool has been completely drained, return nil:
	nil
end