Async LimiterSourceAsyncLimiterGeneric

class Generic

Generic limiter class with unlimited concurrency by default.

This provides the foundation for rate limiting and concurrency control. Subclasses can override methods to implement specific limiting behaviors.

The Generic limiter coordinates timing strategies with concurrency control, providing thread-safe acquisition with deadline tracking and cost-based consumption.

Definitions

def initialize(timing: Timing::None, parent: nil, tags: nil)

Initialize a new generic limiter.

Signature

parameter timing #acquire, #wait, #maximum_cost

Strategy for timing constraints.

parameter parent Async::Task, nil

Parent task for creating child tasks.

Implementation

def initialize(timing: Timing::None, parent: nil, tags: nil)
	@timing = timing
	@parent = parent
	@tags = tags
	
	@mutex = Mutex.new
end

attr :tags

Signature

attribute Array(String)

Tags associated with this limiter for identification or categorization.

def limited?

Signature

returns Boolean

Whether this limiter is currently limiting concurrency.

Implementation

def limited?
	false
end

def async(parent: (@parent || Task.current), **options)

  • asynchronous

Execute a task asynchronously with unlimited concurrency.

Signature

parameter parent Async::Task

Parent task for the new task.

parameter options Hash

Additional options passed to Async::Task#async.

yields {|task| ...}

The block to execute within the limiter constraints.

parameter task Async::Task

The async task context.

returns Async::Task

The created task.

asynchronous

Implementation

def async(parent: (@parent || Task.current), **options)
	acquire
	parent.async(**options) do |task|
		yield task
	ensure
		release
	end
end

def sync

  • asynchronous

Execute a task synchronously with unlimited concurrency.

Signature

yields {|task| ...}

The block to execute within the limiter constraints.

parameter task Async::Task

The current task context.

asynchronous

Implementation

def sync
	acquire do
		yield(Task.current)
	end
end

def acquire(timeout: nil, cost: 1, **options)

  • asynchronous

Manually acquire a resource with timing and concurrency coordination.

This method provides the core acquisition logic with support for:

  • Flexible timeout handling (blocking, non-blocking, timed)
  • Cost-based consumption for timing strategies
  • Deadline tracking to prevent timeout violations
  • Automatic resource cleanup with block usage

Signature

parameter timeout Numeric, nil

Timeout in seconds (nil = wait forever, 0 = non-blocking).

parameter cost Numeric

The cost/weight of this operation for timing strategies (default: 1).

parameter options Hash

Additional options passed to concurrency acquisition.

yields {|resource| ...}

Optional block executed with automatic resource release.

parameter resource Object

The acquired resource.

returns Object, nil

The acquired resource, or nil if acquisition failed/timed out. When used with a block, returns the result of the block execution.

raises ArgumentError

If cost exceeds the timing strategy's maximum supported cost.

asynchronous

Implementation

def acquire(timeout: nil, cost: 1, **options)
	# Validate cost against timing strategy capacity:
	maximum_cost = @timing.maximum_cost
	if cost > maximum_cost
		raise ArgumentError, "Cost #{cost} exceeds maximum supported cost #{maximum_cost} for timing strategy!"
	end
	
	resource = nil
	deadline = Deadline.start(timeout)
	
	# Atomically handle timing constraints and concurrency logic:
	acquire_synchronized(timeout, cost, **options) do
		# Wait for timing constraints to be satisfied (mutex released during sleep)
		return nil unless @timing.wait(@mutex, deadline, cost)
		
		# Execute the concurrency-specific acquisition logic
		resource = acquire_resource(deadline, **options)
		
		# Record timing acquisition if successful
		if resource
			@timing.acquire(cost)
		else
			# `acquire_concurrency` should return nil if deadline reached:
			return nil
		end
		
		resource
	end
	
	return resource unless block_given?
	
	begin
		yield(resource)
	ensure
		release(resource)
	end
end

def release(resource = true)

Release a previously acquired resource.

Implementation

def release(resource = true)
	release_resource(resource)
end

def statistics

Get current limiter statistics.

Signature

returns Hash

Statistics hash with current state.

Implementation

def statistics
	@mutex.synchronize do
		{
			timing: @timing.statistics
		}
	end
end

def acquire_resource(deadline, **options)

Default resource acquisition for unlimited semaphore. Subclasses should override this method.

Implementation

def acquire_resource(deadline, **options)
	# Default unlimited behavior - always succeeds
	true
end

def release_resource(resource)

Default resource release for unlimited semaphore. Subclasses should override this method.

Implementation

def release_resource(resource)
	# Default implementation - subclasses should override.
end