Async::Container::SupervisorSourceAsyncContainerSupervisorConnection

class Connection

Represents a bidirectional communication channel between supervisor and worker.

Handles message passing, call/response patterns, and connection lifecycle.

Nested

Definitions

def initialize(stream, id = 0, **state)

Initialize a new connection.

Signature

parameter stream IO

The underlying IO stream.

parameter id Integer

The starting call ID (default: 0).

parameter state Hash

Initial connection state.

Implementation

def initialize(stream, id = 0, **state)
	@stream = stream
	@id = id
	@state = state
	
	@reader = nil
	@calls = {}
end

attr :calls

Signature

attribute Hash(Integer, Call)

Calls in progress.

attr_accessor :state

Signature

attribute Hash(Symbol, Object)

State associated with this connection, for example the process ID, etc.

def next_id

Generate the next unique call ID.

Signature

returns Integer

The next call identifier.

Implementation

def next_id
	@id += 2
end

def write(**message)

Write a message to the connection stream.

Signature

parameter message Hash

The message to write.

Implementation

def write(**message)
	@stream.write(JSON.dump(message) << "\n")
	@stream.flush
end

def call(timeout: nil, **message)

Make a synchronous call and wait for a single response.

Signature

parameter timeout Numeric, nil

Optional timeout for the call.

parameter message Hash

The call message.

returns Hash

The response.

Implementation

def call(timeout: nil, **message)
	id = next_id
	calls[id] = ::Thread::Queue.new
	
	write(id: id, **message)
	
	return calls[id].pop(timeout: timeout)
ensure
	calls.delete(id)
end

def call(...)

Make a synchronous call and wait for a single response.

Implementation

def call(...)
	Call.call(self, ...)
end

def read

Read a message from the connection stream.

Signature

returns Hash, nil

The parsed message or nil if stream is closed.

Implementation

def read
	if line = @stream&.gets
		JSON.parse(line, symbolize_names: true)
	end
end

def each

Iterate over all messages from the connection.

Signature

yields {|message| ...}

Each message read from the stream.

Implementation

def each
	while message = self.read
		yield message
	end
end

def run(target)

Run the connection, processing incoming messages.

Dispatches incoming calls to the target and routes responses to waiting calls.

Signature

parameter target Dispatchable

The target to dispatch calls to.

Implementation

def run(target)
	self.each do |message|
		if id = message.delete(:id)
			if call = @calls[id]
				# Response to a call:
				call.push(**message)
			elsif message.key?(:do)
				# Incoming call:
				Call.dispatch(self, target, id, message)
			else
				# Likely a response to a timed-out call, ignore it:
				Console.debug(self, "Ignoring message:", message)
			end
		else
			Console.error(self, "Unknown message:", message)
		end
	end
end

def run_in_background(target, parent: Task.current)

Run the connection in a background task.

Signature

parameter target Dispatchable

The target to dispatch calls to.

parameter parent Async::Task

The parent task.

returns Async::Task

The background reader task.

Implementation

def run_in_background(target, parent: Task.current)
	@reader ||= parent.async do
		self.run(target)
	end
end

def close

Close the connection and clean up resources.

Stops the background reader, closes the stream, and closes all pending calls.

Implementation

def close
	if @reader
		@reader.stop
		@reader = nil
	end
	
	if stream = @stream
		@stream = nil
		stream.close
	end
	
	if @calls
		@calls.each do |id, call|
			call.close
		end
		
		@calls.clear
	end
end