Async LimiterSourceAsyncLimiterTimingLeakyBucket

class LeakyBucket

Leaky bucket timing strategy that smooths traffic flow.

This strategy maintains a "bucket" that gradually "leaks" capacity over time, enforcing a steady output rate regardless of input bursts.

Definitions

attr_reader :rate

Signature

attribute Numeric

Leak rate in units per second.

attr_reader :capacity

Signature

attribute Numeric

Maximum bucket capacity.

def initialize(rate, capacity, initial_level: 0)

Initialize a leaky bucket timing strategy.

Signature

parameter rate Numeric

Leak rate in units per second.

parameter capacity Numeric

Maximum bucket capacity.

parameter initial_level Numeric

Starting bucket level (0 = leaky bucket, capacity = token bucket).

Implementation

def initialize(rate, capacity, initial_level: 0)
	raise ArgumentError, "rate must be non-negative" unless rate >= 0
	raise ArgumentError, "capacity must be positive" unless capacity.positive?
	
	@rate = rate
	@capacity = capacity
	@level = initial_level.to_f
	@last_leak = Clock.now
end

def maximum_cost

Maximum cost this timing strategy can support.

Signature

returns Numeric

The maximum cost (equal to capacity).

Implementation

def maximum_cost
	@capacity
end

def can_acquire?(cost = 1, current_time = Clock.now)

Check if a task can be acquired with the given cost.

Signature

parameter cost Numeric

The cost/weight of the operation.

parameter current_time Numeric

Current time for leak calculations.

returns Boolean

True if bucket has capacity for this cost.

Implementation

def can_acquire?(cost = 1, current_time = Clock.now)
	leak_bucket(current_time)
	@level + cost <= @capacity
end

def acquire(cost = 1)

Record that a task was acquired (adds cost to bucket level).

Signature

parameter cost Numeric

The cost/weight of the operation.

Implementation

def acquire(cost = 1)
	leak_bucket
	@level += cost
end

def wait(mutex, deadline = nil, cost = 1)

Wait for bucket to have capacity.

Signature

parameter mutex Mutex

Mutex to release during sleep.

parameter deadline Deadline, nil

Deadline for timeout (nil = wait forever).

parameter cost Numeric

Cost of the operation (default: 1).

returns Boolean

True if capacity is available, false if timeout exceeded.

Implementation

def wait(mutex, deadline = nil, cost = 1)
	# Loop until we can acquire or deadline expires:
	until can_acquire?(cost, Clock.now)
		# Check deadline before each wait:
		return false if deadline&.expired?
		
		# Calculate how long to wait for bucket to leak enough for this cost:
		needed_capacity = (@level + cost) - @capacity
		wait_time = needed_capacity / @rate.to_f
		
		# Should be able to acquire now:
		return true if wait_time <= 0
		
		# Check if wait would exceed deadline:
		remaining = deadline&.remaining
		if remaining && wait_time > remaining
			# Would exceed deadline:
			return false
		end
		
		# Wait for the required time (or remaining time if deadline specified):
		actual_wait = remaining ? [wait_time, remaining].min : wait_time
		
		# Release mutex during sleep:
		mutex.sleep(actual_wait)
	end
	
	return true
end

def level

Get current bucket level (for debugging/monitoring).

Signature

returns Numeric

Current bucket level.

Implementation

def level
	leak_bucket
	@level
end

def level=(new_level)

Set bucket level (for testing purposes).

Signature

parameter new_level Numeric

New bucket level.

Implementation

def level=(new_level)
	@level = new_level.to_f
	@last_leak = Clock.now
end

def advance_time(seconds)

Simulate time advancement for testing purposes.

Signature

parameter seconds Numeric

Number of seconds to advance.

Implementation

def advance_time(seconds)
	@last_leak -= seconds
	leak_bucket
end

def statistics

Get current timing strategy statistics.

Signature

returns Hash

Statistics hash with current state.

Implementation

def statistics
	leak_bucket
	
	{
		name: "LeakyBucket",
		current_level: @level,
		maximum_capacity: @capacity,
		leak_rate: @rate,
		available_capacity: @capacity - @level,
		utilization_percentage: (@level / @capacity) * 100
	}
end

def leak_bucket(current_time = Clock.now)

Leak the bucket based on elapsed time.

Signature

parameter current_time Numeric

Current time.

Implementation

def leak_bucket(current_time = Clock.now)
	return if @level <= 0  # Don't leak if already empty or negative
	
	elapsed = current_time - @last_leak
	leaked = elapsed * @rate
	
	@level = [@level - leaked, 0.0].max
	@last_leak = current_time
end