class SupervisorController
Controller for supervisor operations.
Handles registration of workers, worker lookup, restarting process groups, and status queries.
Definitions
def initialize(server, connection)
Initialize the supervisor controller with the server and connection.
Implementation
# Set up a controller bound to a server and a connection.
#
# The ID, process ID and worker start out as nil and the state as an empty hash.
#
# @parameter server [Object] Stored as-is in `@server`.
# @parameter connection [Object] Stored as-is in `@connection`.
def initialize(server, connection)
  @server, @connection = server, connection

  # Nothing is known about the worker yet.
  @id = @process_id = @worker = nil

  @state = {}
end
attr :server
Signature
-
attribute
Server The server instance.
attr :connection
Signature
-
attribute
Connection The connection instance.
attr :id
Signature
-
attribute
Integer The ID assigned to this worker.
attr :process_id
Signature
-
attribute
Integer The process ID of the worker.
attr :worker
Signature
-
attribute
Proxy The proxy to the worker controller.
attr_accessor :state
Signature
-
attribute
Hash State associated with this worker connection (e.g., service name).
def register(worker, process_id:, state: {})
Register a worker connection with the supervisor.
Allocates a unique sequential ID, stores the worker controller proxy, and notifies all monitors of the new connection.
Signature
-
parameter
worker Proxy The proxy to the worker controller.
-
parameter
process_id Integer The process ID of the worker.
-
parameter
state Hash Optional state to associate with this worker (e.g., service name).
-
returns
Integer The connection ID assigned to the worker.
Implementation
# Register a worker on this controller.
#
# Takes the next ID from the server, records the worker and its process ID, merges the
# supplied state into the existing state, and then adds this controller to the server.
#
# @parameter worker [Object] The worker to store in `@worker`.
# @parameter process_id [Object] The process ID to store in `@process_id`.
# @parameter state [Hash] Extra state merged into the existing state.
# @returns [Object] The ID obtained from the server.
# @raises [RuntimeError] If an ID has already been assigned.
def register(worker, process_id:, state: {})
  if @id
    raise RuntimeError, "Already registered"
  end

  @id = @server.next_id
  @process_id, @worker = process_id, worker
  @state.merge!(state)

  # Only hand ourselves to the server once every field is populated.
  @server.add(self)

  @id
end
def [](id)
Get a worker controller proxy by connection ID.
Returns a proxy to the worker controller that can be used to invoke operations directly on the worker. The proxy uses multi-hop forwarding to route calls through the supervisor to the worker.
Example: Accessing a worker
supervisor = connection[:supervisor]
worker = supervisor[id]
worker.memory_dump(path: "/tmp/dump.json")
Signature
-
parameter
id Integer The ID of the worker.
-
returns
Proxy A proxy to the worker controller.
-
raises
ArgumentError If the ID is missing, the connection ID is not found, or no worker controller is registered for that connection.
Implementation
# Look up the worker stored for the given ID.
#
# Finds the controller under `id` in the server's controllers and returns its worker.
#
# @parameter id [Object] The key to look up in the server's controllers.
# @returns [Object] The worker held by the matching controller.
# @raises [ArgumentError] If `id` is nil or false, no controller is stored under it, or that controller has no worker.
def [](id)
  raise ArgumentError, "Missing 'id' parameter" unless id

  controller = @server.controllers[id]
  raise ArgumentError, "Connection not found: #{id}" unless controller

  proxy = controller.worker
  raise ArgumentError, "Worker controller not found for connection: #{id}" unless proxy

  proxy
end
def keys
List all registered worker IDs.
Signature
-
returns
Array(Integer) An array of IDs for all registered workers.
Implementation
# List the keys of the server's controllers.
#
# @returns [Array] Whatever `keys` returns on the server's controllers collection.
def keys
  registry = @server.controllers
  registry.keys
end
def restart(signal: :INT)
Restart the current process group, usually including the supervisor and any other processes, by sending the signal to the parent process.
Signature
-
parameter
signal Symbol The signal to send to the process group.
Implementation
# Send a signal to the parent of the current process.
#
# NOTE(review): presumably the parent owns the process group and reacts by restarting it,
# including this process - confirm against the parent's signal handling.
#
# @parameter signal [Symbol] The signal to send; defaults to `:INT`.
def restart(signal: :INT)
  parent_pid = ::Process.ppid

  # The target is the parent process, not this process itself.
  ::Process.kill(signal, parent_pid)
end
def status
Query the status of the supervisor and all connected workers.
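Returns an array of status information from each monitor. Each monitor provides its own status representation. If a monitor raises an error while reporting, the error object is returned in its place; monitors that report nil are omitted.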
Signature
-
returns
Array An array of status information from each monitor.
Implementation
# Collect a status report from every monitor on the server.
#
# A monitor whose `status` raises a StandardError contributes the error object itself
# instead of aborting the whole query. Nil reports are dropped from the result.
#
# @returns [Array] One entry per monitor that produced a non-nil status or an error.
def status
  reports = @server.monitors.map do |monitor|
    monitor.status
  rescue => error
    # Keep going: surface the failure as this monitor's entry.
    error
  end

  # Only nil is removed; any other value, including false, is kept.
  reports.compact
end