class Executor
Fiber-based replacement for ActionCable::Server::ThreadedExecutor.
Action Cable uses an #executor to dispatch internal async work
(pub/sub callback invocations, heartbeat timers, periodic channel
timers) and expects a small interface: #post, #timer,
#shutdown. Stock Rails backs this with a
Concurrent::ThreadPoolExecutor; under a fiber-scheduler-aware
server like Falcon every #post then bounces through an OS thread
unnecessarily.
This executor instead spawns Async tasks. Tasks posted from inside a reactor run on the caller's reactor (no thread hop). Tasks posted or scheduled from outside a reactor run on a dedicated reactor thread owned by the executor.
Definitions
def initialize
Create a new executor. The dedicated reactor thread is started
lazily on first use that needs it (timers, or #post from
outside a reactor).
Implementation
def initialize
  @mutex = ::Thread::Mutex.new
  @inbox = nil
  @thread = nil
end
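The source does not show how the inbox and thread come into existence on first use. A minimal sketch of the lazy-start pattern follows, assuming a plain Thread::Queue and a worker thread in place of the Async reactor. The class name LazyWorker and the methods inbox and started? are hypothetical, not part of the documented API.

```ruby
# Hypothetical stand-in for the executor's lazy start; a plain worker
# thread replaces the dedicated Async reactor thread.
class LazyWorker
  def initialize
    @mutex = ::Thread::Mutex.new
    @inbox = nil
    @thread = nil
  end

  # True once the worker thread has been created.
  def started?
    @mutex.synchronize { !@thread.nil? }
  end

  # Return the queue, creating it and the worker thread on first use.
  def inbox
    @mutex.synchronize do
      unless @thread
        queue = @inbox = ::Thread::Queue.new
        # Queue#pop returns nil once the queue is closed and drained,
        # which ends the loop.
        @thread = ::Thread.new do
          while (operation = queue.pop)
            operation.call
          end
        end
      end
      @inbox
    end
  end
end

worker = LazyWorker.new
started_before = worker.started? # no thread has been created yet

results = ::Thread::Queue.new
worker.inbox.push(proc { results.push(:ran) })
first_result = results.pop       # blocks until the worker runs the proc
started_after = worker.started?

worker.inbox.close               # lets the worker loop finish
```

Constructing the object stays cheap this way, and callers that only ever post from inside a reactor would never pay for the extra thread.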
def post(task = nil, &block)
Run the given callable asynchronously. When called from inside
a reactor this spawns a fire-and-forget child task on the
current reactor; when called from outside a reactor this routes
the task to the executor's dedicated reactor thread. The return
value is the executor (matching
ActionCable::Server::ThreadedExecutor#post).
Signature
- parameter task (#call, nil): Callable to run; if nil, the block is used.
Implementation
def post(task = nil, &block)
  block ||= task
  if current = ::Async::Task.current?
    current.async{block.call}
  else
    inbox.push(proc{block.call})
  end
  return self
end
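The `block ||= task` line means that a block, when given, takes precedence over the positional callable. A small sketch of that resolution in isolation follows; the helper name resolve_callable is hypothetical.

```ruby
# Hypothetical helper mirroring the first line of #post.
def resolve_callable(task = nil, &block)
  block ||= task
  block
end

from_task  = resolve_callable(-> { :task })
from_block = resolve_callable { :block }
from_both  = resolve_callable(-> { :task }) { :block }
```

When both are supplied, only the block is invoked, so callers should pass one or the other.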
def timer(interval, &block)
Schedule a recurring timer. When called from inside a reactor this spawns a child task on the current reactor; when called from outside a reactor this routes the timer to the executor's dedicated reactor thread.
Signature
- parameter interval (Numeric): Seconds between invocations.
- returns Timer: A handle that responds to #shutdown.
Implementation
def timer(interval, &block)
  timer = Timer.new
  if current = ::Async::Task.current?
    timer.task = current.async do |inner|
      run_timer(inner, interval, block)
    end
    return timer
  end
  inbox = timer.inbox = self.inbox
  begin
    operation = proc do |task|
      timer.task = task.async do |inner|
        run_timer(inner, interval, block)
      end
    end
    inbox.push(operation)
  rescue ::ClosedQueueError
    # Executor is shutting down; match the best-effort
    # behaviour of posting work during shutdown.
  end
  return timer
end
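The rescue clause relies on standard queue behaviour: pushing to a closed Thread::Queue raises ClosedQueueError. A minimal sketch follows, assuming the inbox behaves like Thread::Queue (the source does not show its class). The helper best_effort_push is hypothetical.

```ruby
# Hypothetical helper: push if possible and report whether it was accepted.
def best_effort_push(queue, operation)
  queue.push(operation)
  true
rescue ::ClosedQueueError
  # The queue was closed (shutdown in progress); drop the operation.
  false
end

queue = ::Thread::Queue.new
accepted_open = best_effort_push(queue, proc { :work })
queue.close
accepted_closed = best_effort_push(queue, proc { :late })
```

In the timer case the caller still receives a Timer handle even when the push is dropped, so calling #shutdown on it later remains safe.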
def shutdown
Stop the dedicated reactor thread (if any). Tasks posted to
the caller's reactor via #post are unaffected; their
lifetime is owned by the calling reactor.
Implementation
def shutdown
  @mutex.synchronize do
    return unless @thread
    @inbox.close
    @thread.join
    @thread = nil
    @inbox = nil
  end
end
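Closing the inbox before joining gives the worker a chance to finish what was already queued. A sketch of that ordering follows, assuming a Thread::Queue and a plain worker thread in place of the reactor. Whether the real reactor drains pending operations is not shown in the source.

```ruby
inbox = ::Thread::Queue.new
processed = []

# The worker drains the inbox; pop returns nil once the queue is
# closed and empty, which ends the loop.
thread = ::Thread.new do
  while (operation = inbox.pop)
    processed << operation.call
  end
end

3.times { |i| inbox.push(proc { i }) }

inbox.close # no new work is accepted; queued work is still delivered
thread.join # returns once the worker has drained the queue
```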