Async Limiter › Source › Async › Limiter › Timing › SlidingWindow

class SlidingWindow

Sliding window timing strategy with rolling time periods.

This strategy enforces rate limiting with a rolling window that slides continuously, providing smoother rate limiting than fixed windows.

Definitions

attr_reader :limit

Signature

attribute Numeric

Maximum tasks allowed per window.

attr_reader :duration

Signature

attribute Numeric

Duration of the timing window in seconds.

def initialize(duration, burst, limit)

Initialize a window timing strategy.

Signature

parameter duration Numeric

Duration of the window in seconds.

parameter burst #can_acquire?

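Controls bursting vs smooth behavior. The object must respond to `can_acquire?`, `next_acquire_time`, and `statistics`, which this strategy calls on it.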

parameter limit Integer

Maximum tasks per window.

Implementation

# Initialize a window timing strategy.
# @parameter duration [Numeric] Duration of the window in seconds.
# @parameter burst [#can_acquire?] Controls bursting vs smooth behavior.
# @parameter limit [Integer] Maximum tasks per window.
# @raises [ArgumentError] If the duration is not positive.
def initialize(duration, burst, limit)
	unless duration.positive?
		raise ArgumentError, "duration must be positive"
	end
	
	# Configuration supplied by the caller:
	@duration, @burst, @limit = duration, burst, limit
	
	# Tracking state; no window or frame has been started yet:
	@start_time = nil
	@count = 0
	@frame_start_time = nil
end

def maximum_cost

Maximum cost this timing strategy can support.

Signature

returns Numeric

The maximum cost (equal to limit).

Implementation

# Maximum cost this timing strategy can support.
# A single operation can never cost more than the whole window allows.
# @returns [Numeric] The maximum cost (equal to limit).
def maximum_cost
	@limit
end

def can_acquire?(cost = 1, current_time = Clock.now)

Check if a task can be acquired based on window constraints.

Signature

parameter cost Numeric

The cost/weight of the operation.

parameter current_time Numeric

Current time for window calculations.

returns Boolean

True if window and frame constraints allow acquisition.

Implementation

# Check if a task can be acquired based on window constraints.
# Note that this check starts a fresh window (resetting the count) when the
# previous one has elapsed, so it mutates the strategy's state.
# @parameter cost [Numeric] The cost/weight of the operation.
# @parameter current_time [Numeric] Current time for window calculations.
# @returns [Boolean] True if window and frame constraints allow acquisition.
def can_acquire?(cost = 1, current_time = Clock.now)
	# Roll over to a new window when the previous one is over (or none exists yet):
	if window_changed?(current_time, @start_time)
		@start_time, @count = window_start_time(current_time), 0
	end
	
	new_frame = frame_changed?(current_time)
	
	# The burst strategy makes the final decision. It is handed count + cost - 1;
	# NOTE(review): presumably it compares this against the limit with a strict
	# less-than - confirm against the burst implementation.
	projected_count = @count + cost - 1
	@burst.can_acquire?(projected_count, @limit, new_frame)
end

def acquire(cost = 1)

Record that a task was acquired.

Signature

parameter cost Numeric

The cost/weight of the operation.

Implementation

# Record that a task was acquired.
# Adds the cost to the current window's count and marks the start of a new frame.
# @parameter cost [Numeric] The cost/weight of the operation.
def acquire(cost = 1)
	@count = @count + cost
	
	# The frame begins at the moment of acquisition:
	@frame_start_time = Clock.now
end

def wait(mutex, deadline = nil, cost = 1)

Wait for timing constraints to be satisfied. Sleeps until the next window or frame becomes available, or returns immediately if ready.

Signature

parameter mutex Mutex

Mutex to release during sleep.

parameter deadline Deadline, nil

Deadline for timeout (nil = wait forever).

parameter cost Numeric

Cost of the operation (default: 1).

returns Boolean

True if constraints are satisfied, false if timeout exceeded.

Implementation

# Wait for timing constraints to be satisfied.
# Sleeps until the next window or frame becomes available, or returns immediately if ready.
# @parameter mutex [Mutex] Mutex to release during sleep.
# @parameter deadline [Deadline, nil] Deadline for timeout (nil = wait forever).
# @parameter cost [Numeric] Cost of the operation (default: 1).
# @returns [Boolean] True if constraints are satisfied, false if timeout exceeded.
def wait(mutex, deadline = nil, cost = 1)
	# The loop body only runs while acquisition is refused; re-check after every sleep:
	until can_acquire?(cost, Clock.now)
		# A deadline with no time left means we must not block at all:
		if deadline
			return false if deadline.expired? || deadline.remaining == 0
		end
		
		# Ask the burst strategy when the next slot opens up. A frame is the
		# window duration divided evenly by the limit.
		frame_duration = @duration / @limit.to_f
		ready_at = @burst.next_acquire_time(@start_time, @duration, @frame_start_time, frame_duration)
		
		delay = ready_at - Clock.now
		
		# The slot is already open, so there is nothing to wait for:
		return true if delay <= 0
		
		sleep_for = delay
		time_left = deadline&.remaining
		
		if time_left
			# Sleeping that long would overshoot the deadline:
			return false if delay > time_left
			
			# Never sleep past the deadline:
			sleep_for = [delay, time_left].min
		end
		
		# Release the mutex for the duration of the sleep:
		mutex.sleep(sleep_for)
	end
	
	true
end

def window_start_time(current_time)

Calculate the start time of the current window for the given time. Default implementation provides sliding window behavior.

Signature

parameter current_time Numeric

The current time.

returns Numeric

The window start time (current time for sliding).

Implementation

# Calculate the start time of the current window for the given time.
# For a sliding window there is no fixed alignment: a new window begins at
# whatever moment it is opened.
# @parameter current_time [Numeric] The current time.
# @returns [Numeric] The window start time (current time for sliding).
def window_start_time(current_time)
	current_time
end

def window_changed?(current_time, last_window_start)

Check if the window has changed since the last window start.

Signature

parameter current_time Numeric

The current time.

parameter last_window_start Numeric

The previous window start time.

returns Boolean

True if a new window period has begun.

Implementation

# Check if the window has changed since the last window start.
# @parameter current_time [Numeric] The current time.
# @parameter last_window_start [Numeric, nil] The previous window start time (nil if no window was started yet).
# @returns [Boolean] True if a new window period has begun.
def window_changed?(current_time, last_window_start)
	# With no previous window the first check always begins one; otherwise the
	# window is over once a full duration has elapsed since it started.
	last_window_start.nil? || last_window_start + @duration <= current_time
end

def statistics

Get current timing strategy statistics.

Signature

returns Hash

Statistics hash with current state.

Implementation

# Get current timing strategy statistics.
# @returns [Hash] Statistics hash with current state.
def statistics
	# With a zero limit the ratio would be NaN (0/0) or Infinity (n/0), which is
	# meaningless as a percentage and cannot be emitted as strict JSON, so
	# report 0.0 instead.
	utilization = @limit == 0 ? 0.0 : (@count.to_f / @limit) * 100
	
	{
		name: "SlidingWindow",
		window_duration: @duration,
		window_limit: @limit,
		current_window_count: @count,
		window_utilization_percentage: utilization,
		burst: @burst.statistics,
	}
end