class Transaction
Represents a transaction for a remote procedure call.
Definitions
def initialize(connection, id, timeout: nil)
Initialize a new transaction.
Signature
-
parameter
connectionConnection The connection for this transaction.
-
parameter
idInteger The transaction ID.
-
parameter
timeoutFloat The timeout for the transaction.
Implementation
def initialize(connection, id, timeout: nil)
@connection = connection
@id = id
@timeout = timeout
@received = Thread::Queue.new
@accept = nil
end
attr :connection
Signature
-
attribute
Connection The connection for this transaction.
attr :id
Signature
-
attribute
Integer The transaction ID.
attr_accessor :timeout
Signature
-
attribute
Float The timeout for the transaction.
attr :received
Signature
-
attribute
Thread::Queue The queue of received messages.
attr :accept
Signature
-
attribute
Object The accept handler.
def accept(object, arguments, options, block_given)
Accept a remote procedure invocation.
Signature
-
parameter
objectObject The object to invoke the method on.
-
parameter
argumentsArray The positional arguments.
-
parameter
optionsHash The keyword arguments.
-
parameter
block_givenBoolean Whether a block was provided.
Implementation
def accept(object, arguments, options, block_given)
if block_given
result = object.public_send(*arguments, **options) do |*yield_arguments|
self.write(Yield.new(@id, yield_arguments))
response = self.read
case response
when Next
response.result
when Error
raise(response.result)
when Close
break
end
end
else
result = object.public_send(*arguments, **options)
end
self.write(Return.new(@id, result))
rescue UncaughtThrowError => error
# UncaughtThrowError has both tag and value attributes
# Store both in the Throw message: result is tag, we'll add value handling
self.write(Throw.new(@id, [error.tag, error.value]))
rescue => error
self.write(Error.new(@id, error))
end
def read
Read a message from the transaction queue.
Signature
-
returns
Object The next message.
Implementation
def read
if @received.empty?
@connection.flush
end
@received.pop(timeout: @timeout)
end
def write(message)
Write a message to the connection.
Signature
-
parameter
messageObject The message to write.
-
raises
RuntimeError If the transaction is closed.
Implementation
def write(message)
if @connection
@connection.write(message)
else
raise RuntimeError, "Transaction is closed!"
end
end
def push(message)
Push a message to the transaction's received queue. Silently ignores messages if the queue is already closed.
Signature
-
parameter
messageObject The message to push.
Implementation
def push(message)
@received.push(message)
rescue ClosedQueueError
# Queue is closed (transaction already finished/closed) - ignore silently.
end
def close
Close the transaction and clean up resources.
Implementation
def close
if connection = @connection
@connection = nil
@received.close
connection.transactions.delete(@id)
end
end
def invoke(name, arguments, options, &block)
Invoke a remote procedure.
Signature
-
parameter
nameSymbol The name of the remote object.
-
parameter
argumentsArray The positional arguments.
-
parameter
optionsHash The keyword arguments.
-
yields
{|*args| ...} Optional block for yielding operations.
-
returns
Object The result of the invocation.
Implementation
def invoke(name, arguments, options, &block)
Console.debug(self){[name, arguments, options, block]}
self.write(Invoke.new(@id, name, arguments, options, block_given?))
while response = self.read
case response
when Return
return response.result
when Yield
begin
result = yield(*response.result)
self.write(Next.new(@id, result))
rescue => error
self.write(Error.new(@id, error))
end
when Error
raise(response.result)
when Throw
# Re-throw the tag and value that was thrown on the server side
# Throw.result contains [tag, value] array
tag, value = response.result
throw(tag, value)
end
end
end