Async::Bus › Source › Async › Bus › Protocol › Wrapper

class Wrapper

Represents a MessagePack factory wrapper for async-bus serialization.

Definitions

def initialize(connection, reference_types: [Controller])

Initialize a new wrapper.

Signature

parameter connection Connection

The connection for proxy resolution.

parameter reference_types Array(Class)

Types to serialize as proxies.

Implementation

# Initialize a new wrapper and register every extension type it serializes.
# @parameter connection [Connection] The connection stored for use by the proxy and reference (un)packers.
# @parameter reference_types [Array(Class)] Classes registered from type code 0x30 upwards with the reference (un)packers; may be nil.
def initialize(connection, reference_types: [Controller])
	super()
	
	@connection = connection
	@reference_types = reference_types
	
	# Placeholder for the peer connection used when forwarding proxies:
	# a forwarded proxy (local=false) should point back to the sender
	# (the peer connection) rather than the receiver (this connection).
	@peer_connection = nil
	
	# The order here matters: keep the registrations below in this sequence.
	
	# Message types take consecutive codes: Invoke is 0x00, Return..Close are 0x01..0x06.
	[Invoke, Return, Yield, Error, Next, Throw, Close].each_with_index do |message_class, offset|
		register_type(0x00 + offset, message_class, recursive: true,
			packer: ->(message, packer){message.pack(packer)},
			unpacker: ->(unpacker){message_class.unpack(unpacker)},
		)
	end
	
	# Proxies are (de)serialized by the dedicated proxy helpers, so a received
	# proxy is resolved through the connection's reverse lookup.
	register_type(0x10, Proxy, recursive: true,
		packer: method(:pack_proxy),
		unpacker: method(:unpack_proxy),
	)
	
	register_type(0x11, Release, recursive: true,
		packer: ->(message, packer){message.pack(packer)},
		unpacker: ->(unpacker){Release.unpack(unpacker)},
	)
	
	# Core Ruby values: symbols, exceptions, and classes (sent by name).
	register_type(0x20, Symbol)
	
	register_type(0x21, Exception, recursive: true,
		packer: method(:pack_exception),
		unpacker: method(:unpack_exception),
	)
	
	register_type(0x22, Class,
		packer: ->(constant){constant.name},
		unpacker: ->(constant_name){Object.const_get(constant_name)},
	)
	
	# Reference types are serialized as proxies, one type code per class starting at 0x30.
	pack_by_reference = method(:pack_reference)
	unpack_by_reference = method(:unpack_reference)
	
	reference_types&.each_with_index do |reference_class, offset|
		register_type(0x30 + offset, reference_class, recursive: true,
			packer: pack_by_reference,
			unpacker: unpack_by_reference,
		)
	end
end

def pack_proxy(proxy, packer)

Pack a proxy into a MessagePack packer.

Serializes the proxy's name. If the proxy belongs to a different connection, it is first re-proxied through this connection (via `@connection.proxy`), and the name of the resulting proxy is written instead.

Signature

parameter proxy Proxy

The proxy to serialize.

parameter packer MessagePack::Packer

The packer to write to.

raises ArgumentError

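Not raised by this method itself: a proxy from a different connection is passed to `@connection.proxy` to be re-proxied. Whether that call raises for unsupported multi-hop forwarding is not visible in this implementation.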

Implementation

# Write the name of a proxy to the packer.
# A proxy whose connection differs from this wrapper's connection is first
# passed through `@connection.proxy`, and the resulting proxy's name is written.
# @parameter proxy [Proxy] The proxy to serialize.
# @parameter packer [MessagePack::Packer] The packer to write to.
def pack_proxy(proxy, packer)
	foreign = proxy.__connection__ != @connection
	outgoing = foreign ? @connection.proxy(proxy) : proxy
	
	packer.write(outgoing.__name__)
end

def unpack_proxy(unpacker)

Unpack a proxy from a MessagePack unpacker.

When deserializing a proxy:

  • If the object is bound locally, return the actual object (round-trip scenario)
  • If the object is not found locally, create a proxy pointing to this connection (the proxy was forwarded from another connection and should point back to the sender)

Signature

parameter unpacker MessagePack::Unpacker

The unpacker to read from.

returns Object | Proxy

The actual object if bound locally, or a proxy pointing to this connection.

Implementation

# Read a proxy name from the unpacker and resolve it through the connection.
# @parameter unpacker [MessagePack::Unpacker] The unpacker to read from.
# @returns [Object] Whatever `@connection.proxy_object` returns for the name that was read.
def unpack_proxy(unpacker)
	proxy_name = unpacker.read
	
	@connection.proxy_object(proxy_name)
end

def pack_exception(exception, packer)

Pack an exception into a MessagePack packer.

Signature

parameter exception Exception

The exception to pack.

parameter packer MessagePack::Packer

The packer to write to.

Implementation

# Write an exception to the packer as three consecutive values:
# its class name, its message and its backtrace.
# @parameter exception [Exception] The exception to pack.
# @parameter packer [MessagePack::Packer] The packer to write to.
def pack_exception(exception, packer)
	fields = [exception.class.name, exception.message, exception.backtrace]
	
	# Yield the result of the final write, as three sequential writes would.
	fields.map{|field| packer.write(field)}.last
end

def unpack_exception(unpacker)

Unpack an exception from a MessagePack unpacker.

Signature

parameter unpacker MessagePack::Unpacker

The unpacker to read from.

returns Exception

A reconstructed exception.

Implementation

# Rebuild an exception from three consecutive values read from the unpacker:
# a class name, a message and a backtrace.
# @parameter unpacker [MessagePack::Unpacker] The unpacker to read from.
# @returns [Exception] A new instance of the named class with the message and backtrace applied.
# @raises [NameError] If the class name does not resolve to a constant.
# @raises [TypeError] If the resolved constant is not an exception class.
def unpack_exception(unpacker)
	# Consume all three fields up front so the stream position is the same
	# whether or not reconstruction succeeds.
	class_name = unpacker.read
	message = unpacker.read
	backtrace = unpacker.read
	
	klass = Object.const_get(class_name)
	
	# SECURITY: the class name is read from serialized data and resolved with
	# `const_get`. Refuse anything that is not an exception class *before*
	# calling `new`, otherwise a peer could instantiate arbitrary classes
	# (e.g. `File`) with an attacker-chosen argument.
	unless klass.is_a?(Class) && klass <= Exception
		raise TypeError, "Expected an exception class, got #{class_name.inspect}!"
	end
	
	exception = klass.new(message)
	exception.set_backtrace(backtrace)
	
	return exception
end

def pack_reference(object, packer)

Pack a reference type object (e.g., Controller) into a MessagePack packer.

Serializes the object as a proxy by generating a temporary name and writing it to the packer. The object is implicitly bound to the connection with a temporary name.

Signature

parameter object Object

The reference type object to serialize.

parameter packer MessagePack::Packer

The packer to write to.

Implementation

# Serialize a reference type object by writing the name the connection assigns to it.
# @parameter object [Object] The reference type object to serialize.
# @parameter packer [MessagePack::Packer] The packer to write to.
def pack_reference(object, packer)
	reference_name = @connection.proxy_name(object)
	
	packer.write(reference_name)
end

def unpack_reference(unpacker)

Unpack a reference type object from a MessagePack unpacker.

Reads a proxy name and returns the corresponding object or proxy. If the object is bound locally, returns the actual object; otherwise returns a proxy.

Signature

parameter unpacker MessagePack::Unpacker

The unpacker to read from.

returns Object | Proxy

The actual object if bound locally, or a proxy otherwise.

Implementation

# Read a name from the unpacker and hand it to the connection for resolution.
# @parameter unpacker [MessagePack::Unpacker] The unpacker to read from.
# @returns [Object] Whatever `@connection.proxy_object` returns for the name that was read.
def unpack_reference(unpacker)
	name = unpacker.read
	
	return @connection.proxy_object(name)
end