class Queued
Queue-based limiter that distributes pre-existing resources with priority/timeout support.
This limiter manages a finite set of resources (connections, API keys, etc.) that are pre-populated in a queue. It provides priority-based allocation and timeout support for resource acquisition.
Unlike Limited which counts abstract permits, Queued distributes actual resource objects and supports priority queues for fair allocation.
Definitions
def self.default_queue
Signature
-
returns
Queue
A default queue with a single true value.
Implementation
def self.default_queue
Queue.new.tap do |queue|
queue.push(true)
end
end
def initialize(queue = self.class.default_queue, timing: Timing::None, parent: nil)
Initialize a queue-based limiter.
Signature
-
parameter
queue
#push, #pop, #empty?
Thread-safe queue containing pre-existing resources.
-
parameter
timing
#acquire, #wait, #maximum_cost
Strategy for timing constraints.
-
parameter
parent
Async::Task, nil
Parent task for creating child tasks.
Implementation
def initialize(queue = self.class.default_queue, timing: Timing::None, parent: nil)
super(timing: timing, parent: parent)
@queue = queue
end
attr_reader :queue
Signature
-
attribute
Queue
The queue managing resources.
def limited?
Check if a new task can be acquired.
Signature
-
returns
Boolean
True if resources are available.
Implementation
def limited?
@queue.empty?
end
def expand(count, value = true)
Expand the queue with additional resources.
Signature
-
parameter
count
Integer
Number of resources to add.
-
parameter
value
Object
The value to add to the queue.
Implementation
def expand(count, value = true)
count.times do
@queue.push(value)
end
end
def statistics
Get current limiter statistics.
Signature
-
returns
Hash
Statistics hash with current state.
Implementation
def statistics
@mutex.synchronize do
{
waiting: @queue.waiting_count,
available: @queue.size,
timing: @timing.statistics
}
end
end
def acquire_resource(deadline, reacquire: false, **options)
Acquire a resource from the queue with optional deadline.
Implementation
def acquire_resource(deadline, reacquire: false, **options)
@mutex.unlock
return @queue.pop(timeout: deadline&.remaining, **options)
ensure
@mutex.lock
end
def release_resource(value)
Release a previously acquired resource back to the queue.
Implementation
def release_resource(value)
# Return a default resource to the queue:
@queue.push(value)
end