Async Limiter / Source / Async / Limiter / Queued

class Queued

Queue-based limiter that distributes pre-existing resources with priority/timeout support.

This limiter manages a finite set of resources (connections, API keys, etc.) that are pre-populated in a queue. It provides priority-based allocation and timeout support for resource acquisition.

Unlike Limited, which counts abstract permits, Queued distributes actual resource objects and supports priority queues for fair allocation.

Definitions

def self.default_queue

Signature

returns Queue

A default queue with a single true value.

Implementation

# Build the queue used when the caller does not supply one.
#
# Every call creates a new queue, so the result is never shared between limiters.
# @returns [Queue] A new queue pre-populated with a single `true` value.
def self.default_queue
	queue = Queue.new
	queue.push(true)
	
	queue
end

def initialize(queue = self.class.default_queue, **options)

Initialize a queue-based limiter.

Signature

parameter queue #push, #pop, #empty?

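Thread-safe queue containing pre-existing resources. Besides the methods listed above, it must also respond to #size and #waiting_count, which the limiter calls to report its counts and statistics.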

parameter options Hash

Options passed to Async::Limiter::Generic#initialize.

Implementation

# Initialize a queue-based limiter.
#
# @parameter queue [#push, #pop, #empty?] Queue containing pre-existing resources; defaults to a new queue from `default_queue`.
# @parameter options [Hash] Keyword options forwarded unchanged to the superclass initializer.
def initialize(queue = self.class.default_queue, **options)
	super(**options)
	
	@queue = queue
	
	# `@utilization` is not assigned in this method - presumably the superclass initializer sets it up (confirm against the parent class).
	# The metric handles are fetched once, in this fixed order:
	@acquired_count_metric, @available_count_metric, @waiting_count_metric, @reacquire_waiting_count_metric =
		%i[acquired_count available_count waiting_count reacquire_waiting_count].map {|name| @utilization.metric(name)}
	
	# Publish the initial state of the queue:
	update_utilization_metrics
end

attr_reader :queue

Signature

attribute Queue

The queue managing resources.

def acquired_count

Signature

returns Integer

Current count of acquired resources.

Implementation

# Current count of acquired resources.
#
# Reads the `acquired_count` utilization metric, which `acquire_resource` increments on every successful pop and `release_resource` decrements (never below zero).
# @returns [Integer] The metric's current value.
def acquired_count
	@acquired_count_metric.value
end

def available_count

Signature

returns Integer

Current count of available resources.

Implementation

# Current count of available resources.
#
# This is read live from the queue (`@queue.size`), not from a stored metric.
# @returns [Integer] Number of resources currently sitting in the queue.
def available_count
	@queue.size
end

def waiting_count

Signature

returns Integer

Current count of tasks waiting for resources.

Implementation

# Current count of tasks waiting for resources.
#
# Delegates to the queue's own `waiting_count`, so the queue must implement that method.
# @returns [Integer] Number of waiters as reported by the queue.
def waiting_count
	@queue.waiting_count
end

def reacquire_waiting_count

Signature

returns Integer

Current count of reacquiring tasks waiting for resources.

Implementation

# Current count of reacquiring tasks waiting for resources.
#
# Reads the `reacquire_waiting_count` utilization metric, which `acquire_resource` increments before waiting and decrements afterwards when called with `reacquire: true`.
# @returns [Integer] The metric's current value.
def reacquire_waiting_count
	@reacquire_waiting_count_metric.value
end

def limited?

Check whether the limiter is currently exhausted, i.e. whether a new acquisition would find no resource in the queue.

Signature

returns Boolean

True if no resources are available (the queue is empty); false if at least one resource can be acquired.

Implementation

# Whether the limiter is currently exhausted.
#
# Note the polarity: this is true when nothing can be handed out, not when resources are available.
# @returns [Boolean] True when the queue is empty.
def limited?
	@queue.empty?
end

def expand(count, value = true)

Expand the queue with additional resources.

Signature

parameter count Integer

Number of resources to add.

parameter value Object

The value to add to the queue.

Implementation

# Expand the queue with additional resources.
#
# The very same `value` object is pushed `count` times; no copies are made.
# @parameter count [Integer] Number of resources to add.
# @parameter value [Object] The value to add to the queue.
def expand(count, value = true)
	count.times {@queue.push(value)}
	
	# Refresh the published metrics once, after all pushes:
	update_utilization_metrics
end

def statistics

Get current limiter statistics.

Signature

returns Hash

Statistics hash with current state.

Implementation

# Get current limiter statistics.
#
# The queue is sampled exactly once per figure. `acquire_resource` pops from the queue while `@mutex` is released, so holding `@mutex` here does not stop the queue from changing; reading it once guarantees that the duplicated keys (`waiting`/`waiting_count` and `available`/`available_count`) agree within a single snapshot.
# @returns [Hash] Statistics hash with current state.
def statistics
	@mutex.synchronize do
		waiting = @queue.waiting_count
		available = @queue.size
		
		{
			waiting: waiting,
			available: available,
			acquired_count: @acquired_count_metric.value,
			available_count: available,
			waiting_count: waiting,
			reacquire_waiting_count: @reacquire_waiting_count_metric.value,
			timing: @timing.statistics
		}
	end
end

def acquire_resource(deadline, reacquire: false, **options)

Acquire a resource from the queue with optional deadline.

Implementation

# Acquire a resource from the queue with optional deadline.
#
# Expects to be entered with `@mutex` held by the caller: the mutex is released only while waiting on the queue and is re-acquired before this method returns or raises.
#
# @parameter deadline [Object | Nil] Responds to `#remaining`; when nil, `timeout: nil` is passed to the queue.
# @parameter reacquire [Boolean] When true, the wait is counted in the `reacquire_waiting_count` metric.
# @parameter options [Hash] Extra keyword arguments forwarded to `@queue.pop`.
# @returns [Object | Nil] Whatever `@queue.pop` returned; a falsy result is not counted as an acquisition.
def acquire_resource(deadline, reacquire: false, **options)
	# If this raises, nothing has been changed yet and the mutex is still held, so the error propagates untouched:
	@reacquire_waiting_count_metric.increment if reacquire
	
	resource = nil
	
	begin
		update_utilization_metrics if reacquire
		
		# Let other tasks use the limiter while this one is blocked on the queue:
		@mutex.unlock
		
		begin
			resource = @queue.pop(timeout: deadline&.remaining, **options)
		ensure
			# Re-lock only after a successful unlock. Locking a mutex that is already owned would raise ThreadError and mask the original exception.
			@mutex.lock
		end
	ensure
		# Undo the waiting count and record the acquisition, whether the pop succeeded, timed out or raised:
		@reacquire_waiting_count_metric.decrement if reacquire
		@acquired_count_metric.increment if resource
		update_utilization_metrics if reacquire || resource
	end
	
	resource
end

def release_resource(value)

Release a previously acquired resource back to the queue.

Implementation

# Release a previously acquired resource back to the queue.
#
# @parameter value [Object] The resource to hand back; it is pushed onto the queue as-is.
def release_resource(value)
	@mutex.synchronize do
		acquired = @acquired_count_metric
		
		# Never let the acquired count drop below zero:
		if acquired.value > 0
			acquired.decrement
		end
		
		update_utilization_metrics
	end
	
	# Hand the resource back (outside the mutex), then refresh the metrics again so they reflect the push:
	@queue.push(value)
	update_utilization_metrics
end