Limited Limiter
This guide explains the class Async::Limiter::Limited
class, which provides semaphore-style concurrency control, enforcing a maximum number of concurrent operations. It's perfect for controlling concurrency when you have limited capacity or want to prevent system overload.
Usage
Limit the number of concurrent tasks:
require "async"
require "async/limiter"
Async do
# Maximum 2 concurrent tasks
limiter = Async::Limiter::Limited.new(2)
4.times do |i|
limiter.async do |task|
puts "Task #{i} started at #{Time.now}"
task.sleep 1
puts "Task #{i} finished at #{Time.now}"
end
end
end
# Output shows tasks 0,1 run first, then tasks 2,3
# Total duration: ~2 seconds instead of ~1 second
Block-Based Acquisition
The recommended pattern using automatic cleanup:
limiter = Async::Limiter::Limited.new(1)
# Acquire with automatic release using blocks:
limiter.acquire do |acquired|
puts "I have acquired: #{acquired}"
# Automatically released when block exits.
end
Timeouts
All acquisition methods support comprehensive timeout options:
limiter = Async::Limiter::Limited.new(1)
Async do
# Non-blocking (immediate check) - should succeed:
if limiter.acquire(timeout: 0)
puts "Got acquisition immediately"
else
puts "No capacity available"
end
# Now limiter is at capacity, so subsequent calls will fail/timeout.
# Non-blocking check - will fail since capacity is used:
if limiter.acquire(timeout: 0)
puts "Got second acquisition"
else
puts "No capacity available for second acquisition"
end
# Timed acquisition - will timeout since capacity is still used:
if limiter.acquire(timeout: 0.1)
puts "Got acquisition within timeout"
else
puts "Timed out waiting for capacity"
end
# With blocks (automatic cleanup):
result = limiter.acquire(timeout: 1.0) do |acquired|
"Successfully acquired and used"
end
puts result || "Acquisition timed out"
end
Concurrent Timeout Behavior
The limiter prevents convoy effects where quick timeouts aren't blocked by slow ones:
limiter = Async::Limiter::Limited.new(1)
Async do
limiter.acquire # Fill to capacity.
results = []
# Start multiple tasks with different timeouts:
tasks = [
Async {limiter.acquire(timeout: 1.0); results << "Long timeout."},
Async {limiter.acquire(timeout: 0.1); results << "Short timeout."},
Async {limiter.acquire(timeout: 0); results << "Non-blocking."},
]
# All tasks complete quickly, even with a long timeout task present:
tasks.map(&:wait)
puts results
# => ["Non-blocking.", "Short timeout.", "Long timeout."]
end
Dynamic Limit Adjustment
Adjust limits at runtime based on changing conditions:
limiter = Async::Limiter::Limited.new(2)
puts "Initial limit: #{limiter.limit}" # 2
# Increase capacity during high load
limiter.limit = 5
puts "Increased limit: #{limiter.limit}" # 5
# Decrease capacity during high load
limiter.limit = 1
puts "Decreased limit: #{limiter.limit}" # 1
Cost-Based Operations
Operations can consume multiple "units" based on their computational weight:
# Create limiter with timing strategy that has capacity limits:
timing = Async::Limiter::Timing::LeakyBucket.new(5.0, 10.0) # 5/sec rate, 10 capacity.
limiter = Async::Limiter::Limited.new(100, timing: timing)
Async do
# Light operations (consume 0.5 units):
limiter.acquire(cost: 0.5) do
perform_light_database_query()
end
# Normal operations (default cost: 1.0):
limiter.acquire do
perform_standard_operation()
end
# Heavy operations (consume 3.5 units):
limiter.acquire(cost: 3.5) do
perform_heavy_computation()
end
# Operations exceeding capacity fail fast:
begin
# Exceeds timing capacity of 10.0:
limiter.acquire(cost: 15.0)
rescue ArgumentError => error
puts "#{error.message}"
# => Cost 15.0 exceeds maximum supported cost 10.0
end
end
Cost + Timeout Combinations
When using cost-based operations with timing strategies, be aware that high-cost operations can be starved by continuous low-cost operations. Use class Async::Limiter::Timing::Ordered
to enforce FIFO ordering if fairness is important:
# Default behavior - potential starvation:
timing = Async::Limiter::Timing::LeakyBucket.new(2.0, 10.0)
limiter = Async::Limiter::Limited.new(100, timing: timing)
# High-cost operation might be starved by many small operations:
result = limiter.acquire(timeout: 30.0, cost: 8.0) do |acquired|
expensive_machine_learning_inference()
end
# With FIFO ordering - prevents starvation:
ordered_timing = Async::Limiter::Timing::Ordered.new(timing)
fair_limiter = Async::Limiter::Limited.new(100, timing: ordered_timing)
# High-cost operation is guaranteed to execute in arrival order:
result = fair_limiter.acquire(timeout: 30.0, cost: 8.0) do |acquired|
expensive_machine_learning_inference()
end