class Generic
Generic limiter class with unlimited concurrency by default.
This provides the foundation for rate limiting and concurrency control. Subclasses can override methods to implement specific limiting behaviors.
The Generic limiter coordinates timing strategies with concurrency control, providing thread-safe acquisition with deadline tracking and cost-based consumption.
Definitions
def initialize(timing: Timing::None, parent: nil, tags: nil)
Initialize a new generic limiter.
Signature
-
parameter
timing
#acquire, #wait, #maximum_cost
Strategy for timing constraints.
-
parameter
parent
Async::Task, nil
Parent task for creating child tasks.
Implementation
def initialize(timing: Timing::None, parent: nil, tags: nil)
  @timing = timing
  @parent = parent
  @tags = tags
  @mutex = Mutex.new
end
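The timing parameter is duck-typed: any object responding to #acquire, #wait and #maximum_cost can be supplied. The following is a minimal sketch of such an object, assuming the call shapes visible elsewhere on this page (`wait(mutex, deadline, cost)`, `acquire(cost)`, `statistics`). The class name `FixedBudgetTiming` and its budget rule are hypothetical and not part of the library.

```ruby
# Hypothetical timing strategy: allows at most `budget` cost units in total.
# The class name and the budget rule are illustrative assumptions.
class FixedBudgetTiming
  def initialize(budget)
    @budget = budget
    @used = 0
  end

  # Largest single cost this strategy can ever satisfy.
  def maximum_cost
    @budget
  end

  # Called with the limiter's mutex held. Returns true when the cost fits
  # in the remaining budget, false otherwise. A real strategy might sleep
  # here and release the mutex while waiting; this sketch never blocks and
  # ignores both the mutex and the deadline.
  def wait(mutex, deadline, cost)
    @used + cost <= @budget
  end

  # Record that `cost` units were consumed.
  def acquire(cost)
    @used += cost
  end

  def statistics
    {budget: @budget, used: @used}
  end
end

timing = FixedBudgetTiming.new(3)
mutex = Mutex.new

# Mirror the order used by the limiter: wait first, then record the cost.
# The deadline is passed as nil because this sketch does not use it.
mutex.synchronize do
  timing.acquire(2) if timing.wait(mutex, nil, 2)
end

p timing.statistics
```

An instance like this could then be passed as the `timing:` argument, provided the real strategies follow the same interface.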
def limited?
Signature
-
returns
Boolean
Whether this limiter is currently limiting concurrency.
Implementation
def limited?
  false
end
def async(parent: (@parent || Task.current), **options)
- asynchronous
Execute a task asynchronously with unlimited concurrency.
Signature
-
parameter
parent
Async::Task
Parent task for the new task.
-
parameter
options
Hash
Additional options passed to Async::Task#async.
-
yields
{|task| ...}
The block to execute within the limiter constraints.
-
parameter
task
Async::Task
The async task context.
-
returns
Async::Task
The created task.
Implementation
def async(parent: (@parent || Task.current), **options)
  acquire
  parent.async(**options) do |task|
    yield task
  ensure
    release
  end
end
def sync
- asynchronous
Execute a task synchronously with unlimited concurrency.
Signature
-
yields
{|task| ...}
The block to execute within the limiter constraints.
-
parameter
task
Async::Task
The current task context.
Implementation
def sync
  acquire do
    yield(Task.current)
  end
end
def acquire(timeout: nil, cost: 1, **options)
- asynchronous
Manually acquire a resource with timing and concurrency coordination.
This method provides the core acquisition logic with support for:
- Flexible timeout handling (blocking, non-blocking, timed)
- Cost-based consumption for timing strategies
- Deadline tracking to prevent timeout violations
- Automatic resource cleanup with block usage
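The three timeout modes in the list above can be illustrated with a small deadline helper. This is only a sketch: `SketchDeadline` is a hypothetical class written for this example, and the library's own `Deadline` may behave differently in detail.

```ruby
# Hypothetical deadline helper; the library's own Deadline class may differ.
class SketchDeadline
  def initialize(timeout)
    @timeout = timeout
    @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Seconds left before the deadline, or nil when there is no deadline.
  def remaining
    return nil if @timeout.nil?

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started_at
    [@timeout - elapsed, 0].max
  end

  # A nil timeout never expires; a zero timeout is expired from the start.
  def expired?
    left = remaining
    !left.nil? && left <= 0
  end
end

blocking = SketchDeadline.new(nil)    # timeout: nil, wait forever
non_blocking = SketchDeadline.new(0)  # timeout: 0, give up immediately
timed = SketchDeadline.new(1.5)       # timeout: 1.5, wait up to 1.5 seconds
```

Tracking one deadline for the whole call, rather than passing the raw timeout to each step, means time spent waiting on timing constraints is deducted from the time left for acquiring the resource.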
Signature
-
parameter
timeout
Numeric, nil
Timeout in seconds (nil = wait forever, 0 = non-blocking).
-
parameter
cost
Numeric
The cost/weight of this operation for timing strategies (default: 1).
-
parameter
options
Hash
Additional options passed to concurrency acquisition.
-
yields
{|resource| ...}
Optional block executed with automatic resource release.
-
parameter
resource
Object
The acquired resource.
-
returns
Object, nil
The acquired resource, or nil if acquisition failed/timed out. When used with a block, returns the result of the block execution.
-
raises
ArgumentError
If cost exceeds the timing strategy's maximum supported cost.
Implementation
def acquire(timeout: nil, cost: 1, **options)
  # Validate cost against timing strategy capacity:
  maximum_cost = @timing.maximum_cost
  if cost > maximum_cost
    raise ArgumentError, "Cost #{cost} exceeds maximum supported cost #{maximum_cost} for timing strategy!"
  end
  resource = nil
  deadline = Deadline.start(timeout)
  # Atomically handle timing constraints and concurrency logic:
  acquire_synchronized(timeout, cost, **options) do
    # Wait for timing constraints to be satisfied (mutex released during sleep)
    return nil unless @timing.wait(@mutex, deadline, cost)
    # Execute the concurrency-specific acquisition logic
    resource = acquire_resource(deadline, **options)
    # Record timing acquisition if successful
    if resource
      @timing.acquire(cost)
    else
      # `acquire_resource` should return nil if deadline reached:
      return nil
    end
    resource
  end
  return resource unless block_given?
  begin
    yield(resource)
  ensure
    release(resource)
  end
end
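The caller-visible behaviour of the implementation above (cost validation, manual form, block form) can be sketched without the library. `SketchLimiter` below is a hypothetical stand-in that keeps only those parts; it omits the mutex, the deadline and the timing strategy, and its `released` log exists purely for illustration.

```ruby
# Hypothetical stand-in for the limiter, reduced to cost validation and
# the block/manual acquisition forms. Not the library's implementation.
class SketchLimiter
  attr_reader :released

  def initialize(maximum_cost)
    @maximum_cost = maximum_cost
    @released = []
  end

  def acquire(cost: 1)
    # Reject costs that could never be satisfied, before any waiting.
    if cost > @maximum_cost
      raise ArgumentError, "Cost #{cost} exceeds maximum supported cost #{@maximum_cost}!"
    end

    resource = true # The unlimited default always succeeds.
    return resource unless block_given?

    begin
      yield(resource)
    ensure
      # Runs whether the block returns normally or raises.
      release(resource)
    end
  end

  def release(resource = true)
    @released << resource
  end
end

limiter = SketchLimiter.new(5)

# Block form: the return value is the block's result and release is automatic.
result = limiter.acquire(cost: 2) {|resource| :done}

# Manual form: the caller is responsible for calling release.
resource = limiter.acquire
limiter.release(resource)

# An oversized cost is rejected with ArgumentError.
begin
  limiter.acquire(cost: 10)
rescue ArgumentError => error
  rejected = error.message
end
```

The block form is usually the safer choice, since the `ensure` clause releases the resource even when the block raises.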
def release(resource = true)
Release a previously acquired resource.
Implementation
def release(resource = true)
  release_resource(resource)
end
def statistics
Get current limiter statistics.
Signature
-
returns
Hash
Statistics hash with current state.
Implementation
def statistics
  @mutex.synchronize do
    {
      timing: @timing.statistics
    }
  end
end
def acquire_resource(deadline, **options)
Default resource acquisition for the unlimited limiter, which always succeeds. Subclasses should override this method.
Implementation
def acquire_resource(deadline, **options)
  # Default unlimited behavior - always succeeds
  true
end
def release_resource(resource)
Default resource release for the unlimited limiter, which does nothing. Subclasses should override this method.
Implementation
def release_resource(resource)
  # Default implementation - subclasses should override.
end
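As a rough illustration of how a subclass might override these two hooks, the sketch below uses a stand-in base class rather than the library itself. Both `SketchGeneric` and `SketchCounting` are hypothetical names; the counting rule is an assumption chosen for brevity, and it returns nil instead of waiting when the limit is reached.

```ruby
# Hypothetical stand-in base class exposing the same two hooks.
class SketchGeneric
  def limited?
    false
  end

  def acquire
    # The deadline is passed as nil because this sketch never waits.
    acquire_resource(nil)
  end

  def release(resource = true)
    release_resource(resource)
  end

  # Default: unlimited, always succeeds.
  def acquire_resource(deadline, **options)
    true
  end

  # Default: nothing to release.
  def release_resource(resource)
  end
end

# Hypothetical subclass: at most `limit` resources held at once.
class SketchCounting < SketchGeneric
  def initialize(limit)
    @limit = limit
    @count = 0
  end

  def limited?
    @count >= @limit
  end

  # Returns nil when at capacity, mirroring the "nil on failure" contract.
  def acquire_resource(deadline, **options)
    return nil if limited?

    @count += 1
    true
  end

  def release_resource(resource)
    @count -= 1
  end
end

limiter = SketchCounting.new(1)
first = limiter.acquire   # succeeds
second = limiter.acquire  # nil: at capacity
limiter.release(first)
third = limiter.acquire   # succeeds again after the release
```

A real subclass would presumably wait until the deadline instead of failing immediately, and would need to keep its counter consistent under concurrent access.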