Async::BusSourceAsyncBusProtocolConnection

class Connection

Represents a connection between client and server for message passing.

Definitions

def self.client(peer, **options)

Create a client-side connection.

Signature

parameter peer IO

The peer connection.

parameter options Hash

Additional options for the connection.

returns Connection

A new client connection.

Implementation

def self.client(peer, **options)
	self.new(peer, 1, **options)
end

def self.server(peer, **options)

Create a server-side connection.

Signature

parameter peer IO

The peer connection.

parameter options Hash

Additional options for the connection.

returns Connection

A new server connection.

Implementation

def self.server(peer, **options)
	self.new(peer, 2, **options)
end

def initialize(peer, id, wrapper: Wrapper, timeout: nil)

Initialize a new connection.

Signature

parameter peer IO

The peer connection.

parameter id Integer

The initial transaction ID.

parameter wrapper Class

The wrapper class for serialization.

parameter timeout Float

The timeout for transactions.

Implementation

def initialize(peer, id, wrapper: Wrapper, timeout: nil)
	@peer = peer
	@id = id
	
	@wrapper = wrapper.new(self)
	@unpacker = @wrapper.unpacker(peer)
	@packer = @wrapper.packer(peer)
	
	@timeout = timeout
	
	@transactions = {}
	
	@objects = {}
	@proxies = ::ObjectSpace::WeakMap.new
	@finalized = ::Thread::Queue.new
end

attr_accessor :timeout

Signature

attribute Float

The timeout for transactions.

def flush

Flush the packer buffer.

Implementation

def flush
	@packer.flush
end

def write(message)

Write a message to the connection.

Signature

parameter message Object

The message to write.

Implementation

def write(message)
	# $stderr.puts "Writing: #{message.inspect}"
	@packer.write(message)
	@packer.flush
end

def close

Close the connection and clean up resources.

Implementation

def close
	@transactions.each do |id, transaction|
		transaction.close
	end
	
	@peer.close
end

def inspect

Return a string representation of the connection.

Signature

returns String

A string describing the connection.

Implementation

def inspect
	"#<#{self.class} #{@objects.size} objects>"
end

attr :objects

Signature

attribute Hash

The bound objects.

attr :proxies

Signature

attribute ObjectSpace::WeakMap

The proxy cache.

attr :unpacker

Signature

attribute MessagePack::Unpacker

The message unpacker.

attr :packer

Signature

attribute MessagePack::Packer

The message packer.

def next_id

Get the next transaction ID.

Signature

returns Integer

The next transaction ID.

Implementation

def next_id
	id = @id
	@id += 2
	
	return id
end

attr :transactions

Signature

attribute Hash

Active transactions.

def []=(name, object)

Explicitly bind an object to a name, such that it could be accessed remotely.

This is the same as Async::Bus::Protocol::Connection#bind but due to the semantics of the []= operator, it does not return a proxy instance.

Explicitly bound objects are not garbage collected until the connection is closed.

Signature

parameter name String

The name to bind the object to.

parameter object Object

The object to bind to the given name.

Implementation

def []=(name, object)
	@objects[name] = Explicit.new(object)
end

def [](name)

Generate a proxy for a remotely bound object.

This always returns a proxy, even if the object is bound locally. The object bus is not shared between client and server, so [] always returns a proxy to the remote instance.

Signature

parameter name String

The name of the bound object.

returns Proxy

A proxy instance for the bound object.

Implementation

def [](name)
	return proxy_for(name)
end

def bind(name, object)

Explicitly bind an object to a name, such that it could be accessed remotely.

This method is identical to Async::Bus::Protocol::Connection#[]= but also returns a class Async::Bus::Protocol::Proxy instance for the bound object which can be passed by reference.

Explicitly bound objects are not garbage collected until the connection is closed.

Example: Binding an object to a name and accessing it remotely.

array_proxy = connection.bind(:items, [1, 2, 3])
connection[:remote].register(array_proxy)

Signature

parameter name String

The name to bind the object to.

parameter object Object

The object to bind to the given name.

returns Proxy

A proxy instance for the bound object.

Implementation

def bind(name, object)
	# Bind the object into the local object store (explicitly bound, not temporary):
	@objects[name] = Explicit.new(object)
	
	# Always return a proxy for passing by reference, even for locally bound objects:
	return proxy_for(name)
end

def proxy(object)

Implicitly bind an object with a temporary name, such that it could be accessed remotely.

Implicitly bound objects are garbage collected when the remote end no longer references them.

This method is simliar to Async::Bus::Protocol::Connection#bind but is designed to be used to generate temporary proxies for objects that are not explicitly bound.

Signature

parameter object Object

The object to bind to a temporary name.

returns Proxy

A proxy instance for the bound object.

Implementation

def proxy(object)
	name = object.__id__
	
	# Bind the object into the local object store (temporary):
	@objects[name] ||= Implicit.new(object)
	
	# Always return a proxy for passing by reference:
	return proxy_for(name)
end

def proxy_name(object)

Implicitly bind an object with a temporary name, such that it could be accessed remotely.

Implicitly bound objects are garbage collected when the remote end no longer references them.

This method is similar to Async::Bus::Protocol::Connection#proxy but is designed to be used to generate temporary names for objects that are not explicitly bound during serialization.

Signature

parameter object Object

The object to bind to a temporary name.

returns String

The name of the bound object.

Implementation

def proxy_name(object)
	name = object.__id__
	
	# Bind the object into the local object store (temporary):
	@objects[name] ||= Implicit.new(object)
	
	# Return the name:
	return name
end

def proxy_object(name)

Get an object or proxy for a bound object, handling reverse lookup.

If the object is bound locally and the proxy is for this connection, returns the actual object. If the object is bound remotely, or the proxy is from a different connection, returns a proxy. This is used when deserializing proxies to handle round-trip scenarios and avoid name collisions.

Signature

parameter name String

The name of the bound object.

parameter local Boolean

Whether the proxy is for this connection (from serialization). Defaults to true.

returns Object | Proxy

The object if bound locally and proxy is for this connection, or a proxy otherwise.

Implementation

def proxy_object(name)
	# If the proxy is for this connection and the object is bound locally, return the actual object:
	if entry = @objects[name]
		# This handles round-trip scenarios correctly.
		return entry.object
	end
	
	# Otherwise, create a proxy for the remote object:
	return proxy_for(name)
end

def transaction!(id = self.next_id)

Create a new transaction.

Signature

parameter id Integer

The transaction ID.

returns Transaction

A new transaction.

Implementation

def transaction!(id = self.next_id)
	transaction = Transaction.new(self, id, timeout: @timeout)
	@transactions[id] = transaction
	
	return transaction
end

def invoke(name, arguments, options = {}, &block)

Invoke a remote procedure.

Signature

parameter name Symbol

The name of the remote object.

parameter arguments Array

The arguments to pass.

parameter options Hash

The keyword arguments to pass.

yields {|*args| ...}

Optional block for yielding operations.

returns Object

The result of the invocation.

Implementation

def invoke(name, arguments, options = {}, &block)
	transaction = self.transaction!
	
	transaction.invoke(name, arguments, options, &block)
ensure
	transaction&.close
end

def send_release(name)

Send a release message for a named object.

Signature

parameter name Symbol

The name of the object to release.

Implementation

def send_release(name)
	self.write(Release.new(name))
end

def run(parent: Task.current)

Run the connection message loop.

Signature

parameter parent Async::Task

The parent task to run under.

Implementation

def run(parent: Task.current)
	finalizer_task = parent.async do
		while name = @finalized.pop
			self.send_release(name)
		end
	end
	
	@unpacker.each do |message|
		case message
		when Invoke
			# If the object is not found, send an error response and skip the transaction:
			if object = @objects[message.name]&.object
				transaction = self.transaction!(message.id)
				
				parent.async(annotation: "Invoke #{message.name}") do
					# $stderr.puts "-> Accepting: #{message.name} #{message.arguments.inspect} #{message.options.inspect}"
					transaction.accept(object, message.arguments, message.options, message.block_given)
				ensure
					# $stderr.puts "<- Accepted: #{message.name}"
					# This will also delete the transaction from @transactions:
					transaction.close
				end
			else
				self.write(Error.new(message.id, NameError.new("Object not found: #{message.name}")))
			end
		when Response
			if transaction = @transactions[message.id]
				transaction.push(message)
			else
				# Stale message - transaction already closed (e.g. timeout) or never existed (ignore silently).
			end
		when Release
			name = message.name
			if @objects[name]&.temporary?
				# Only delete temporary objects, not explicitly bound ones:
				@objects.delete(name)
			end
		else
			Console.error(self, "Unexpected message:", message)
		end
	end
ensure
	finalizer_task&.stop
	
	@transactions.each do |id, transaction|
		transaction.close
	end
	
	@transactions.clear
	@proxies = ::ObjectSpace::WeakMap.new
end