Falcon LimiterSourceFalconLimiterLongTask

class LongTask

Manages long-running tasks by releasing connection tokens during I/O operations to prevent contention and maintain server responsiveness.

A long task is any long (1+ sec) operation that isn't CPU-bound (usually long I/O). Starting a long task lets the server accept one more (potentially CPU-bound) request. This allows us to handle many concurrent I/O bound requests, without adding contention (which impacts latency).

Definitions

STOP_PRIORITY = 1000

The priority to use when stopping a long task to re-acquire the connection token.

def self.current

Signature

returns LongTask

The current long task.

Implementation

def self.current
	Fiber.current.falcon_limiter_long_task
end

def self.current=(long_task)

Assign the current long task.

Implementation

def self.current=(long_task)
	Fiber.current.falcon_limiter_long_task = long_task
end

def with

Execute the block with the current long task.

Implementation

def with
	previous = self.class.current
	self.class.current = self
	yield
ensure
	self.class.current = previous
end

def self.for(request, limiter, **options)

Create a long task for the given request. Extracts connection token from the request if available for proper token management.

Signature

parameter limiter Async::Limiter

The limiter instance for managing concurrent long tasks.

parameter request Object

The HTTP request object to extract connection information from.

parameter options Hash

Additional options passed to the constructor.

returns LongTask

A new long task instance ready for use.

Implementation

def self.for(request, limiter, **options)
	# Get connection token from request if possible:
	connection_token = request&.connection&.stream&.io&.token rescue nil
	
	return new(request, limiter, connection_token, **options)
end

def initialize(request, limiter, connection_token = nil, start_delay: 0.1)

Initialize a new long task with the specified configuration.

Signature

parameter limiter Async::Limiter

The limiter instance for controlling concurrency.

parameter connection_token Async::Limiter::Token, nil

Optional connection token to manage.

parameter start_delay Float

Delay in seconds before starting the long task (default: 0.1).

Implementation

def initialize(request, limiter, connection_token = nil, start_delay: 0.1)
	@request = request
	@limiter = limiter
	@connection_token = connection_token
	@start_delay = start_delay
	
	@token = Async::Limiter::Token.new(@limiter)
	@delayed_start_task = nil
end

def started?

Check if the long task has been started, but not necessarily acquired (e.g. if there was a delay).

Signature

returns Boolean

If the long task has been started.

Implementation

def started?
	@token.acquired? || @delayed_start_task
end

def acquired?

Check if the long task has been acquired.

Signature

returns Boolean

If the long task token has been acquired.

Implementation

def acquired?
	@token.acquired?
end

def start(delay: @start_delay)

Start the long task, optionally with a delay to avoid overhead for short operations

Implementation

def start(delay: @start_delay)
	# If already started, nothing to do:
	if started?
		if block_given?
			return yield self
		else
			return self
		end
	end
	
	if delay == true
		delay = @start_delay
	elsif delay == false
		delay = nil
	end
	
	# Otherwise, start the long task:
	if delay&.positive?
		# Wait specified delay before starting the long task:
		@delayed_start_task = Async do
			sleep(delay)
			self.acquire
		rescue Async::Stop
			# Gracefully exit on stop.
		ensure
			@delayed_start_task = nil
		end
	else
		# Start the long task immediately:
		self.acquire
	end
	
	return self unless block_given?
	
	begin
		yield self
	ensure
		self.stop
	end
end

def stop(force: false, **options)

Stop the long task and restore connection token

Implementation

def stop(force: false, **options)
	if delayed_start_task = @delayed_start_task
		@delayed_start_task = nil
		delayed_start_task.stop
	end
	
	# Re-acquire the connection token with high priority than inbound requests:
	options[:priority] ||= STOP_PRIORITY
	
	# Release the long task token:
	release(force, **options)
end

def acquire(**options)

This acquires the long task token and releases the connection token if it exists. This marks the beginning of a long task.

Signature

parameter options Hash

The options to pass to the long task token acquisition.

Implementation

def acquire(**options)
	return if @token.acquired?
	
	# Wait if we've reached our limit of ongoing long tasks.
	if @token.acquire(**options)
		# Release the socket accept token.
		@connection_token&.release
		
		# Mark connection as non-persistent since we released the token.
		make_non_persistent!
	end
end

def release(force = false, **options)

This releases the long task token and re-acquires the connection token if it exists. This marks the end of a long task.

Signature

parameter force Boolean

Whether to force the release of the long task token without re-acquiring the connection token.

parameter options Hash

The options to pass to the connection token re-acquisition.

Implementation

def release(force = false, **options)
	return if @token.released?
	
	@token.release
	
	return if force
	
	# Re-acquire the connection token to prevent overloading the connection limiter:
	@connection_token&.acquire(**options)
end