class Limited
A concurrency limiter that enforces a strict limit on the number of concurrently running tasks.
This implements a counting semaphore that limits the number of concurrent operations. It coordinates with timing strategies to provide both concurrency and rate limiting.
The Limited limiter uses a mutex and condition variable for thread-safe coordination, with support for deadline-aware timeout handling.
Definitions
def initialize(limit = 1, timing: Timing::None, parent: nil)
Initialize a limited concurrency limiter.
Signature
-
parameter
limit
Integer
Maximum concurrent tasks allowed.
-
parameter
timing
#acquire, #wait, #maximum_cost
Strategy for timing constraints.
-
parameter
parent
Async::Task, nil
Parent task for creating child tasks.
-
raises
ArgumentError
If limit is not positive.
Implementation
# Initialize a limited concurrency limiter.
# @parameter limit [Integer] Maximum concurrent tasks allowed.
# @parameter timing [#acquire, #wait, #maximum_cost] Strategy for timing constraints.
# @parameter parent [Async::Task, nil] Parent task for creating child tasks.
# @raises [ArgumentError] If limit is not positive.
def initialize(limit = 1, timing: Timing::None, parent: nil)
  # Enforce the documented contract before any state is set up: a non-positive
  # limit leaves no capacity that could ever be acquired.
  raise ArgumentError, "Limit must be positive!" unless limit.positive?

  super(timing: timing, parent: parent)

  @limit = limit
  # Nothing has been acquired yet:
  @count = 0
  # Condition variable that is waited on when at capacity, and signalled when
  # capacity is released or the limit is raised:
  @available = ConditionVariable.new
end
# @attribute [Integer] The maximum number of concurrent tasks.
attr_reader :limit
Signature
-
attribute
Integer
The maximum number of concurrent tasks.
# @attribute [Integer] Current count of active tasks.
attr_reader :count
Signature
-
attribute
Integer
Current count of active tasks.
def limited?
Check whether the limiter is at capacity, i.e. whether a new task cannot be acquired immediately.
Signature
-
returns
Boolean
True if the number of active tasks has reached the limit; false if under the limit.
Implementation
# Check whether the limiter is currently at capacity.
# @returns [Boolean] True when the active count has reached (or exceeded) the limit, false while under it.
def limited?
  # Take the lock so the count and limit are read together:
  @mutex.synchronize do
    @count >= @limit
  end
end
def limit=(new_limit)
Update the concurrency limit.
Signature
-
parameter
new_limit
Integer
The new maximum number of concurrent tasks.
-
raises
ArgumentError
If new_limit is not positive.
Implementation
# Update the concurrency limit.
# @parameter new_limit [Integer] The new maximum number of concurrent tasks.
# @raises [ArgumentError] If new_limit is not positive.
def limit=(new_limit)
  # Validate before taking the lock so a rejected value leaves the limiter untouched.
  raise ArgumentError, "Limit must be positive!" unless new_limit.positive?

  @mutex.synchronize do
    old_limit = @limit
    @limit = new_limit

    # Wake up waiting tasks if limit increased; every waiter is woken because
    # the increase may have freed more than one slot:
    @available.broadcast if new_limit > old_limit
  end
end
def statistics
Get current limiter statistics.
Signature
-
returns
Hash
Statistics hash with current state.
Implementation
# Get current limiter statistics.
# @returns [Hash] A hash with `:limit`, `:count` and `:timing` (the timing strategy's own statistics).
def statistics
  # Build the whole snapshot under the lock so limit and count are read together:
  @mutex.synchronize do
    snapshot = {limit: @limit, count: @count}
    snapshot[:timing] = @timing.statistics
    snapshot
  end
end
def acquire_resource(deadline, **options)
Acquire resource with optional deadline.
Implementation
# Acquire resource with optional deadline.
#
# NOTE(review): this method waits on `@available` with `@mutex` but does not
# lock `@mutex` itself; presumably the caller already holds it (confirm).
#
# @parameter deadline [#expired?, #remaining, nil] Optional deadline; nil means wait indefinitely.
# @parameter options [Hash] Additional options (not used by this implementation).
# @returns [true, nil] True once a slot has been taken, nil if the deadline ran out first.
def acquire_resource(deadline, **options)
  # Fast path: an already-expired deadline fails immediately, but only when
  # there is no capacity; with spare capacity the acquire still succeeds.
  if deadline&.expired? && @count >= @limit
    return nil
  end

  # Block until a slot frees up, re-checking the deadline on every wake-up:
  while @count >= @limit
    timeout = deadline&.remaining
    return nil if timeout && timeout <= 0

    # A falsy result from the wait is treated as the timeout having elapsed:
    woken = @available.wait(@mutex, timeout)
    return nil unless woken
  end

  # Capacity is available; take one slot.
  @count += 1
  true
end
def release_resource(resource)
Release resource.
Implementation
# Release resource.
# @parameter resource [Object] The resource being released (not used by this implementation).
def release_resource(resource)
  @mutex.synchronize do
    # Return one slot:
    @count = @count - 1

    # Exactly one slot was freed, so a single signal (not a broadcast) is sent:
    signalled = @available.signal
    signalled
  end
end