class Socket
Wraps a WebSocket connection to provide the interface expected by ActionCable connections. Buffers outbound messages in a queue and drains them asynchronously so that transmission never blocks the event loop.
Definitions
def initialize(env, websocket, server, coder: ActiveSupport::JSON)
Create a new socket wrapper.
Signature
-
parameter
env [Hash] The Rack environment for the originating request.
-
parameter
websocket [Async::WebSocket::Connection] The underlying WebSocket connection.
-
parameter
server [ActionCable::Server::Base] The ActionCable server instance.
-
parameter
coder [#encode, #decode] Coder used to serialise messages (defaults to
ActiveSupport::JSON).
Implementation
# Create a new socket wrapper.
# @parameter env [Hash] The Rack environment for the originating request.
# @parameter websocket [Async::WebSocket::Connection] The underlying WebSocket connection.
# @parameter server [ActionCable::Server::Base] The ActionCable server instance.
# @parameter coder [#encode, #decode] Coder used to serialise messages (defaults to ActiveSupport::JSON).
def initialize(env, websocket, server, coder: ActiveSupport::JSON)
  # Outbound messages are buffered in this queue rather than written directly.
  @output = ::Thread::Queue.new

  @env, @websocket, @server, @coder = env, websocket, server, coder
end
def logger
The ActionCable server logger.
Signature
-
returns
Logger
Implementation
# The ActionCable server logger; delegates to the server given at construction.
# @returns [Logger]
def logger = @server.logger
def request
Build an ActionDispatch::Request from the Rack environment, merging Rails application config when available.
Signature
-
returns
ActionDispatch::Request
Implementation
# Build (and memoize) an ActionDispatch::Request from the Rack environment,
# merging the Rails application config when available.
# @returns [ActionDispatch::Request]
def request
  return @request if @request

  # Copied from `ActionCable::Server::Socket#request`:
  # When a Rails application is loaded, its `env_config` supplies the base
  # values and the Rack environment of this connection takes precedence.
  rails_environment =
    if defined?(Rails.application) && Rails.application
      Rails.application.env_config.merge(@env)
    end

  @request = ActionDispatch::Request.new(rails_environment || @env)
end
def run(parent: Async::Task.current)
Start an async task that drains the outbound message queue and writes each message to the WebSocket. The task stops when the queue is closed.
Signature
-
parameter
parent [Async::Task] The parent task to spawn under.
-
returns
Async::Task
Implementation
# Start an async task that drains the outbound message queue and writes each
# message to the WebSocket. The loop ends when popping the queue yields a
# falsy value, which is what a closed and empty queue returns.
# @parameter parent [Async::Task] The parent task to spawn under.
# @returns [Async::Task] Whatever `parent.async` returns.
def run(parent: Async::Task.current)
  parent.async do
    begin
      while (message = @output.pop)
        # Console.debug(self, "Sending cable data:", message, flush: @output.empty?)
        @websocket.send_text(message)

        # Only flush once nothing else is waiting in the queue.
        if @output.empty?
          @websocket.flush
        end
      end
    rescue => error
      Console.error(self, "Error while sending cable data:", error)
    ensure
      # `error` is nil here unless the rescue clause above captured a failure.
      @websocket.close_write(error) unless @websocket.closed?
    end
  end
end
def transmit(data)
Encode and enqueue a message for asynchronous delivery to the client.
Signature
-
parameter
data [Object] The data to transmit, which will be encoded by the coder.
Implementation
# Encode a message with the configured coder and enqueue it on the outbound queue.
# @parameter data [Object] The data to transmit, which will be encoded by the coder.
def transmit(data)
  # Console.info(self, "Transmitting data:", data, task: Async::Task.current?)
  encoded = @coder.encode(data)

  @output << encoded
end
def raw_transmit(data)
Enqueue an already-encoded message for asynchronous delivery to the client, bypassing the coder. Useful for "fastlane" broadcasts where the payload is encoded once and shared across many connections.
Signature
-
parameter
data [String] The pre-encoded message to transmit.
Implementation
# Enqueue a message on the outbound queue exactly as given, bypassing the coder.
# @parameter data [String] The pre-encoded message to transmit.
def raw_transmit(data)
  @output << data
end
def close
Close the outbound queue, causing the drain task to terminate once all pending messages have been sent.
Implementation
# Close the outbound queue. Messages already enqueued can still be popped;
# further pushes are rejected by the queue.
def close = @output.close
def perform_work(receiver, ...)
This can be called from the work pool, off the event loop.
Implementation
# Invoke a method on the receiver, forwarding every remaining argument
# (method name, positional and keyword arguments, block) to `send`.
# This can be called from the work pool, off the event loop.
# @parameter receiver [Object] The object to dispatch to.
def perform_work(receiver, ...) = receiver.send(...)