class Connection
Represents a bidirectional communication channel between supervisor and worker.
Handles message passing, call/response patterns, and connection lifecycle.
Nested
Definitions
def initialize(stream, id = 0, **state)
Initialize a new connection.
Signature
-
parameter
stream [IO] The underlying IO stream.
-
parameter
id [Integer] The starting call ID (default: 0).
-
parameter
state [Hash] Initial connection state.
Implementation
# Set up a connection around the given stream.
#
# @parameter stream [IO] The underlying IO stream.
# @parameter id [Integer] The starting call ID (default: 0).
# @parameter state [Hash] Initial connection state.
def initialize(stream, id = 0, **state)
  @stream, @id, @state = stream, id, state

  # Background reader task; stays nil until run_in_background assigns it:
  @reader = nil

  # Calls in progress, keyed by call ID:
  @calls = {}
end
# @attribute [Hash(Integer, Call)] Calls in progress.
attr :calls
Signature
-
attribute
Hash(Integer, Call) Calls in progress.
# @attribute [Hash(Symbol, Object)] State associated with this connection, for example the process ID, etc.
attr_accessor :state
Signature
-
attribute
Hash(Symbol, Object) State associated with this connection, for example the process ID, etc.
def next_id
Generate the next unique call ID.
Signature
-
returns
Integer The next call identifier.
Implementation
# Advance the call ID counter and return its new value.
#
# @returns [Integer] The next call identifier.
def next_id
  # IDs advance in steps of two.
  # NOTE(review): presumably so each end of the connection uses its own parity of IDs; confirm against the callers.
  @id = @id + 2
end
def write(**message)
Write a message to the connection stream.
Signature
-
parameter
message [Hash] The message to write.
Implementation
# Serialize a message as a single line of JSON, write it to the stream and flush.
#
# @parameter message [Hash] The message to write.
def write(**message)
  # One message per line: the newline terminates the JSON document.
  line = JSON.dump(message)
  line << "\n"

  @stream.write(line)
  @stream.flush
end
def call(timeout: nil, **message)
Make a synchronous call and wait for a single response.
Signature
-
parameter
timeout [Numeric, nil] Optional timeout for the call.
-
parameter
message [Hash] The call message.
-
returns
Hash The response.
Implementation
# Make a synchronous call and wait for a single response.
#
# A fresh queue is registered under a new call ID, the message is written with
# that ID, and the caller blocks until something is pushed onto the queue.
#
# @parameter timeout [Numeric, nil] Optional timeout for the call.
# @parameter message [Hash] The call message.
# @returns [Hash, nil] The response, or nil if the wait timed out or the queue was closed.
def call(timeout: nil, **message)
  id = next_id

  # Keep a local reference to the queue. If the connection is closed while the
  # message is being written, the registry is cleared; looking the queue up
  # again would yield nil and raise NoMethodError instead of returning nil
  # from the closed queue.
  queue = ::Thread::Queue.new
  calls[id] = queue

  write(id: id, **message)

  queue.pop(timeout: timeout)
ensure
  # Always unregister, so a late response is not routed to a dead queue:
  calls.delete(id)
end
def call(...)
Make a synchronous call and wait for a single response.
Implementation
# Make a synchronous call and wait for a single response.
#
# Every argument (positional, keyword and block) is forwarded to Call.call,
# with this connection passed as the first argument.
def call(...) = Call.call(self, ...)
def read
Read a message from the connection stream.
Signature
-
returns
Hash, nil The parsed message or nil if stream is closed.
Implementation
# Read one line from the stream and parse it as JSON with symbolized keys.
#
# @returns [Hash, nil] The parsed message or nil if stream is closed.
def read
  # The stream is nil once the connection has been closed; gets yields nil at end of stream.
  line = @stream&.gets
  return nil unless line

  JSON.parse(line, symbolize_names: true)
end
def each
Iterate over all messages from the connection.
Signature
-
yields
{|message| ...} Each message read from the stream.
Implementation
# Iterate over all messages from the connection.
#
# Reading stops at the first falsy result from read.
#
# @yields {|message| ...} Each message read from the stream.
def each
  message = read

  while message
    yield message
    message = read
  end
end
def run(target)
Run the connection, processing incoming messages.
Dispatches incoming calls to the target and routes responses to waiting calls.
Signature
-
parameter
target [Dispatchable] The target to dispatch calls to.
Implementation
# Run the connection, processing incoming messages until the stream ends.
#
# Each message has its :id removed and is then routed: to the pending call
# registered under that ID, to the target if it carries a :do key, or logged.
#
# @parameter target [Dispatchable] The target to dispatch calls to.
def run(target)
  each do |message|
    id = message.delete(:id)

    # Without an ID the message cannot be correlated with anything:
    unless id
      Console.error(self, "Unknown message:", message)
      next
    end

    pending = @calls[id]

    if pending
      # Response to a call made from this side:
      pending.push(**message)
    elsif message.key?(:do)
      # Incoming call from the other side:
      Call.dispatch(self, target, id, message)
    else
      # Likely a response to a timed-out call, ignore it:
      Console.debug(self, "Ignoring message:", message)
    end
  end
end
def run_in_background(target, parent: Task.current)
Run the connection in a background task.
Signature
-
parameter
target [Dispatchable] The target to dispatch calls to.
-
parameter
parent [Async::Task] The parent task.
-
returns
Async::Task The background reader task.
Implementation
# Run the connection in a background task.
#
# The reader is created at most once; later calls return the existing one.
#
# @parameter target [Dispatchable] The target to dispatch calls to.
# @parameter parent [Async::Task] The parent task.
# @returns [Async::Task] The background reader task.
def run_in_background(target, parent: Task.current)
  return @reader if @reader

  @reader = parent.async { run(target) }
end
def close
Close the connection and clean up resources.
Stops the background reader, closes the stream, and closes all pending calls.
Implementation
# Close the connection and clean up resources.
#
# In order: stops the background reader, closes the stream, then closes and
# forgets every pending call.
def close
  if (reader = @reader)
    reader.stop
    @reader = nil
  end

  if (stream = @stream)
    # Clear the reference before closing, so read sees no stream from here on:
    @stream = nil
    stream.close
  end

  return unless @calls

  @calls.each_value { |pending| pending.close }
  @calls.clear
end