Async::Container::SupervisorSourceAsyncContainerSupervisorConnectionCall

class Call

Represents a remote procedure call over a connection.

Manages the call lifecycle, response queueing, and completion signaling.

Definitions

def initialize(connection, id, message)

Initialize a new call.

Signature

parameter connection Connection

The connection this call belongs to.

parameter id Integer

The unique call identifier.

parameter message Hash

The call message/parameters.

Implementation

def initialize(connection, id, message)
	@connection = connection
	@id = id
	@message = message
	
	@queue = ::Thread::Queue.new
end

def as_json(...)

Convert the call to a JSON-compatible hash.

Signature

returns Hash

The message hash.

Implementation

def as_json(...)
	@message
end

def to_json(...)

Convert the call to a JSON string.

Signature

returns String

The JSON representation.

Implementation

def to_json(...)
	as_json.to_json(...)
end

attr :connection

Signature

attribute Connection

The connection that initiated the call.

attr :message

Signature

attribute Hash

The message that initiated the call.

def [](key)

Access a parameter from the call message.

Signature

parameter key Symbol

The parameter name.

returns Object

The parameter value.

Implementation

def [] key
	@message[key]
end

def push(**response)

Push a response into the call's queue.

Signature

parameter response Hash

The response data to push.

Implementation

def push(**response)
	@queue.push(response)
end

def pop(...)

Pop a response from the call's queue.

Signature

returns Hash, nil

The next response or nil if queue is closed.

Implementation

def pop(...)
	@queue.pop(...)
end

def close

The call was never completed and the connection itself was closed.

Implementation

def close
	@queue.close
end

def each(&block)

Iterate over all responses from the call.

Signature

yields {|response| ...}

Each response from the queue.

Implementation

def each(&block)
	while response = self.pop
		yield response
	end
end

def finish(**response)

Finish the call with a final response.

Closes the response queue after pushing the final response.

Signature

parameter response Hash

The final response data.

Implementation

def finish(**response)
	# If the remote end has already closed the connection, we don't need to send a finished message:
	unless @queue.closed?
		self.push(id: @id, finished: true, **response)
		@queue.close
	end
end

def fail(**response)

Finish the call with a failure response.

Signature

parameter response Hash

The error response data.

Implementation

def fail(**response)
	self.finish(failed: true, **response)
end

def closed?

Check if the call's queue is closed.

Signature

returns Boolean

True if the queue is closed.

Implementation

def closed?
	@queue.closed?
end

def forward(target_connection, operation)

Forward this call to another connection, proxying all responses back.

This provides true streaming forwarding - intermediate responses flow through in real-time rather than being buffered. The forwarding runs asynchronously to avoid blocking the dispatcher.

Signature

parameter target_connection Connection

The connection to forward the call to.

parameter operation Hash

The operation request to forward (must include :do key).

Implementation

def forward(target_connection, operation)
	# Forward the operation in an async task to avoid blocking
	Async do
		# Make the call to the target connection and stream responses back:
		Call.call(target_connection, **operation) do |response|
			# Push each response through our queue:
			self.push(**response)
		end
	ensure
		# Close our queue to signal completion:
		@queue.close
	end
end

def self.dispatch(connection, target, id, message)

Dispatch a call to a target handler.

Creates a call, dispatches it to the target, and streams responses back through the connection.

Signature

parameter connection Connection

The connection to dispatch on.

parameter target Dispatchable

The target handler.

parameter id Integer

The call identifier.

parameter message Hash

The call message.

Implementation

def self.dispatch(connection, target, id, message)
	Async do
		call = self.new(connection, id, message)
		connection.calls[id] = call
		
		target.dispatch(call)
		
		while response = call.pop
			connection.write(id: id, **response)
		end
	ensure
		# If the queue is closed, we don't need to send a finished message.
		unless call.closed?
			connection.write(id: id, finished: true)
		end
		
		connection.calls.delete(id)
	end
end

def self.call(connection, **message, &block)

Make a call on a connection and wait for responses.

If a block is provided, yields each response. Otherwise, buffers intermediate responses and returns the final response.

Signature

parameter connection Connection

The connection to call on.

parameter message Hash

The call message/parameters.

yields {|response| ...}

Each intermediate response if block given.

returns Hash, Array

The final response or array of intermediate responses.

Implementation

def self.call(connection, **message, &block)
	id = connection.next_id
	call = self.new(connection, id, message)
	
	connection.calls[id] = call
	begin
		connection.write(id: id, **message)
		
		if block_given?
			call.each(&block)
		else
			intermediate = nil
			
			while response = call.pop
				if response.delete(:finished)
					if intermediate
						if response.any?
							intermediate << response
						end
						
						return intermediate
					else
						return response
					end
				else
					# Buffer intermediate responses:
					intermediate ||= []
					intermediate << response
				end
			end
		end
	ensure
		connection.calls.delete(id)
	end
end