class LeakyBucket
Leaky bucket timing strategy that smooths traffic flow.
This strategy maintains a "bucket" that gradually "leaks" capacity over time, enforcing a steady output rate regardless of input bursts.
Definitions
attr_reader :rate
Signature
-
attribute
Numeric
Leak rate in units per second.
attr_reader :capacity
Signature
-
attribute
Numeric
Maximum bucket capacity.
def initialize(rate, capacity, initial_level: 0)
Initialize a leaky bucket timing strategy.
Signature
-
parameter
rate
Numeric
Leak rate in units per second.
-
parameter
capacity
Numeric
Maximum bucket capacity.
-
parameter
initial_level
Numeric
Starting bucket level (0 = leaky bucket, capacity = token bucket).
Implementation
def initialize(rate, capacity, initial_level: 0)
raise ArgumentError, "rate must be non-negative" unless rate >= 0
raise ArgumentError, "capacity must be positive" unless capacity.positive?
@rate = rate
@capacity = capacity
@level = initial_level.to_f
@last_leak = Clock.now
end
def maximum_cost
Maximum cost this timing strategy can support.
Signature
-
returns
Numeric
The maximum cost (equal to capacity).
Implementation
def maximum_cost
@capacity
end
def can_acquire?(cost = 1, current_time = Clock.now)
Check if a task can be acquired with the given cost.
Signature
-
parameter
cost
Numeric
The cost/weight of the operation.
-
parameter
current_time
Numeric
Current time for leak calculations.
-
returns
Boolean
True if bucket has capacity for this cost.
Implementation
def can_acquire?(cost = 1, current_time = Clock.now)
leak_bucket(current_time)
@level + cost <= @capacity
end
def acquire(cost = 1)
Record that a task was acquired (adds cost to bucket level).
Signature
-
parameter
cost
Numeric
The cost/weight of the operation.
Implementation
def acquire(cost = 1)
leak_bucket
@level += cost
end
def wait(mutex, deadline = nil, cost = 1)
Wait for bucket to have capacity.
Signature
-
parameter
mutex
Mutex
Mutex to release during sleep.
-
parameter
deadline
Deadline, nil
Deadline for timeout (nil = wait forever).
-
parameter
cost
Numeric
Cost of the operation (default: 1).
-
returns
Boolean
True if capacity is available, false if timeout exceeded.
Implementation
def wait(mutex, deadline = nil, cost = 1)
# Loop until we can acquire or deadline expires:
until can_acquire?(cost, Clock.now)
# Check deadline before each wait:
return false if deadline&.expired?
# Calculate how long to wait for bucket to leak enough for this cost:
needed_capacity = (@level + cost) - @capacity
wait_time = needed_capacity / @rate.to_f
# Should be able to acquire now:
return true if wait_time <= 0
# Check if wait would exceed deadline:
remaining = deadline&.remaining
if remaining && wait_time > remaining
# Would exceed deadline:
return false
end
# Wait for the required time (or remaining time if deadline specified):
actual_wait = remaining ? [wait_time, remaining].min : wait_time
# Release mutex during sleep:
mutex.sleep(actual_wait)
end
return true
end
def level
Get current bucket level (for debugging/monitoring).
Signature
-
returns
Numeric
Current bucket level.
Implementation
def level
leak_bucket
@level
end
def level=(new_level)
Set bucket level (for testing purposes).
Signature
-
parameter
new_level
Numeric
New bucket level.
Implementation
def level=(new_level)
@level = new_level.to_f
@last_leak = Clock.now
end
def advance_time(seconds)
Simulate time advancement for testing purposes.
Signature
-
parameter
seconds
Numeric
Number of seconds to advance.
Implementation
def advance_time(seconds)
@last_leak -= seconds
leak_bucket
end
def statistics
Get current timing strategy statistics.
Signature
-
returns
Hash
Statistics hash with current state.
Implementation
def statistics
leak_bucket
{
name: "LeakyBucket",
current_level: @level,
maximum_capacity: @capacity,
leak_rate: @rate,
available_capacity: @capacity - @level,
utilization_percentage: (@level / @capacity) * 100
}
end
def leak_bucket(current_time = Clock.now)
Leak the bucket based on elapsed time.
Signature
-
parameter
current_time
Numeric
Current time.
Implementation
def leak_bucket(current_time = Clock.now)
return if @level <= 0 # Don't leak if already empty or negative
elapsed = current_time - @last_leak
leaked = elapsed * @rate
@level = [@level - leaked, 0.0].max
@last_leak = current_time
end