class Call
Represents a remote procedure call over a connection.
Manages the call lifecycle, response queueing, and completion signaling.
Definitions
def initialize(connection, id, message)
Initialize a new call.
Signature
-
parameter
connection [Connection] The connection this call belongs to.
-
parameter
id [Integer] The unique call identifier.
-
parameter
message [Hash] The call message/parameters.
Implementation
# Store the call's identity and create the queue that responses are delivered through.
#
# @parameter connection [Connection] The connection this call belongs to.
# @parameter id [Integer] The unique call identifier.
# @parameter message [Hash] The call message/parameters.
def initialize(connection, id, message)
  @connection, @id, @message = connection, id, message
  
  # Responses for this call are pushed onto, and popped from, this queue:
  @queue = ::Thread::Queue.new
end
def as_json(...)
Convert the call to a JSON-compatible hash.
Signature
-
returns
Hash The message hash.
Implementation
# Convert the call to a JSON-compatible hash.
# Any arguments are accepted but not used.
#
# @returns [Hash] The message this call was created with.
def as_json(...)
@message
end
def to_json(...)
Convert the call to a JSON string.
Signature
-
returns
String The JSON representation.
Implementation
# Serialize the call to a JSON string.
# All arguments are forwarded to `to_json` on the value returned by `as_json`.
#
# @returns [String] The JSON representation.
def to_json(...)
  json_ready = as_json
  json_ready.to_json(...)
end
# @attribute [Connection] The connection that initiated the call.
attr_reader :connection
Signature
-
attribute
Connection The connection that initiated the call.
# @attribute [Hash] The message that initiated the call.
attr_reader :message
Signature
-
attribute
Hash The message that initiated the call.
def [](key)
Access a parameter from the call message.
Signature
-
parameter
key [Symbol] The parameter name.
-
returns
Object The parameter value.
Implementation
# Look up a single parameter in the call message.
#
# @parameter key [Symbol] The parameter name.
# @returns [Object] Whatever the message holds under that key.
def [](key)
  @message[key]
end
def push(**response)
Push a response into the call's queue.
Signature
-
parameter
response [Hash] The response data to push.
Implementation
# Enqueue a response for whoever is consuming this call.
# The keyword arguments are collected into a single hash, which becomes the queued item.
#
# @parameter response [Hash] The response data to push.
def push(**response)
  @queue << response
end
def pop(...)
Pop a response from the call's queue.
Signature
-
returns
Hash, nil The next response or nil if queue is closed.
Implementation
# Pop a response from the call's queue.
# All arguments are forwarded unchanged to the underlying queue's `pop`.
#
# @returns [Hash, nil] The next response, or nil if the queue is closed.
def pop(...)
@queue.pop(...)
end
def close
Close the call's response queue. Used when the call was never completed and the connection itself was closed.
Implementation
# Close the call's response queue without pushing a final response.
# Intended for the case where the call was never completed and the connection itself was closed.
def close
@queue.close
end
def each(&block)
Iterate over all responses from the call.
Signature
-
yields
{|response| ...} Each response from the queue.
Implementation
# Iterate over the responses of this call.
# Responses are taken with `pop` one at a time until it returns a falsy value.
#
# When no block is given, an enumerator over the same responses is returned instead of
# raising once the first response arrives.
#
# @yields {|response| ...} Each response from the queue.
# @returns [Enumerator, nil] An enumerator if no block is given, otherwise nil.
def each(&block)
  return to_enum(:each) unless block_given?
  
  while response = self.pop
    yield response
  end
end
def finish(**response)
Finish the call with a final response.
Closes the response queue after pushing the final response.
Signature
-
parameter
response [Hash] The final response data.
Implementation
# Complete the call by pushing a final response and then closing the queue.
# The final response carries `id: @id` and `finished: true`, merged with the given data.
#
# @parameter response [Hash] The final response data.
def finish(**response)
  # The queue is already closed (e.g. the remote end closed the connection), so there is no finished message to send:
  return if @queue.closed?
  
  self.push(id: @id, finished: true, **response)
  @queue.close
end
def fail(**response)
Finish the call with a failure response.
Signature
-
parameter
response [Hash] The error response data.
Implementation
# Complete the call with a failure.
# Delegates to `finish`, adding `failed: true` ahead of the given data.
#
# @parameter response [Hash] The error response data.
def fail(**response)
  finish(failed: true, **response)
end
def closed?
Check if the call's queue is closed.
Signature
-
returns
Boolean True if the queue is closed.
Implementation
# Check whether the call's response queue has been closed.
#
# @returns [Boolean] True if the queue is closed.
def closed?
@queue.closed?
end
def forward(target_connection, operation)
Forward this call to another connection, proxying all responses back.
This provides true streaming forwarding - intermediate responses flow through in real-time rather than being buffered. The forwarding runs asynchronously to avoid blocking the dispatcher.
Signature
-
parameter
target_connection [Connection] The connection to forward the call to.
-
parameter
operation [Hash] The operation request to forward (must include :do key).
Implementation
# Forward this call to another connection and relay its responses back through this call's queue.
# The work happens inside an `Async` block so the caller is not blocked while responses stream in.
#
# @parameter target_connection [Connection] The connection to forward the call to.
# @parameter operation [Hash] The operation request to forward, passed to `Call.call` as keyword arguments.
def forward(target_connection, operation)
  Async do
    # Relay every response from the target connection as soon as it is yielded:
    Call.call(target_connection, **operation) { |response| self.push(**response) }
  ensure
    # Always close our queue, so consumers of this call see completion even if forwarding raised:
    @queue.close
  end
end
def self.dispatch(connection, target, id, message)
Dispatch a call to a target handler.
Creates a call, dispatches it to the target, and streams responses back through the connection.
Signature
-
parameter
connection [Connection] The connection to dispatch on.
-
parameter
target [Dispatchable] The target handler.
-
parameter
id [Integer] The call identifier.
-
parameter
message [Hash] The call message.
Implementation
# Dispatch an incoming call to a target handler and stream its responses back over the connection.
# Everything runs inside an `Async` block: the call is created and registered in `connection.calls`,
# handed to `target.dispatch`, and each response it produces is written with the call's id.
#
# @parameter connection [Connection] The connection to dispatch on.
# @parameter target [Dispatchable] The target handler.
# @parameter id [Integer] The call identifier.
# @parameter message [Hash] The call message.
def self.dispatch(connection, target, id, message)
  Async do
    call = self.new(connection, id, message)
    connection.calls[id] = call
    
    target.dispatch(call)
    
    # Write responses until the call's queue is drained and closed:
    call.each do |response|
      connection.write(id: id, **response)
    end
  ensure
    # A call whose queue is still open has not been finished, so send the finished message on its behalf:
    connection.write(id: id, finished: true) unless call.closed?
    
    connection.calls.delete(id)
  end
end
def self.call(connection, **message, &block)
Make a call on a connection and wait for responses.
If a block is provided, yields each response. Otherwise, buffers intermediate responses and returns the final response.
Signature
-
parameter
connection [Connection] The connection to call on.
-
parameter
message [Hash] The call message/parameters.
-
yields
{|response| ...} Each response, including the final one, if a block is given.
-
returns
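Hash, Array, nil Without a block: the final response if there were no intermediate responses; otherwise an array of the intermediate responses, followed by the final response if it carries any data. Nil if the queue closes before a final response arrives.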
Implementation
# Make a call on a connection and wait for its responses.
# The call is registered in `connection.calls` under a fresh id while it is in flight and removed afterwards.
#
# With a block, every response is yielded as it arrives. Without a block, responses are collected
# until one marked `:finished` arrives; the `:finished` key is removed from that response.
#
# @parameter connection [Connection] The connection to call on.
# @parameter message [Hash] The call message/parameters.
# @yields {|response| ...} Each response if a block is given.
# @returns [Hash, Array, nil] Without a block: the final response, or the buffered responses if there were intermediate ones.
def self.call(connection, **message, &block)
  id = connection.next_id
  call = self.new(connection, id, message)
  connection.calls[id] = call
  
  begin
    connection.write(id: id, **message)
    
    if block_given?
      call.each(&block)
    else
      buffered = nil
      
      while response = call.pop
        unless response.delete(:finished)
          # Not the final response yet, keep it for later:
          (buffered ||= []) << response
          next
        end
        
        # Nothing was buffered, so the final response is the whole result:
        return response unless buffered
        
        # Only append the final response if it carries data beyond the finished marker:
        buffered << response if response.any?
        return buffered
      end
    end
  ensure
    connection.calls.delete(id)
  end
end