Async::BusSourceAsyncBusProtocolTransaction

class Transaction

Represents a transaction for a remote procedure call.

Definitions

def initialize(connection, id, timeout: nil)

Initialize a new transaction.

Signature

parameter connection Connection

The connection for this transaction.

parameter id Integer

The transaction ID.

parameter timeout Float

The timeout for the transaction.

Implementation

def initialize(connection, id, timeout: nil)
	@connection = connection
	@id = id
	
	@timeout = timeout
	
	@received = Thread::Queue.new
	@accept = nil
end

attr :connection

Signature

attribute Connection

The connection for this transaction.

attr :id

Signature

attribute Integer

The transaction ID.

attr_accessor :timeout

Signature

attribute Float

The timeout for the transaction.

attr :received

Signature

attribute Thread::Queue

The queue of received messages.

attr :accept

Signature

attribute Object

The accept handler.

def accept(object, arguments, options, block_given)

Accept a remote procedure invocation.

Signature

parameter object Object

The object to invoke the method on.

parameter arguments Array

The positional arguments.

parameter options Hash

The keyword arguments.

parameter block_given Boolean

Whether a block was provided.

Implementation

def accept(object, arguments, options, block_given)
	if block_given
		result = object.public_send(*arguments, **options) do |*yield_arguments|
			self.write(Yield.new(@id, yield_arguments))
			
			response = self.read
			
			case response
			when Next
				response.result
			when Error
				raise(response.result)
			when Close
				break
			end
		end
	else
		result = object.public_send(*arguments, **options)
	end
	
	self.write(Return.new(@id, result))
rescue UncaughtThrowError => error
	# UncaughtThrowError has both tag and value attributes
	# Store both in the Throw message: result is tag, we'll add value handling
	self.write(Throw.new(@id, [error.tag, error.value]))
rescue => error
	self.write(Error.new(@id, error))
end

def read

Read a message from the transaction queue.

Signature

returns Object

The next message.

Implementation

def read
	if @received.empty?
		@connection.flush
	end
	
	@received.pop(timeout: @timeout)
end

def write(message)

Write a message to the connection.

Signature

parameter message Object

The message to write.

raises RuntimeError

If the transaction is closed.

Implementation

def write(message)
	if @connection
		@connection.write(message)
	else
		raise RuntimeError, "Transaction is closed!"
	end
end

def push(message)

Push a message to the transaction's received queue. Silently ignores messages if the queue is already closed.

Signature

parameter message Object

The message to push.

Implementation

def push(message)
	@received.push(message)
rescue ClosedQueueError
	# Queue is closed (transaction already finished/closed) - ignore silently.
end

def close

Close the transaction and clean up resources.

Implementation

def close
	if connection = @connection
		@connection = nil
		@received.close
		
		connection.transactions.delete(@id)
	end
end

def invoke(name, arguments, options, &block)

Invoke a remote procedure.

Signature

parameter name Symbol

The name of the remote object.

parameter arguments Array

The positional arguments.

parameter options Hash

The keyword arguments.

yields {|*args| ...}

Optional block for yielding operations.

returns Object

The result of the invocation.

Implementation

def invoke(name, arguments, options, &block)
	Console.debug(self){[name, arguments, options, block]}
	
	self.write(Invoke.new(@id, name, arguments, options, block_given?))
	
	while response = self.read
		case response
		when Return
			return response.result
		when Yield
			begin
				result = yield(*response.result)
				self.write(Next.new(@id, result))
			rescue => error
				self.write(Error.new(@id, error))
			end
		when Error
			raise(response.result)
		when Throw
			# Re-throw the tag and value that was thrown on the server side
			# Throw.result contains [tag, value] array
			tag, value = response.result
			throw(tag, value)
		end
	end
end