class Limited
Limited concurrency limiter that enforces a strict task limit.
This implements a counting semaphore that limits the number of concurrent operations. It coordinates with timing strategies to provide both concurrency and rate limiting.
The Limited limiter uses a mutex and condition variable for thread-safe coordination, with support for deadline-aware timeout handling.
Definitions
def initialize(limit = 1, **options)
Initialize a limited concurrency limiter.
Signature
-
parameter
limitInteger Maximum concurrent tasks allowed.
-
parameter
optionsHash Options passed to
Async::Limiter::Generic#initialize.-
raises
ArgumentError If limit is not positive.
Implementation
def initialize(limit = 1, **options)
super(**options)
@limit = limit
@count = 0
@available = ConditionVariable.new
@acquired_count_metric = @utilization.metric(:acquired_count)
@available_count_metric = @utilization.metric(:available_count)
@waiting_count_metric = @utilization.metric(:waiting_count)
@reacquire_waiting_count_metric = @utilization.metric(:reacquire_waiting_count)
update_utilization_metrics
end
attr_reader :limit
Signature
-
attribute
Integer The maximum number of concurrent tasks.
attr_reader :count
Signature
-
attribute
Integer Current count of active tasks.
def acquired_count
Signature
-
returns
Integer Current count of active tasks.
Implementation
def acquired_count
@mutex.synchronize{@count}
end
def available_count
Signature
-
returns
Integer Current count of available capacity.
Implementation
def available_count
@mutex.synchronize{@limit - @count}
end
def waiting_count
Signature
-
returns
Integer Current count of tasks waiting for capacity.
Implementation
def waiting_count
@waiting_count_metric.value
end
def reacquire_waiting_count
Signature
-
returns
Integer Current count of reacquiring tasks waiting for capacity.
Implementation
def reacquire_waiting_count
@reacquire_waiting_count_metric.value
end
def limited?
Check if a new task can be acquired.
Signature
-
returns
Boolean True if under the limit.
Implementation
def limited?
@mutex.synchronize{@count >= @limit}
end
def limit=(new_limit)
Update the concurrency limit.
Signature
-
parameter
new_limitInteger The new maximum number of concurrent tasks.
-
raises
ArgumentError If new_limit is not positive.
Implementation
def limit=(new_limit)
@mutex.synchronize do
old_limit = @limit
@limit = new_limit
update_utilization_metrics
# Wake up waiting tasks if limit increased:
@available.broadcast if new_limit > old_limit
end
end
def statistics
Get current limiter statistics.
Signature
-
returns
Hash Statistics hash with current state.
Implementation
def statistics
@mutex.synchronize do
{
limit: @limit,
count: @count,
acquired_count: @count,
available_count: @limit - @count,
waiting_count: @waiting_count_metric.value,
reacquire_waiting_count: @reacquire_waiting_count_metric.value,
timing: @timing.statistics
}
end
end
def acquire_resource(deadline, reacquire: false, **options)
Acquire resource with optional deadline.
Implementation
def acquire_resource(deadline, reacquire: false, **options)
# Fast path: immediate return for expired deadlines, but only if at capacity
return nil if deadline&.expired? && @count >= @limit
waiting = false
acquired = false
# Wait for capacity with deadline tracking
while @count >= @limit
remaining = deadline&.remaining
return nil if remaining && remaining <= 0
unless waiting
@waiting_count_metric.increment
@reacquire_waiting_count_metric.increment if reacquire
waiting = true
end
unless @available.wait(@mutex, remaining)
return nil # Timeout exceeded
end
end
@count += 1
acquired = true
return true
ensure
if waiting
@waiting_count_metric.decrement
@reacquire_waiting_count_metric.decrement if reacquire
end
update_utilization_metrics if acquired
end
def release_resource(resource)
Release resource.
Implementation
def release_resource(resource)
@mutex.synchronize do
@count -= 1
update_utilization_metrics
@available.signal
end
end