Async::Cable::Executor

class Executor

Fiber-based replacement for ActionCable::Server::ThreadedExecutor.

Action Cable uses an #executor to dispatch internal asynchronous work (pub/sub callback invocations, heartbeat timers, periodic channel timers) and expects it to provide a small interface: #post, #timer, #shutdown. Stock Rails backs this with a Concurrent::ThreadPoolExecutor; under a fiber-scheduler-aware server such as Falcon, every #post then bounces through an OS thread unnecessarily.

This executor instead spawns Async tasks. Tasks posted from inside a reactor run on the caller's reactor (no thread hop). Tasks posted or scheduled from outside a reactor run on a dedicated reactor thread owned by the executor.

Definitions

def initialize

Create a new executor. The dedicated reactor thread is started lazily, the first time it is needed: that is, on the first #timer or #post call made from outside a reactor.

Implementation

def initialize
	@mutex = ::Thread::Mutex.new
	@inbox = nil
	@thread = nil
end

def post(task = nil, &block)

Run the given callable asynchronously. When called from inside a reactor this spawns a fire-and-forget child task on the current reactor; when called from outside a reactor this routes the task to the executor's dedicated reactor thread. The return value is the executor (matching ActionCable::Server::ThreadedExecutor#post).

Signature

parameter task #call, nil

Callable to run; if nil, the block is used.

Implementation

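def post(task = nil, &block)
	# Accept either a callable argument or a block.
	block ||= task
	
	if current = ::Async::Task.current?
		# Inside a reactor: run as a child task of the current task.
		current.async{block.call}
	else
		# Outside a reactor: hand off to the dedicated reactor thread.
		inbox.push(proc{block.call})
	end
	
	return self
end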
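
The argument handling and return value of #post can be illustrated without a reactor. The following is a minimal sketch only: InlineExecutor is a hypothetical stand-in (not part of this library) that runs the work synchronously, so it shows just the `block ||= task` normalisation and the fact that #post returns the executor.

```ruby
# Hypothetical stand-in that runs work inline. It mirrors only the
# argument handling and return value of #post, not its scheduling.
class InlineExecutor
	def post(task = nil, &block)
		# A block takes precedence; otherwise fall back to the callable.
		block ||= task
		block.call
		
		return self
	end
end

results = []
executor = InlineExecutor.new

# The block form and the callable form are interchangeable, and calls
# can be chained because #post returns the executor itself.
executor.post{results << :block}.post(-> {results << :callable})
```

Because the return value is the executor rather than a task or future, callers cannot wait on the posted work through it; in that sense the posted work is fire-and-forget.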

def timer(interval, &block)

Schedule a recurring timer. When called from inside a reactor this spawns a child task on the current reactor; when called from outside a reactor this routes the timer to the executor's dedicated reactor thread.

Signature

parameter interval Numeric

Seconds between invocations.

returns Timer

A handle that responds to #shutdown.

Implementation

def timer(interval, &block)
	timer = Timer.new
	
	if current = ::Async::Task.current?
		timer.task = current.async do |inner|
			run_timer(inner, interval, block)
		end
		
		return timer
	end
	
	inbox = timer.inbox = self.inbox
	begin
		operation = proc do |task|
			timer.task = task.async do |inner|
				run_timer(inner, interval, block)
			end
		end
		
		inbox.push(operation)
	rescue ::ClosedQueueError
		# Executor is shutting down; match the best-effort
		# behaviour of posting work during shutdown.
	end
	
	return timer
end
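
The rescue of ::ClosedQueueError above relies on standard Thread::Queue behaviour. As a small sketch, assuming the inbox behaves like a plain ::Thread::Queue (the inbox implementation itself is not shown in this section), pushing to a closed queue raises, while items queued before the close can still be drained:

```ruby
inbox = ::Thread::Queue.new
inbox.push(:accepted)
inbox.close

# Pushing after the queue has been closed raises ClosedQueueError,
# which a best-effort caller can rescue and ignore.
dropped = false
begin
	inbox.push(:too_late)
rescue ::ClosedQueueError
	dropped = true
end

# Work queued before the close is still delivered; once the closed
# queue is empty, pop returns nil instead of blocking.
remaining = [inbox.pop, inbox.pop]
```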

def shutdown

Stop the dedicated reactor thread (if any). Tasks and timers started on the caller's reactor via #post or #timer are unaffected; their lifetime is owned by the calling reactor.

Implementation

def shutdown
	@mutex.synchronize do
		return unless @thread
		@inbox.close
		@thread.join
		@thread = nil
		@inbox = nil
	end
end
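
The lazy start and shutdown life cycle can be sketched with the standard library alone. This is an illustration under stated assumptions, not the library's implementation: LazyWorker and its private #inbox helper are hypothetical, and a plain thread draining a ::Thread::Queue stands in for the dedicated reactor thread.

```ruby
# Hypothetical worker: a plain thread stands in for the reactor thread.
class LazyWorker
	def initialize
		@mutex = ::Thread::Mutex.new
		@inbox = nil
		@thread = nil
	end
	
	def started?
		@mutex.synchronize{!@thread.nil?}
	end
	
	def post(&block)
		inbox.push(block)
		return self
	end
	
	def shutdown
		@mutex.synchronize do
			return unless @thread
			# Closing the queue lets the worker drain and then exit.
			@inbox.close
			@thread.join
			@thread = nil
			@inbox = nil
		end
	end
	
	private
	
	# Start the worker thread on first use and return its queue.
	def inbox
		@mutex.synchronize do
			unless @thread
				queue = @inbox = ::Thread::Queue.new
				@thread = ::Thread.new do
					# pop returns nil once the queue is closed and empty.
					while job = queue.pop
						job.call
					end
				end
			end
			
			@inbox
		end
	end
end

worker = LazyWorker.new
log = ::Thread::Queue.new

before = worker.started?
worker.post{log << :ran}
during = worker.started?
worker.shutdown
after = worker.started?
```

In this sketch the join inside the mutex is safe because the worker thread never takes the mutex itself; work that was queued before the close still runs before #shutdown returns.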