class Queued
Queue-based limiter that distributes pre-existing resources with priority/timeout support.
This limiter manages a finite set of resources (connections, API keys, etc.) that are pre-populated in a queue. It provides priority-based allocation and timeout support for resource acquisition.
Unlike Limited, which counts abstract permits, Queued distributes actual resource objects and supports priority queues for fair allocation.
Definitions
def self.default_queue
Signature
-
returns
Queue A default queue with a single true value.
Implementation
# Build the queue that is used when the caller does not supply one.
# @returns [Queue] A new queue pre-loaded with a single `true` value.
def self.default_queue
  queue = Queue.new
  queue.push(true)
  queue
end
def initialize(queue = self.class.default_queue, **options)
Initialize a queue-based limiter.
Signature
-
parameter
queue [#push, #pop, #empty?] Thread-safe queue containing pre-existing resources. The limiter also calls #size and #waiting_count on it.
-
parameter
options Hash Options passed to
Async::Limiter::Generic#initialize.
Implementation
# Initialize a queue-based limiter.
# @parameter queue [#push, #pop, #empty?] Thread-safe queue containing pre-existing resources.
#   Defaults to a fresh queue from `self.class.default_queue`.
# @parameter options [Hash] Options forwarded unchanged to the superclass initializer.
def initialize(queue = self.class.default_queue, **options)
  super(**options)

  @queue = queue

  # NOTE(review): @utilization is not assigned in this method; presumably the
  # superclass initializer sets it up - confirm.
  utilization = @utilization
  @acquired_count_metric = utilization.metric(:acquired_count)
  @available_count_metric = utilization.metric(:available_count)
  @waiting_count_metric = utilization.metric(:waiting_count)
  @reacquire_waiting_count_metric = utilization.metric(:reacquire_waiting_count)

  # Publish the initial state now that the queue and metrics are in place.
  update_utilization_metrics
end
# @attribute [Queue] The queue managing resources.
attr_reader :queue
Signature
-
attribute
Queue The queue managing resources.
def acquired_count
Signature
-
returns
Integer Current count of acquired resources.
Implementation
# Number of resources currently handed out, as tracked by the acquired-count metric.
# @returns [Integer] Current count of acquired resources.
def acquired_count
  @acquired_count_metric.value
end
def available_count
Signature
-
returns
Integer Current count of available resources.
Implementation
# Number of resources still sitting in the queue, read directly from the queue's size.
# @returns [Integer] Current count of available resources.
def available_count
  @queue.size
end
def waiting_count
Signature
-
returns
Integer Current count of tasks waiting for resources.
Implementation
# Number of waiters, delegated to the queue's own `waiting_count`.
# @returns [Integer] Current count of tasks waiting for resources.
def waiting_count
  @queue.waiting_count
end
def reacquire_waiting_count
Signature
-
returns
Integer Current count of reacquiring tasks waiting for resources.
Implementation
# Number of reacquiring waiters, as tracked by the reacquire-waiting metric.
# @returns [Integer] Current count of reacquiring tasks waiting for resources.
def reacquire_waiting_count
  @reacquire_waiting_count_metric.value
end
def limited?
Check whether the limiter is currently exhausted.
Signature
-
returns
Boolean True if the queue is empty (no resources are currently available).
Implementation
# Whether the limiter is currently exhausted.
# @returns [Boolean] True when the queue is empty, i.e. no resource is available right now.
def limited?
  @queue.empty?
end
def expand(count, value = true)
Expand the queue with additional resources.
Signature
-
parameter
count Integer Number of resources to add.
-
parameter
value Object The value to add to the queue.
Implementation
# Expand the queue with additional resources.
# @parameter count [Integer] Number of resources to add.
# @parameter value [Object] The value to add to the queue. The same object is
#   pushed `count` times; it is not duplicated.
def expand(count, value = true)
  count.times { @queue.push(value) }

  # Refresh the metrics once, after all pushes are done.
  update_utilization_metrics
end
def statistics
Get current limiter statistics.
Signature
-
returns
Hash Statistics hash with current state.
Implementation
# Get current limiter statistics.
#
# `waiting`/`waiting_count` and `available`/`available_count` are aliases of the
# same reading. The queue is popped and pushed outside of @mutex elsewhere in
# this class, so each queue figure is read exactly once here; otherwise the two
# aliases could disagree within a single snapshot.
#
# @returns [Hash] Statistics hash with current state.
def statistics
  @mutex.synchronize do
    waiting = @queue.waiting_count
    available = @queue.size

    {
      waiting: waiting,
      available: available,
      acquired_count: @acquired_count_metric.value,
      available_count: available,
      waiting_count: waiting,
      reacquire_waiting_count: @reacquire_waiting_count_metric.value,
      timing: @timing.statistics
    }
  end
end
def acquire_resource(deadline, reacquire: false, **options)
Acquire a resource from the queue with optional deadline.
Implementation
# Acquire a resource from the queue with optional deadline.
#
# The first mutex operation here is an unlock and the last is a lock, so this
# method expects to be entered with @mutex already held and returns with it held
# again. NOTE(review): presumably the caller in the superclass takes the lock - confirm.
#
# @parameter deadline [Object | Nil] Only `deadline&.remaining` is read; it is passed as the pop timeout (nil when no deadline is given).
# @parameter reacquire [Boolean] When true, the reacquire-waiting metric is incremented for the duration of the pop.
# @parameter options [Hash] Extra keyword arguments forwarded to `@queue.pop`.
# @returns [Object] Whatever `@queue.pop` returns.
def acquire_resource(deadline, reacquire: false, **options)
@reacquire_waiting_count_metric.increment if reacquire
update_utilization_metrics if reacquire
# Do not hold the lock across the (potentially blocking) pop.
@mutex.unlock
resource = @queue.pop(timeout: deadline&.remaining, **options)
return resource
ensure
# Runs on normal return and on exceptions alike: re-take the lock first,
# then undo the waiting bookkeeping.
@mutex.lock
@reacquire_waiting_count_metric.decrement if reacquire
# `resource` is still nil here if pop raised, so only a truthy pop result is counted.
@acquired_count_metric.increment if resource
update_utilization_metrics if reacquire || resource
end
def release_resource(value)
Release a previously acquired resource back to the queue.
Implementation
# Release a previously acquired resource back to the queue.
#
# The acquired-count bookkeeping is done under @mutex; the push itself and the
# final metrics refresh happen after the lock has been released.
#
# @parameter value [Object] The resource to push back onto the queue.
def release_resource(value)
  @mutex.synchronize do
    # Guard so the acquired count never drops below zero.
    if @acquired_count_metric.value > 0
      @acquired_count_metric.decrement
    end

    update_utilization_metrics
  end

  # Hand the released value back to the queue, then refresh the metrics again
  # so they reflect the new queue contents.
  @queue.push(value)
  update_utilization_metrics
end