class Connection
Represents a bidirectional communication channel between supervisor and worker.
Handles message passing, call/response patterns, and connection lifecycle.
Nested
Definitions
def initialize(stream, id = 0, **state)
Initialize a new connection.
Signature
-
parameter
stream IO The underlying IO stream.
-
parameter
id Integer The starting call ID (default: 0).
-
parameter
state Hash Initial connection state.
Implementation
# Initialize a new connection.
#
# Stores the stream, the starting call ID and the initial state, and starts
# with no background reader and no calls in progress.
#
# @parameter stream [IO] The underlying IO stream.
# @parameter id [Integer] The starting call ID (default: 0).
# @parameter state [Hash] Initial connection state, collected from keyword arguments.
def initialize(stream, id = 0, **state)
  @stream, @id, @state = stream, id, state

  # No background reader has been started yet:
  @reader = nil

  # Calls in progress, looked up by call ID when a message arrives:
  @calls = {}
end
# @attribute [Hash(Integer, Call)] Calls in progress.
attr :calls
Signature
-
attribute
Hash(Integer, Call) Calls in progress.
# @attribute [Hash(Symbol, Object)] State associated with this connection, for example the process ID, etc.
attr_accessor :state
Signature
-
attribute
Hash(Symbol, Object) State associated with this connection, for example the process ID, etc.
def next_id
Generate the next unique call ID.
Signature
-
returns
Integer The next call identifier.
Implementation
# Generate the next unique call ID.
#
# Advances the stored ID by two and returns the new value.
# NOTE(review): the step of two presumably lets the two peers use IDs of
# opposite parity so they never collide - confirm against the callers.
#
# @returns [Integer] The next call identifier.
def next_id
  @id = @id + 2
end
def write(**message)
Write a message to the connection stream.
Signature
-
parameter
message Hash The message to write.
Implementation
# Write a message to the connection stream.
#
# The message is serialized as a single line of JSON terminated by a newline,
# and the stream is flushed afterwards.
#
# @parameter message [Hash] The message to write, given as keyword arguments.
# @raises [IOError] If the connection has no stream (it has been closed).
def write(**message)
  raise IOError, "Connection is closed!" unless @stream

  line = JSON.dump(message)
  line << "\n"

  # Safe navigation is deliberate: @stream can become nil (the connection
  # being closed) after the check above or after the write itself.
  @stream&.write(line)
  @stream&.flush
end
def read
Read a message from the connection stream.
Signature
-
returns
Hash, nil The parsed message or nil if stream is closed.
Implementation
# Read a message from the connection stream.
#
# Reads one line from the stream and parses it as JSON with symbolized keys.
#
# @returns [Hash | Nil] The parsed message, or nil if there is no stream or no more lines to read.
def read
  line = @stream&.gets
  return unless line

  JSON.parse(line, symbolize_names: true)
end
def each
Iterate over all messages from the connection.
Signature
-
yields
{|message| ...} Each message read from the stream.
Implementation
# Iterate over all messages from the connection.
#
# Keeps reading until {read} returns nil (or any other falsy value), yielding
# every message read before that.
#
# @yields {|message| ...} Each message read from the stream.
def each
  message = self.read

  while message
    yield message
    message = self.read
  end
end
def call(...)
Make a synchronous call and wait for a single response.
Implementation
# Make a synchronous call and wait for a single response.
#
# Delegates to Call.call, passing this connection as the first argument
# followed by all given arguments unchanged. The request/response handling
# itself lives in Call.call, which is not defined in this block.
def call(...)
Call.call(self, ...)
end
def run(target)
Run the connection, processing incoming messages.
Dispatches incoming calls to the target and routes responses to waiting calls.
Signature
-
parameter
target Dispatchable The target to dispatch calls to.
Implementation
# Run the connection, processing incoming messages.
#
# Every message is routed by its `:id` (which is removed from the message):
# - a message without an ID is logged as an error;
# - an ID matching a call in progress has the rest of the message pushed to that call;
# - an unknown ID on a message with a `:do` key is dispatched to the target as a new call;
# - anything else is logged at debug level and ignored.
#
# @parameter target [Dispatchable] The target to dispatch calls to.
def run(target)
  self.each do |message|
    id = message.delete(:id)

    # Without an ID the message can neither be matched to a call nor answered:
    unless id
      Console.error(self, "Unknown message:", message)
      next
    end

    if pending = @calls[id]
      # A response to one of our calls in progress - enqueue it for that call:
      pending.push(**message)
    elsif message.key?(:do)
      # No matching call, but the message carries an operation - it is a new incoming call:
      Call.dispatch(self, target, id, message)
    else
      # Most likely a late response to a call that already timed out:
      Console.debug(self, "Ignoring message:", message)
    end
  end
end
def run_in_background(target, parent: Task.current)
Run the connection in a background task.
Signature
-
parameter
target Dispatchable The target to dispatch calls to.
-
parameter
parent Async::Task The parent task.
-
returns
Async::Task The background reader task.
Implementation
# Run the connection in a background task.
#
# The reader task is created at most once: if one already exists it is
# returned as-is and the given target is not used.
#
# @parameter target [Dispatchable] The target to dispatch calls to.
# @parameter parent [Async::Task] The parent task (defaults to `Task.current`).
# @returns [Async::Task] The background reader task.
def run_in_background(target, parent: Task.current)
  return @reader if @reader

  @reader = parent.async {self.run(target)}
end
def close
Close the connection and clean up resources.
Stops the background reader, closes the stream, and closes all pending calls.
Implementation
# Close the connection and clean up resources.
#
# Stops the background reader, closes the stream, and closes all pending calls.
# Each step is skipped when there is nothing to do, so calling this again
# after a successful close repeats none of them.
def close
  if reader = @reader
    reader.stop
    @reader = nil
  end

  if io = @stream
    # Clear the reference before closing, so the connection already has no
    # stream by the time the underlying IO is being closed:
    @stream = nil
    io.close
  end

  return unless @calls

  @calls.each_value {|pending| pending.close}
  @calls.clear
end