class Supervisor
Implements a host supervisor which can restart the host services and provide various metrics about the running processes.
Definitions
def initialize(...)
Initialize the supervisor using the given environment.
Signature
-
parameter
environment
Build::Environment
Implementation
def initialize(...)
super
@bound_endpoint = nil
end
def endpoint
The endpoint which the supervisor will bind to. Typically a unix pipe in the same directory as the host.
Implementation
def endpoint
@evaluator.endpoint
end
def do_restart(message)
Restart the process group that the supervisor belongs to.
Implementation
def do_restart(message)
# Tell the parent of this process group to spin up a new process group/container.
# Wait for that to start accepting new connections.
# Stop accepting connections.
# Wait for existing connnections to drain.
# Terminate this process group.
signal = message[:signal] || :INT
Process.kill(signal, Process.ppid)
end
def do_metrics(message)
Capture process metrics relating to the process group that the supervisor belongs to.
Implementation
def do_metrics(message)
Process::Metrics::General.capture(pid: Process.ppid, ppid: Process.ppid)
end
def handle(message)
Handle an incoming request.
Signature
-
parameter
message
Hash
The decoded message.
Implementation
def handle(message)
case message[:please]
when "restart"
self.do_restart(message)
when "metrics"
self.do_metrics(message)
end
end
def start
Bind the supervisor to the specified endpoint.
Implementation
def start
Console.logger.info(self) {"Binding to #{self.endpoint}..."}
@bound_endpoint = Sync{self.endpoint.bound}
super
end
def setup(container)
Start the supervisor process which accepts connections from the bound endpoint and processes JSON formatted messages.
Signature
-
parameter
container
Async::Container::Generic
Implementation
def setup(container)
container.run(name: self.name, restart: true, count: 1) do |instance|
Async do
@bound_endpoint.accept do |peer|
stream = ::IO::Stream(peer)
while message = stream.read_until("\0")
response = handle(JSON.parse(message, symbolize_names: true))
stream.puts(response.to_json, separator: "\0")
end
end
instance.ready!
end
end
super
end
def stop
Release the bound endpoint.
Implementation
def stop
@bound_endpoint&.close
@bound_endpoint = nil
super
end