Async::Service::Supervisor › Source › Async › Service › Supervisor › SupervisorController

class SupervisorController

Controller for supervisor operations.

Handles registration of workers, worker lookup, restarting process groups, and status queries.

Definitions

def initialize(server, connection)

Initialize the supervisor controller with the server and connection.

Implementation

# Set up a controller bound to a supervisor server and a single connection.
#
# The controller starts out unregistered: the ID, process ID and worker proxy are all `nil`, and the state is an empty hash.
#
# @parameter server [Server] The server instance.
# @parameter connection [Connection] The connection instance.
def initialize(server, connection)
	@server, @connection = server, connection
	
	# Nothing is known about the worker until it registers:
	@id = @process_id = @worker = nil
	@state = {}
end

attr :server

Signature

attribute Server

The server instance.

attr :connection

Signature

attribute Connection

The connection instance.

attr :id

Signature

attribute Integer

The ID assigned to this worker.

attr :process_id

Signature

attribute Integer

The process ID of the worker.

attr :worker

Signature

attribute Proxy

The proxy to the worker controller.

attr_accessor :state

Signature

attribute Hash

State associated with this worker connection (e.g., service name).

def register(worker, process_id:, state: {})

Register a worker connection with the supervisor.

Allocates a unique sequential ID, stores the worker controller proxy, and notifies all monitors of the new connection.

Signature

parameter worker Proxy

The proxy to the worker controller.

parameter process_id Integer

The process ID of the worker.

parameter state Hash

Optional state to associate with this worker (e.g., service name).

returns Integer

The connection ID assigned to the worker.

Implementation

# Register a worker on this connection.
#
# Takes the next ID from the server, records the worker proxy and process ID, merges the given state into the existing state, and then adds this controller to the server.
#
# @parameter worker [Proxy] The proxy to the worker controller.
# @parameter process_id [Integer] The process ID of the worker.
# @parameter state [Hash] Optional state to merge into this controller's state.
# @returns [Integer] The ID assigned to this controller.
# @raises [RuntimeError] If this controller already has an ID.
def register(worker, process_id:, state: {})
	if @id
		raise RuntimeError, "Already registered"
	end
	
	# The ID is taken first, so a failure later on still leaves the controller marked as registered:
	@id = @server.next_id
	@worker, @process_id = worker, process_id
	@state.merge!(state)
	
	@server.add(self)
	
	@id
end

def [](id)

Get a worker controller proxy by connection ID.

Returns a proxy to the worker controller that can be used to invoke operations directly on the worker. The proxy uses multi-hop forwarding to route calls through the supervisor to the worker.

Example: Accessing a worker

supervisor = connection[:supervisor]
worker = supervisor[id]
worker.memory_dump(path: "/tmp/dump.json")

Signature

parameter id Integer

The ID of the worker.

returns Proxy

A proxy to the worker controller.

raises ArgumentError

If the ID is missing, no connection is found for the ID, or the connection has no worker controller.

Implementation

# Look up the worker proxy for the given connection ID.
#
# The ID is resolved through the server's `controllers`, and the matching controller's `worker` is returned.
#
# @parameter id [Integer] The ID of the worker.
# @returns [Proxy] The worker stored on the matching controller.
# @raises [ArgumentError] If the ID is falsy, no controller exists for it, or that controller has no worker.
def [](id)
	raise ArgumentError, "Missing 'id' parameter" unless id
	
	controller = @server.controllers[id]
	raise ArgumentError, "Connection not found: #{id}" unless controller
	
	# A controller may exist without a worker, e.g. before it has registered:
	proxy = controller.worker
	raise ArgumentError, "Worker controller not found for connection: #{id}" unless proxy
	
	proxy
end

def keys

List all registered worker IDs.

Signature

returns Array(Integer)

An array of IDs for all registered workers.

Implementation

# List the IDs currently present in the server's `controllers`.
#
# @returns [Array(Integer)] The keys of the server's controllers collection.
def keys
	registered = @server.controllers
	
	return registered.keys
end

def restart(signal: :INT)

Restart the current process group, usually including the supervisor and any other processes.

Signature

parameter signal Symbol

The signal to send to the process group.

Implementation

# Request a restart by signalling the parent of the current process.
#
# NOTE(review): presumably the parent owns the process group (including the supervisor and this process) and shuts it down or restarts it on receipt of the signal - confirm against the container implementation.
#
# @parameter signal [Symbol] The signal to send to the parent process.
def restart(signal: :INT)
	parent_process_id = ::Process.ppid
	
	# The signal goes to the parent rather than to this process; the expected outcome is that this process is terminated too.
	::Process.kill(signal, parent_process_id)
end

def status

Query the status of the supervisor and all connected workers.

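Returns an array of status information from each monitor. Each monitor provides its own status representation. If a monitor raises an error while reporting, the error object is returned in its place; monitors that report `nil` are omitted.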

Signature

returns Array

An array of status information from each monitor.

Implementation

# Collect the status reported by every monitor on the server.
#
# A monitor that raises a `StandardError` contributes the error object instead of a status, so one failing monitor does not abort the whole query. Results that are `nil` are dropped.
#
# @returns [Array] The statuses (or rescued errors), in monitor order, without `nil` entries.
def status
	reports = @server.monitors.map do |monitor|
		monitor.status
	rescue => error
		# Report the failure in place of the status:
		error
	end
	
	return reports.compact
end