class Connection
Represents a connection between client and server for message passing.
Definitions
def self.client(peer, **options)
Create a client-side connection.
Signature
-
parameter
peerIO The peer connection.
-
parameter
optionsHash Additional options for the connection.
-
returns
Connection A new client connection.
Implementation
def self.client(peer, **options)
self.new(peer, 1, **options)
end
def self.server(peer, **options)
Create a server-side connection.
Signature
-
parameter
peerIO The peer connection.
-
parameter
optionsHash Additional options for the connection.
-
returns
Connection A new server connection.
Implementation
def self.server(peer, **options)
self.new(peer, 2, **options)
end
def initialize(peer, id, wrapper: Wrapper, timeout: nil)
Initialize a new connection.
Signature
-
parameter
peerIO The peer connection.
-
parameter
idInteger The initial transaction ID.
-
parameter
wrapperClass The wrapper class for serialization.
-
parameter
timeoutFloat The timeout for transactions.
Implementation
def initialize(peer, id, wrapper: Wrapper, timeout: nil)
@peer = peer
@id = id
@wrapper = wrapper.new(self)
@unpacker = @wrapper.unpacker(peer)
@packer = @wrapper.packer(peer)
@timeout = timeout
@transactions = {}
@objects = {}
@proxies = ::ObjectSpace::WeakMap.new
@finalized = ::Thread::Queue.new
end
attr_accessor :timeout
Signature
-
attribute
Float The timeout for transactions.
def flush
Flush the packer buffer.
Implementation
def flush
@packer.flush
end
def write(message)
Write a message to the connection.
Signature
-
parameter
messageObject The message to write.
Implementation
def write(message)
# $stderr.puts "Writing: #{message.inspect}"
@packer.write(message)
@packer.flush
end
def close
Close the connection and clean up resources.
Implementation
def close
@transactions.each do |id, transaction|
transaction.close
end
@peer.close
end
def inspect
Return a string representation of the connection.
Signature
-
returns
String A string describing the connection.
Implementation
def inspect
"#<#{self.class} #{@objects.size} objects>"
end
attr :objects
Signature
-
attribute
Hash The bound objects.
attr :proxies
Signature
-
attribute
ObjectSpace::WeakMap The proxy cache.
attr :unpacker
Signature
-
attribute
MessagePack::Unpacker The message unpacker.
attr :packer
Signature
-
attribute
MessagePack::Packer The message packer.
def next_id
Get the next transaction ID.
Signature
-
returns
Integer The next transaction ID.
Implementation
def next_id
id = @id
@id += 2
return id
end
attr :transactions
Signature
-
attribute
Hash Active transactions.
def []=(name, object)
Explicitly bind an object to a name, such that it could be accessed remotely.
This is the same as Async::Bus::Protocol::Connection#bind but due to the semantics of the []= operator, it does not return a proxy instance.
Explicitly bound objects are not garbage collected until the connection is closed.
Signature
-
parameter
nameString The name to bind the object to.
-
parameter
objectObject The object to bind to the given name.
Implementation
def []=(name, object)
@objects[name] = Explicit.new(object)
end
def [](name)
Generate a proxy for a remotely bound object.
This always returns a proxy, even if the object is bound locally.
The object bus is not shared between client and server, so [] always
returns a proxy to the remote instance.
Signature
-
parameter
nameString The name of the bound object.
-
returns
Proxy A proxy instance for the bound object.
Implementation
def [](name)
return proxy_for(name)
end
def bind(name, object)
Explicitly bind an object to a name, such that it could be accessed remotely.
This method is identical to Async::Bus::Protocol::Connection#[]= but also returns a class Async::Bus::Protocol::Proxy instance for the bound object which can be passed by reference.
Explicitly bound objects are not garbage collected until the connection is closed.
Example: Binding an object to a name and accessing it remotely.
array_proxy = connection.bind(:items, [1, 2, 3])
connection[:remote].register(array_proxy)
Signature
-
parameter
nameString The name to bind the object to.
-
parameter
objectObject The object to bind to the given name.
-
returns
Proxy A proxy instance for the bound object.
Implementation
def bind(name, object)
# Bind the object into the local object store (explicitly bound, not temporary):
@objects[name] = Explicit.new(object)
# Always return a proxy for passing by reference, even for locally bound objects:
return proxy_for(name)
end
def proxy(object)
Implicitly bind an object with a temporary name, such that it could be accessed remotely.
Implicitly bound objects are garbage collected when the remote end no longer references them.
This method is simliar to Async::Bus::Protocol::Connection#bind but is designed to be used to generate temporary proxies for objects that are not explicitly bound.
Signature
-
parameter
objectObject The object to bind to a temporary name.
-
returns
Proxy A proxy instance for the bound object.
Implementation
def proxy(object)
name = object.__id__
# Bind the object into the local object store (temporary):
@objects[name] ||= Implicit.new(object)
# Always return a proxy for passing by reference:
return proxy_for(name)
end
def proxy_name(object)
Implicitly bind an object with a temporary name, such that it could be accessed remotely.
Implicitly bound objects are garbage collected when the remote end no longer references them.
This method is similar to Async::Bus::Protocol::Connection#proxy but is designed to be used to generate temporary names for objects that are not explicitly bound during serialization.
Signature
-
parameter
objectObject The object to bind to a temporary name.
-
returns
String The name of the bound object.
Implementation
def proxy_name(object)
name = object.__id__
# Bind the object into the local object store (temporary):
@objects[name] ||= Implicit.new(object)
# Return the name:
return name
end
def proxy_object(name)
Get an object or proxy for a bound object, handling reverse lookup.
If the object is bound locally and the proxy is for this connection, returns the actual object. If the object is bound remotely, or the proxy is from a different connection, returns a proxy. This is used when deserializing proxies to handle round-trip scenarios and avoid name collisions.
Signature
-
parameter
nameString The name of the bound object.
-
parameter
localBoolean Whether the proxy is for this connection (from serialization). Defaults to true.
-
returns
Object | Proxy The object if bound locally and proxy is for this connection, or a proxy otherwise.
Implementation
def proxy_object(name)
# If the proxy is for this connection and the object is bound locally, return the actual object:
if entry = @objects[name]
# This handles round-trip scenarios correctly.
return entry.object
end
# Otherwise, create a proxy for the remote object:
return proxy_for(name)
end
def transaction!(id = self.next_id)
Create a new transaction.
Signature
-
parameter
idInteger The transaction ID.
-
returns
Transaction A new transaction.
Implementation
def transaction!(id = self.next_id)
transaction = Transaction.new(self, id, timeout: @timeout)
@transactions[id] = transaction
return transaction
end
def invoke(name, arguments, options = {}, &block)
Invoke a remote procedure.
Signature
-
parameter
nameSymbol The name of the remote object.
-
parameter
argumentsArray The arguments to pass.
-
parameter
optionsHash The keyword arguments to pass.
-
yields
{|*args| ...} Optional block for yielding operations.
-
returns
Object The result of the invocation.
Implementation
def invoke(name, arguments, options = {}, &block)
transaction = self.transaction!
transaction.invoke(name, arguments, options, &block)
ensure
transaction&.close
end
def send_release(name)
Send a release message for a named object.
Signature
-
parameter
nameSymbol The name of the object to release.
Implementation
def send_release(name)
self.write(Release.new(name))
end
def run(parent: Task.current)
Run the connection message loop.
Signature
-
parameter
parentAsync::Task The parent task to run under.
Implementation
def run(parent: Task.current)
finalizer_task = parent.async do
while name = @finalized.pop
self.send_release(name)
end
end
@unpacker.each do |message|
case message
when Invoke
# If the object is not found, send an error response and skip the transaction:
if object = @objects[message.name]&.object
transaction = self.transaction!(message.id)
parent.async(annotation: "Invoke #{message.name}") do
# $stderr.puts "-> Accepting: #{message.name} #{message.arguments.inspect} #{message.options.inspect}"
transaction.accept(object, message.arguments, message.options, message.block_given)
ensure
# $stderr.puts "<- Accepted: #{message.name}"
# This will also delete the transaction from @transactions:
transaction.close
end
else
self.write(Error.new(message.id, NameError.new("Object not found: #{message.name}")))
end
when Response
if transaction = @transactions[message.id]
transaction.push(message)
else
# Stale message - transaction already closed (e.g. timeout) or never existed (ignore silently).
end
when Release
name = message.name
if @objects[name]&.temporary?
# Only delete temporary objects, not explicitly bound ones:
@objects.delete(name)
end
else
Console.error(self, "Unexpected message:", message)
end
end
ensure
finalizer_task&.stop
@transactions.each do |id, transaction|
transaction.close
end
@transactions.clear
@proxies = ::ObjectSpace::WeakMap.new
end