Async::Container::SupervisorSourceAsyncContainerSupervisorConnection

class Connection

Represents a bidirectional communication channel between supervisor and worker.

Handles message passing, call/response patterns, and connection lifecycle.

Nested

Definitions

def initialize(stream, id = 0, **state)

Initialize a new connection.

Signature

parameter stream IO

The underlying IO stream.

parameter id Integer

The starting call ID (default: 0).

parameter state Hash

Initial connection state.

Implementation

def initialize(stream, id = 0, **state)
	@stream = stream
	@id = id
	@state = state
	
	@reader = nil
	@calls = {}
end

attr :calls

Signature

attribute Hash(Integer, Call)

Calls in progress.

attr_accessor :state

Signature

attribute Hash(Symbol, Object)

State associated with this connection, for example the process ID, etc.

def next_id

Generate the next unique call ID.

Signature

returns Integer

The next call identifier.

Implementation

def next_id
	@id += 2
end

def write(**message)

Write a message to the connection stream.

Signature

parameter message Hash

The message to write.

Implementation

def write(**message)
	raise IOError, "Connection is closed!" unless @stream
	@stream&.write(JSON.dump(message) << "\n")
	@stream&.flush # it is possible for @stream to become nil after the write call
end

def read

Read a message from the connection stream.

Signature

returns Hash, nil

The parsed message or nil if stream is closed.

Implementation

def read
	if line = @stream&.gets
		JSON.parse(line, symbolize_names: true)
	end
end

def each

Iterate over all messages from the connection.

Signature

yields {|message| ...}

Each message read from the stream.

Implementation

def each
	while message = self.read
		yield message
	end
end

def call(...)

Make a synchronous call and wait for a single response.

Implementation

def call(...)
	Call.call(self, ...)
end

def run(target)

Run the connection, processing incoming messages.

Dispatches incoming calls to the target and routes responses to waiting calls.

Signature

parameter target Dispatchable

The target to dispatch calls to.

Implementation

def run(target)
	# Process incoming messages from the connection:
	self.each do |message|
		# If the message has an ID, it is a response to a call:
		if id = message.delete(:id)
			# Find the call in the connection's calls hash:
			if call = @calls[id]
				# Enqueue the response for the call:
				call.push(**message)
			elsif message.key?(:do)
				# Otherwise, if we couldn't find an existing call, it must be a new call:
				Call.dispatch(self, target, id, message)
			else
				# Finally, if none of the above, it is likely a response to a timed-out call, so ignore it:
				Console.debug(self, "Ignoring message:", message)
			end
		else
			Console.error(self, "Unknown message:", message)
		end
	end
end

def run_in_background(target, parent: Task.current)

Run the connection in a background task.

Signature

parameter target Dispatchable

The target to dispatch calls to.

parameter parent Async::Task

The parent task.

returns Async::Task

The background reader task.

Implementation

def run_in_background(target, parent: Task.current)
	@reader ||= parent.async do
		self.run(target)
	end
end

def close

Close the connection and clean up resources.

Stops the background reader, closes the stream, and closes all pending calls.

Implementation

def close
	if @reader
		@reader.stop
		@reader = nil
	end
	
	if stream = @stream
		@stream = nil
		stream.close
	end
	
	if @calls
		@calls.each do |id, call|
			call.close
		end
		
		@calls.clear
	end
end