class LongTask
Manages long-running tasks by releasing connection tokens during I/O operations to prevent contention and maintain server responsiveness.
A long task is any long (1+ sec) operation that isn't CPU-bound (usually long I/O). Starting a long task lets the server accept one more (potentially CPU-bound) request. This allows us to handle many concurrent I/O bound requests, without adding contention (which impacts latency).
Definitions
STOP_PRIORITY = 1000
The priority to use when stopping a long task to re-acquire the connection token.
def self.current
Signature
-
returns
LongTask
The current long task.
Implementation
def self.current
Fiber.current.falcon_limiter_long_task
end
def self.current=(long_task)
Assign the current long task.
Implementation
def self.current=(long_task)
Fiber.current.falcon_limiter_long_task = long_task
end
def with
Execute the block with the current long task.
Implementation
def with
previous = self.class.current
self.class.current = self
yield
ensure
self.class.current = previous
end
def self.for(request, limiter, **options)
Create a long task for the given request. Extracts connection token from the request if available for proper token management.
Signature
-
parameter
limiter
Async::Limiter
The limiter instance for managing concurrent long tasks.
-
parameter
request
Object
The HTTP request object to extract connection information from.
-
parameter
options
Hash
Additional options passed to the constructor.
-
returns
LongTask
A new long task instance ready for use.
Implementation
def self.for(request, limiter, **options)
# Get connection token from request if possible:
connection_token = request&.connection&.stream&.io&.token rescue nil
return new(request, limiter, connection_token, **options)
end
def initialize(request, limiter, connection_token = nil, start_delay: 0.1)
Initialize a new long task with the specified configuration.
Signature
-
parameter
limiter
Async::Limiter
The limiter instance for controlling concurrency.
-
parameter
connection_token
Async::Limiter::Token, nil
Optional connection token to manage.
-
parameter
start_delay
Float
Delay in seconds before starting the long task (default: 0.1).
Implementation
def initialize(request, limiter, connection_token = nil, start_delay: 0.1)
@request = request
@limiter = limiter
@connection_token = connection_token
@start_delay = start_delay
@token = Async::Limiter::Token.new(@limiter)
@delayed_start_task = nil
end
def started?
Check if the long task has been started, but not necessarily acquired (e.g. if there was a delay).
Signature
-
returns
Boolean
If the long task has been started.
Implementation
def started?
@token.acquired? || @delayed_start_task
end
def acquired?
Check if the long task has been acquired.
Signature
-
returns
Boolean
If the long task token has been acquired.
Implementation
def acquired?
@token.acquired?
end
def start(delay: @start_delay)
Start the long task, optionally with a delay to avoid overhead for short operations
Implementation
def start(delay: @start_delay)
# If already started, nothing to do:
if started?
if block_given?
return yield self
else
return self
end
end
if delay == true
delay = @start_delay
elsif delay == false
delay = nil
end
# Otherwise, start the long task:
if delay&.positive?
# Wait specified delay before starting the long task:
@delayed_start_task = Async do
sleep(delay)
self.acquire
rescue Async::Stop
# Gracefully exit on stop.
ensure
@delayed_start_task = nil
end
else
# Start the long task immediately:
self.acquire
end
return self unless block_given?
begin
yield self
ensure
self.stop
end
end
def stop(force: false, **options)
Stop the long task and restore connection token
Implementation
def stop(force: false, **options)
if delayed_start_task = @delayed_start_task
@delayed_start_task = nil
delayed_start_task.stop
end
# Re-acquire the connection token with high priority than inbound requests:
options[:priority] ||= STOP_PRIORITY
# Release the long task token:
release(force, **options)
end
def acquire(**options)
This acquires the long task token and releases the connection token if it exists. This marks the beginning of a long task.
Signature
-
parameter
options
Hash
The options to pass to the long task token acquisition.
Implementation
def acquire(**options)
return if @token.acquired?
# Wait if we've reached our limit of ongoing long tasks.
if @token.acquire(**options)
# Release the socket accept token.
@connection_token&.release
# Mark connection as non-persistent since we released the token.
make_non_persistent!
end
end
def release(force = false, **options)
This releases the long task token and re-acquires the connection token if it exists. This marks the end of a long task.
Signature
-
parameter
force
Boolean
Whether to force the release of the long task token without re-acquiring the connection token.
-
parameter
options
Hash
The options to pass to the connection token re-acquisition.
Implementation
def release(force = false, **options)
return if @token.released?
@token.release
return if force
# Re-acquire the connection token to prevent overloading the connection limiter:
@connection_token&.acquire(**options)
end