Async::CableSourceAsyncCableSocket

class Socket

Wraps a WebSocket connection to provide the interface expected by ActionCable connections. Buffers outbound messages in a queue and drains them asynchronously so that transmission never blocks the event loop.

Definitions

def initialize(env, websocket, server, coder: ActiveSupport::JSON)

Create a new socket wrapper.

Signature

parameter env Hash

The Rack environment for the originating request.

parameter websocket Async::WebSocket::Connection

The underlying WebSocket connection.

parameter server ActionCable::Server::Base

The ActionCable server instance.

parameter coder #encode, #decode

Coder used to serialise messages (defaults to ActiveSupport::JSON).

Implementation

def initialize(env, websocket, server, coder: ActiveSupport::JSON)
	@env = env
	@websocket = websocket
	@server = server
	@coder = coder
	
	@output = ::Thread::Queue.new
end

def logger

The ActionCable server logger.

Signature

returns Logger

Implementation

def logger
	@server.logger
end

def request

Build an ActionDispatch::Request from the Rack environment, merging Rails application config when available.

Signature

returns ActionDispatch::Request

Implementation

def request
	# Copied from `ActionCable::Server::Socket#request`:
	@request ||= begin
		if defined?(Rails.application) && Rails.application
			environment = Rails.application.env_config.merge(@env)
		end
		
		ActionDispatch::Request.new(environment || @env)
	end
end

def run(parent: Async::Task.current)

Start an async task that drains the outbound message queue and writes each message to the WebSocket. The task stops when the queue is closed.

Signature

parameter parent Async::Task

The parent task to spawn under.

returns Async::Task

Implementation

def run(parent: Async::Task.current)
	parent.async do
		while buffer = @output.pop
			# Console.debug(self, "Sending cable data:", buffer, flush: @output.empty?)
			@websocket.send_text(buffer)
			@websocket.flush if @output.empty?
		end
	rescue => error
		Console.error(self, "Error while sending cable data:", error)
	ensure
		unless @websocket.closed?
			@websocket.close_write(error)
		end
	end
end

def transmit(data)

Encode and enqueue a message for asynchronous delivery to the client.

Signature

parameter data Object

The data to transmit, which will be encoded by the coder.

Implementation

def transmit(data)
	# Console.info(self, "Transmitting data:", data, task: Async::Task.current?)
	@output.push(@coder.encode(data))
end

def raw_transmit(data)

Enqueue an already-encoded message for asynchronous delivery to the client, bypassing the coder. Useful for "fastlane" broadcasts where the payload is encoded once and shared across many connections.

Signature

parameter data String

The pre-encoded message to transmit.

Implementation

def raw_transmit(data)
	@output.push(data)
end

def close

Close the outbound queue, causing the drain task to terminate once all pending messages have been sent.

Implementation

def close
	# Console.info(self, "Closing socket.", task: Async::Task.current?)
	@output.close
end

def perform_work(receiver, ...)

This can be called from the work pool, off the event loop.

Implementation

def perform_work(receiver, ...)
	# Console.info(self, "Performing work:", receiver)
	receiver.send(...)
end