class Queued

Queue-based limiter that distributes pre-existing resources with priority/timeout support.

This limiter manages a finite set of resources (connections, API keys, etc.) that are pre-populated in a queue. It provides priority-based allocation and timeout support for resource acquisition.

Unlike Limited, which counts abstract permits, Queued hands out the actual resource objects stored in its queue and supports priority queues for fair allocation.
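
The idea of handing out real objects rather than counting permits can be sketched with a plain standard-library Queue. This is only an illustration of the pattern, not this class's API: the with_resource helper and the key names are hypothetical.

```ruby
# Illustrative sketch: a stdlib Queue stands in for the limiter's queue.
# `with_resource` and the key names below are made up for this example.
def with_resource(pool)
  resource = pool.pop               # Blocks until a resource is available.
  yield resource
ensure
  pool.push(resource) if resource   # Hand the very same object back.
end

pool = Queue.new
%w[key-a key-b].each { |key| pool.push(key) }

message = with_resource(pool) { |key| "request signed with #{key}" }
puts message      # request signed with key-a
puts pool.size    # 2
```

Because the block receives the object itself, the caller can use it directly (for example as a connection or an API key), and the ensure clause returns it even if the block raises.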

Definitions

def self.default_queue

Signature

returns Queue

A default queue with a single true value.

Implementation

def self.default_queue
	Queue.new.tap do |queue|
		queue.push(true)
	end
end
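
A queue holding a single token lets only one holder proceed at a time. The sketch below reproduces the construction with the standard-library Queue to show that effect; whether Queue resolves to that class inside the library is an assumption here.

```ruby
# Same construction as above, written as a standalone method.
# Assumes the stdlib Queue (Thread::Queue) for illustration.
def default_queue
  Queue.new.tap { |queue| queue.push(true) }
end

queue = default_queue
token = queue.pop     # Take the only token.
puts queue.empty?     # true: nobody else can acquire now.
queue.push(token)     # Give it back.
puts queue.size       # 1
```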

def initialize(queue = self.class.default_queue, timing: Timing::None, parent: nil)

Initialize a queue-based limiter.

Signature

parameter queue #push, #pop, #empty?

Thread-safe queue containing pre-existing resources. Note that statistics also calls #size and #waiting_count on the queue.

parameter timing #acquire, #wait, #maximum_cost

Strategy for timing constraints.

parameter parent Async::Task, nil

Parent task for creating child tasks.

Implementation

def initialize(queue = self.class.default_queue, timing: Timing::None, parent: nil)
	super(timing: timing, parent: parent)
	@queue = queue
end

attr_reader :queue

Signature

attribute Queue

The queue managing resources.

def limited?

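Check whether the limiter is currently exhausted, that is, whether the queue has no resources left to hand out.

Signature

returns Boolean

True if the queue is empty, so no resource can be acquired without waiting.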

Implementation

def limited?
	@queue.empty?
end
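
The direction of this check is easy to misread, so here is a minimal sketch with a standard-library Queue. The limited? helper taking a queue argument is a hypothetical stand-in for the instance method.

```ruby
# Hypothetical helper mirroring the instance method's logic.
def limited?(queue)
  queue.empty?
end

queue = Queue.new
puts limited?(queue)   # true: nothing to hand out.
queue.push(:connection)
puts limited?(queue)   # false: a resource is available.
```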

def expand(count, value = true)

Expand the queue with additional resources.

Signature

parameter count Integer

Number of resources to add.

parameter value Object

The value to add to the queue.

Implementation

def expand(count, value = true)
	count.times do
		@queue.push(value)
	end
end
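
One consequence of this implementation is that the same value object is pushed count times. The sketch below shows this with a standard-library Queue; the expand helper taking a queue argument is a hypothetical stand-in for the instance method.

```ruby
# Hypothetical helper mirroring the instance method's logic.
def expand(queue, count, value = true)
  count.times { queue.push(value) }
end

queue = Queue.new
expand(queue, 2)                       # Two default `true` tokens.
expand(queue, 2, "shared-connection")  # The same String object, twice.

items = Array.new(queue.size) { queue.pop }
p items                      # [true, true, "shared-connection", "shared-connection"]
p items[2].equal?(items[3])  # true
```

If each slot needs a distinct resource (for example separate connections), pushing the resources individually avoids sharing one object between holders.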

def statistics

Get current limiter statistics.

Signature

returns Hash

Statistics hash with current state.

Implementation

def statistics
	@mutex.synchronize do
		{
			waiting: @queue.waiting_count,
			available: @queue.size,
			timing: @timing.statistics
		}
	end
end
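
For intuition about the waiting and available figures, the following sketch builds a similar hash from a standard-library Queue. It is an assumption-laden stand-in: the stdlib class exposes num_waiting rather than waiting_count, the queue_statistics helper is hypothetical, and the timing entry is omitted.

```ruby
# Hypothetical helper; stdlib Queue uses #num_waiting, not #waiting_count.
def queue_statistics(queue)
  { waiting: queue.num_waiting, available: queue.size }
end

queue = Queue.new
queue.push(:resource)
idle = queue_statistics(queue)     # One resource, nobody waiting.

queue.pop                          # Take the only resource.
waiter = Thread.new { queue.pop }  # This thread now blocks on the empty queue.
sleep 0.01 until queue.num_waiting == 1
busy = queue_statistics(queue)     # No resources, one waiter.

queue.push(:resource)              # Unblock the waiter.
waiter.join

puts idle[:available], idle[:waiting]
puts busy[:available], busy[:waiting]
```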

def acquire_resource(deadline, reacquire: false, **options)

Acquire a resource from the queue, waiting at most until the optional deadline.

Implementation

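def acquire_resource(deadline, reacquire: false, **options)
	# Release the mutex while blocking on the queue; the caller must already hold it:
	@mutex.unlock
	# A nil deadline yields a nil timeout:
	return @queue.pop(timeout: deadline&.remaining, **options)
ensure
	# Always re-take the mutex before returning to the caller:
	@mutex.lock
end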
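
The timeout behaviour can be tried in isolation with a standard-library Queue, whose pop accepts a timeout keyword from Ruby 3.2 onward and returns nil when the wait expires. The acquire_within helper is hypothetical, and whether the limiter's own queue classes behave identically is an assumption.

```ruby
# Requires Ruby 3.2+ for Queue#pop(timeout:).
# `acquire_within` is a made-up helper for this sketch.
def acquire_within(queue, seconds)
  queue.pop(timeout: seconds)  # Returns nil if nothing arrives in time.
end

queue = Queue.new
queue.push(:connection)

p acquire_within(queue, 0.05)  # :connection
p acquire_within(queue, 0.05)  # nil
```

Callers therefore need to distinguish a nil result (timed out) from a real resource before using it.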

def release_resource(value)

Release a previously acquired resource back to the queue.

Implementation

def release_resource(value)
	# Return the given resource to the queue:
	@queue.push(value)
end