Async LimiterSourceAsyncLimiterLimited

class Limited

Limited concurrency limiter that enforces a strict task limit.

This implements a counting semaphore that limits the number of concurrent operations. It coordinates with timing strategies to provide both concurrency and rate limiting.

The Limited limiter uses a mutex and condition variable for thread-safe coordination, with support for deadline-aware timeout handling.

Definitions

def initialize(limit = 1, timing: Timing::None, parent: nil)

Initialize a limited concurrency limiter.

Signature

parameter limit Integer

Maximum concurrent tasks allowed.

parameter timing #acquire, #wait, #maximum_cost

Strategy for timing constraints.

parameter parent Async::Task, nil

Parent task for creating child tasks.

raises ArgumentError

If limit is not positive.

Implementation

def initialize(limit = 1, timing: Timing::None, parent: nil)
	super(timing: timing, parent: parent)
	
	@limit = limit
	@count = 0
	
	@available = ConditionVariable.new
end

attr_reader :limit

Signature

attribute Integer

The maximum number of concurrent tasks.

attr_reader :count

Signature

attribute Integer

Current count of active tasks.

def limited?

Check if a new task can be acquired.

Signature

returns Boolean

True if under the limit.

Implementation

def limited?
	@mutex.synchronize{@count >= @limit}
end

def limit=(new_limit)

Update the concurrency limit.

Signature

parameter new_limit Integer

The new maximum number of concurrent tasks.

raises ArgumentError

If new_limit is not positive.

Implementation

def limit=(new_limit)
	@mutex.synchronize do
		old_limit = @limit
		@limit = new_limit
		
		# Wake up waiting tasks if limit increased:
		@available.broadcast if new_limit > old_limit
	end
end

def statistics

Get current limiter statistics.

Signature

returns Hash

Statistics hash with current state.

Implementation

def statistics
	@mutex.synchronize do
		{
			limit: @limit,
			count: @count,
			timing: @timing.statistics
		}
	end
end

def acquire_resource(deadline, **options)

Acquire resource with optional deadline.

Implementation

def acquire_resource(deadline, **options)
	# Fast path: immediate return for expired deadlines, but only if at capacity
	return nil if deadline&.expired? && @count >= @limit
	
	# Wait for capacity with deadline tracking
	while @count >= @limit
		remaining = deadline&.remaining
		return nil if remaining && remaining <= 0
		
		unless @available.wait(@mutex, remaining)
			return nil  # Timeout exceeded
		end
	end
	
	@count += 1
	
	return true
end

def release_resource(resource)

Release resource.

Implementation

def release_resource(resource)
	@mutex.synchronize do
		@count -= 1
		@available.signal
	end
end