class Worker
A worker represents a long-running process that can be controlled by the supervisor.
The worker can execute various tasks on the supervisor's behalf, such as dumping memory, threads, and garbage collection profiles.
Definitions
def self.run(process_id: Process.pid, endpoint: Supervisor.endpoint)
Run a worker with the given process ID.
Signature
- parameter process_id (Integer): The process ID to register with the supervisor.
- parameter endpoint (IO::Endpoint): The supervisor endpoint to connect to.
Implementation
def self.run(process_id: Process.pid, endpoint: Supervisor.endpoint)
  self.new(process_id: process_id, endpoint: endpoint).run
end
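As a rough illustration of the delegation shown above, the following sketch reproduces the pattern with a stand-in class. `DelegatingWorker`, its string endpoint, and the hash returned by `run` are all assumptions made for this example; the real `Worker#run` connects to the supervisor and is not reproduced here.

```ruby
# Hypothetical stand-in for Worker; only the class-level delegation is shown.
class DelegatingWorker
  # Build an instance with the given options, then run it.
  def self.run(process_id: Process.pid, endpoint: "stand-in-endpoint")
    new(process_id: process_id, endpoint: endpoint).run
  end

  def initialize(process_id: Process.pid, endpoint: "stand-in-endpoint")
    @process_id = process_id
    @endpoint = endpoint
  end

  # Stub: report the values a real worker would use to register itself.
  def run
    {process_id: @process_id, endpoint: @endpoint}
  end
end

result = DelegatingWorker.run(process_id: 1234)
```

Both keyword arguments are optional, so calling `run` with no arguments falls back to the current process ID and the default endpoint.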
def initialize(process_id: Process.pid, endpoint: Supervisor.endpoint, state: {}, utilization_schema: nil, utilization_registry: nil)
Initialize a new worker.
Signature
- parameter process_id (Integer): The process ID to register with the supervisor.
- parameter endpoint (IO::Endpoint): The supervisor endpoint to connect to.
- parameter state (Hash): Optional state to associate with this worker (e.g., service name).
- parameter utilization_schema (Hash | nil): Optional utilization schema definition.
- parameter utilization_registry (Registry | nil): Optional utilization registry. If nil, a new registry is created.
Implementation
def initialize(process_id: Process.pid, endpoint: Supervisor.endpoint, state: {}, utilization_schema: nil, utilization_registry: nil)
  super(endpoint: endpoint)
  @id = nil
  @process_id = process_id
  @state = state
  @utilization_schema = utilization_schema
  # `require` returns false when the library is already loaded, so it must not gate creation of the default registry:
  @utilization_registry = utilization_registry || begin
    require "async/utilization"
    Async::Utilization::Registry.new
  end
end
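The optional `utilization_registry` argument follows a common injection pattern: use the supplied object if there is one, otherwise create a fresh default. A minimal sketch of that pattern, assuming hypothetical stand-in classes `SketchRegistry` and `InjectableWorker` rather than the library's own types:

```ruby
# Hypothetical stand-in for Async::Utilization::Registry.
class SketchRegistry
end

# Hypothetical stand-in for Worker, reduced to the two optional arguments.
class InjectableWorker
  attr_reader :state, :utilization_registry

  def initialize(state: {}, utilization_registry: nil)
    @state = state
    # Fall back to a fresh registry only when none was injected:
    @utilization_registry = utilization_registry || SketchRegistry.new
  end
end

shared_registry = SketchRegistry.new
injected_worker = InjectableWorker.new(state: {name: "web"}, utilization_registry: shared_registry)
default_worker = InjectableWorker.new
```

Injecting a registry lets several components share one set of metrics, while the default keeps simple setups free of extra wiring.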
attr :id
Signature
- attribute (Integer): The ID assigned by the supervisor.
attr :process_id
Signature
- attribute (Integer): The process ID of the worker.
attr_accessor :state
Signature
- attribute (Hash): State associated with this worker (e.g., service name).
attr :utilization_schema
Signature
- attribute (Hash | nil): Utilization schema definition.
attr :utilization_registry
Signature
- attribute (Registry): The utilization registry for this worker.
def setup_utilization_observer(path, size, offset)
Set up the utilization observer for this worker.
Maps the shared memory file and configures the utilization registry to write metrics to it. Called by the supervisor (via WorkerController) to inform the worker of the shared memory file path and allocated offset.
Signature
- parameter path (String): Path to the shared memory file that the worker should map.
- parameter size (Integer): Size of the shared memory region to map.
- parameter offset (Integer): Offset into the shared memory buffer allocated for this worker.
- returns (Array): Array of [key, type, offset] tuples describing the utilization schema. Returns an empty array if no utilization schema is configured.
Implementation
def setup_utilization_observer(path, size, offset)
  return [] unless @utilization_schema
  schema = Async::Utilization::Schema.build(@utilization_schema)
  observer = Async::Utilization::Observer.open(schema, path, size, offset)
  @utilization_registry.observer = observer
  # Pass the schema back to the supervisor so it can be used to aggregate the metrics:
  observer.schema.to_a
end
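To make the shape of the return value concrete, here is a small sketch that turns a schema definition into [key, type, offset] tuples. The type names, the byte sizes in `FIELD_SIZES`, and the back-to-back layout without alignment are assumptions for illustration only; the actual layout is decided by `Async::Utilization::Schema` and may differ.

```ruby
# Hypothetical field sizes in bytes; the real library's types may differ.
FIELD_SIZES = {u32: 4, u64: 8, f64: 8}.freeze

# Lay out fields back to back (no alignment) and return [key, type, offset] tuples.
def layout_tuples(schema_definition)
  offset = 0
  schema_definition.map do |key, type|
    tuple = [key, type, offset]
    offset += FIELD_SIZES.fetch(type)
    tuple
  end
end

# Mirror the early return above: no schema means no tuples.
def describe_schema(schema_definition)
  return [] unless schema_definition
  layout_tuples(schema_definition)
end

tuples = describe_schema({connections: :u32, requests: :u64})
# => [[:connections, :u32, 0], [:requests, :u64, 4]]
```

Returning plain arrays keeps the description easy to send back to the supervisor, which only needs the field names, types, and positions to read and aggregate the metrics.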
def make_controller
Create the worker controller for this worker.
Override this method to provide a custom worker controller that exposes additional RPCs to the supervisor.
Signature
- returns (WorkerController): The worker controller instance.
Implementation
def make_controller
  WorkerController.new(self)
end
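The override point described above can be sketched with stand-in classes. `BaseController`, `BaseWorker`, `CacheController`, `CacheWorker`, and the `clear_cache` operation are all hypothetical names invented for this example; in real code the subclasses would derive from the library's `WorkerController` and `Worker`.

```ruby
# Hypothetical stand-in for WorkerController: it simply wraps the worker.
class BaseController
  attr_reader :worker

  def initialize(worker)
    @worker = worker
  end
end

# Hypothetical stand-in for Worker with the default factory method.
class BaseWorker
  def make_controller
    BaseController.new(self)
  end
end

# A custom controller exposing one extra operation (hypothetical).
class CacheController < BaseController
  def clear_cache
    @worker.cache.clear
  end
end

# A worker that overrides the factory to return the custom controller.
class CacheWorker < BaseWorker
  attr_reader :cache

  def initialize
    @cache = {warm: true}
  end

  def make_controller
    CacheController.new(self)
  end
end

cache_worker = CacheWorker.new
controller = cache_worker.make_controller
controller.clear_cache
```

Because the controller is created through a single factory method, a subclass can change what the supervisor is able to call without touching the rest of the worker.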