class Server
The server represents the main supervisor process, which is responsible for managing the lifecycle of other processes.
The server can execute tasks such as restarting the process group and querying the status of the processes. It also manages the lifecycle of the monitors, which observe the status of the connected workers.
Definitions
def initialize(monitors: [], endpoint: Supervisor.endpoint)
Initialize a new supervisor server.
Signature
- parameter monitors (Array): The monitors to run.
- parameter endpoint (IO::Endpoint): The endpoint to listen on.
Implementation
def initialize(monitors: [], endpoint: Supervisor.endpoint)
  @monitors = monitors
  @endpoint = endpoint
  @connections = {}
end
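Judging from the implementations on this page, the server calls four methods on each monitor: run, register(connection), remove(connection) and status(call). The following is a minimal sketch of an object that satisfies that interface. CountingMonitor and FakeConnection are hypothetical names used only for illustration and are not part of the library, and what a real monitor does inside status is an assumption left empty here.

```ruby
# Hypothetical stand-in for a worker connection; only `state` is used.
class FakeConnection
  attr_reader :state

  def initialize(state = {})
    @state = state
  end
end

# Hypothetical monitor that tracks the process IDs of registered workers.
class CountingMonitor
  attr_reader :process_ids

  def initialize
    @process_ids = []
  end

  # Called once when the server starts.
  def run
  end

  # Called for each newly registered worker connection.
  def register(connection)
    @process_ids << connection.state[:process_id]
  end

  # Called when a worker connection is removed.
  def remove(connection)
    @process_ids.delete(connection.state[:process_id])
  end

  # Called during a status query; this sketch contributes nothing.
  def status(call)
  end
end

monitor = CountingMonitor.new
connection = FakeConnection.new({process_id: 1234})

monitor.register(connection)
registered = monitor.process_ids.dup

monitor.remove(connection)
```

An instance of such a class could then be passed in the monitors array given to initialize.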
def do_register(call)
Register a worker connection with the supervisor.
Assigns a unique connection ID and notifies all monitors of the new connection.
Signature
- parameter call (Connection::Call): The registration call.
- parameter call[:state] (Hash): The worker state to merge (e.g. process_id).
Implementation
def do_register(call)
  if state = call.message[:state]
    call.connection.state.merge!(state)
  end

  connection_id = SecureRandom.uuid
  call.connection.state[:connection_id] = connection_id

  @connections[connection_id] = call.connection

  @monitors.each do |monitor|
    monitor.register(call.connection)
  rescue => error
    Console.error(self, "Error while registering process!", monitor: monitor, exception: error)
  end
ensure
  call.finish(connection_id: connection_id)
end
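The registration flow above has two properties worth seeing in isolation: a failing monitor does not prevent the remaining monitors from being notified, and the call is always finished with the assigned connection ID. The sketch below reproduces that flow as a free-standing method. FakeConnection, FakeCall, FailingMonitor and RecordingMonitor are hypothetical stand-ins rather than library classes, and logging through Console is replaced by warn.

```ruby
require "securerandom"

# Hypothetical stand-ins for the connection and call objects.
class FakeConnection
  attr_reader :state

  def initialize
    @state = {}
  end
end

class FakeCall
  attr_reader :connection, :message, :response

  def initialize(connection, message)
    @connection = connection
    @message = message
  end

  # Records the response instead of sending it to a client.
  def finish(**response)
    @response = response
  end
end

# Hypothetical monitors: one always fails, one records what it sees.
class FailingMonitor
  def register(connection)
    raise "monitor unavailable"
  end
end

class RecordingMonitor
  attr_reader :seen

  def initialize
    @seen = []
  end

  def register(connection)
    @seen << connection
  end
end

# Same steps as do_register, taking the server's state as arguments.
def register(call, connections, monitors)
  if state = call.message[:state]
    call.connection.state.merge!(state)
  end

  connection_id = SecureRandom.uuid
  call.connection.state[:connection_id] = connection_id
  connections[connection_id] = call.connection

  monitors.each do |monitor|
    monitor.register(call.connection)
  rescue => error
    # One failing monitor must not stop the others.
    warn "Error while registering process: #{error.message}"
  end
ensure
  call.finish(connection_id: connection_id)
end

connections = {}
recorder = RecordingMonitor.new
call = FakeCall.new(FakeConnection.new, {state: {process_id: 4242}})

register(call, connections, [FailingMonitor.new, recorder])
```

Because the rescue clause sits inside the each block, the error from the first monitor is logged and iteration continues with the second.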
def do_forward(call)
Forward an operation to a worker connection.
This allows clients to invoke operations on specific worker processes by providing a connection_id. The operation is proxied through to the worker and responses are streamed back to the client.
Signature
- parameter call (Connection::Call): The call to handle.
- parameter call[:operation] (Hash): The operation to forward, must include :do key.
- parameter call[:connection_id] (String): The connection ID to target.
Implementation
def do_forward(call)
  operation = call[:operation]
  connection_id = call[:connection_id]

  unless connection_id
    call.fail(error: "Missing 'connection_id' parameter")
    return
  end

  connection = @connections[connection_id]

  unless connection
    call.fail(error: "Connection not found", connection_id: connection_id)
    return
  end

  # Forward the call to the target connection
  call.forward(connection, operation)
end
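The forwarding logic has three outcomes: a missing connection_id, an unknown connection_id, and a successful forward. The sketch below exercises each one against a hypothetical FakeCall that records what would have been sent. The FakeCall class, the :worker placeholder and the {do: :status} operation value are assumptions made for illustration only.

```ruby
# Hypothetical call object that records failures and forwards.
class FakeCall
  attr_reader :failure, :forwarded

  def initialize(message)
    @message = message
  end

  def [](key)
    @message[key]
  end

  def fail(**details)
    @failure = details
  end

  def forward(connection, operation)
    @forwarded = [connection, operation]
  end
end

# Same checks as do_forward, with the connection table passed in.
def forward(call, connections)
  operation = call[:operation]
  connection_id = call[:connection_id]

  unless connection_id
    call.fail(error: "Missing 'connection_id' parameter")
    return
  end

  connection = connections[connection_id]

  unless connection
    call.fail(error: "Connection not found", connection_id: connection_id)
    return
  end

  call.forward(connection, operation)
end

# :worker is a placeholder for a real worker connection.
connections = {"abc" => :worker}

missing = FakeCall.new({operation: {do: :status}})
unknown = FakeCall.new({operation: {do: :status}, connection_id: "zzz"})
valid = FakeCall.new({operation: {do: :status}, connection_id: "abc"})

[missing, unknown, valid].each { |call| forward(call, connections) }
```

Only the third call reaches forward; the first two are answered with an error and never touch a worker.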
def do_restart(call)
Restart the current process group, usually including the supervisor and any other processes.
Signature
- parameter call[:signal] (Symbol): The signal to send to the process group. Defaults to :INT.
Implementation
def do_restart(call)
  signal = call[:signal] || :INT

  # We are going to terminate the process group, including *this* process, so finish the current RPC before that:
  call.finish

  ::Process.kill(signal, ::Process.ppid)
end
def do_status(call)
Query the status of the supervisor and all connected workers.
Returns information about all registered connections and delegates to monitors to provide additional status information.
Signature
- parameter call (Connection::Call): The status call.
Implementation
def do_status(call)
  connections = @connections.map do |connection_id, connection|
    {
      connection_id: connection_id,
      process_id: connection.state[:process_id],
      state: connection.state,
    }
  end

  @monitors.each do |monitor|
    monitor.status(call)
  end

  call.finish(connections: connections)
end
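To make the shape of the status response concrete, the sketch below builds the same connections list from a hand-written connection table. FakeConnection and the IDs are hypothetical. Note that process_id appears both at the top level of each entry and inside state, since state is returned whole.

```ruby
# Hypothetical stand-in for a worker connection; only `state` is used.
class FakeConnection
  attr_reader :state

  def initialize(state)
    @state = state
  end
end

# Connection table keyed by connection ID, as the server keeps it.
connections = {
  "id-1" => FakeConnection.new({connection_id: "id-1", process_id: 100}),
  "id-2" => FakeConnection.new({connection_id: "id-2", process_id: 200}),
}

# Same mapping as in do_status.
summary = connections.map do |connection_id, connection|
  {
    connection_id: connection_id,
    process_id: connection.state[:process_id],
    state: connection.state,
  }
end

summary.each { |entry| puts entry.inspect }
```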
def remove(connection)
Remove a worker connection from the supervisor.
Notifies all monitors and removes the connection from tracking.
Signature
- parameter connection (Connection): The connection to remove.
Implementation
def remove(connection)
  if connection_id = connection.state[:connection_id]
    @connections.delete(connection_id)
  end

  @monitors.each do |monitor|
    monitor.remove(connection)
  rescue => error
    Console.error(self, "Error while removing process!", monitor: monitor, exception: error)
  end
end
def run(parent: Task.current)
Run the supervisor server.
Starts all monitors and accepts connections from workers.
Signature
- parameter parent (Async::Task): The parent task to run under.
Implementation
def run(parent: Task.current)
  parent.async do |task|
    @monitors.each do |monitor|
      monitor.run
    rescue => error
      Console.error(self, "Error while starting monitor!", monitor: monitor, exception: error)
    end

    @endpoint.accept do |peer|
      connection = Connection.new(peer, 1)
      connection.run(self)
    ensure
      connection.close
      remove(connection)
    end

    task.children&.each(&:wait)
  ensure
    task.stop
  end
end