Async::Service::Supervisor

class Worker

A worker represents a long-running process that can be controlled by the supervisor.

The worker can execute various diagnostic tasks on request, such as dumping memory, thread, and garbage collection profiles.

Definitions

def self.run(process_id: Process.pid, endpoint: Supervisor.endpoint)

Create a worker with the given process ID and endpoint, then run it.

Signature

parameter process_id Integer

The process ID to register with the supervisor.

parameter endpoint IO::Endpoint

The supervisor endpoint to connect to.

Implementation

def self.run(process_id: Process.pid, endpoint: Supervisor.endpoint)
	self.new(process_id: process_id, endpoint: endpoint).run
end
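
Because the defaults are keyword arguments, `Process.pid` is evaluated each time `run` is called, so a worker started inside a forked child would register the child's process ID. The following is a minimal sketch of that delegation pattern, assuming a stand-in class: `SketchWorker` and the `:default_endpoint` symbol are hypothetical, and nothing here connects to a supervisor.

```ruby
# Stand-in for the real Worker; it only records its arguments.
class SketchWorker
	attr :process_id, :endpoint
	
	# Mirrors Worker.run: build an instance, then run it.
	def self.run(process_id: Process.pid, endpoint: :default_endpoint)
		self.new(process_id: process_id, endpoint: endpoint).run
	end
	
	def initialize(process_id: Process.pid, endpoint: :default_endpoint)
		@process_id = process_id
		@endpoint = endpoint
	end
	
	# The real worker connects to the supervisor here; the sketch returns itself.
	def run
		self
	end
end

worker = SketchWorker.run
puts worker.process_id == Process.pid # the default is evaluated at call time
```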

def initialize(process_id: Process.pid, endpoint: Supervisor.endpoint, state: {}, utilization_schema: nil, utilization_registry: nil)

Initialize a new worker.

Signature

parameter process_id Integer

The process ID to register with the supervisor.

parameter endpoint IO::Endpoint

The supervisor endpoint to connect to.

parameter state Hash

Optional state to associate with this worker (e.g., service name).

parameter utilization_schema Hash | Nil

Optional utilization schema definition.

parameter utilization_registry Registry | Nil

Optional utilization registry. If nil, a new registry is created.

Implementation

def initialize(process_id: Process.pid, endpoint: Supervisor.endpoint, state: {}, utilization_schema: nil, utilization_registry: nil)
	super(endpoint: endpoint)
	
	@id = nil
	@process_id = process_id
	@state = state
	
	@utilization_schema = utilization_schema
	@utilization_registry = utilization_registry || require("async/utilization") && Async::Utilization::Registry.new
end
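
The `utilization_registry || require(...) && ...Registry.new` fallback relies on two Ruby details: `&&` binds more tightly than `||`, and `require` returns `true` only the first time a file is loaded (`false` afterwards). The sketch below imitates the shape of that expression, assuming a hypothetical `load_once` helper in place of `require` and a `:new_registry` symbol in place of `Async::Utilization::Registry.new`. It suggests the fallback yields a registry only when the library is loaded for the first time, which may be worth keeping in mind if `async/utilization` has already been required elsewhere.

```ruby
# Hypothetical stand-in for Kernel#require: true on first load, false afterwards.
LOADED = {}

def load_once(name)
	return false if LOADED[name]
	LOADED[name] = true
end

# Same shape as the expression in initialize: given || (loaded && registry).
def pick_registry(given)
	given || load_once("async/utilization") && :new_registry
end

first = pick_registry(nil)      # load_once returns true, so the fallback is used
second = pick_registry(nil)     # load_once returns false, so the result is false
explicit = pick_registry(:mine) # a supplied registry short-circuits the fallback

p [first, second, explicit]
```

Passing an explicit `utilization_registry:` sidesteps the question, since the right-hand side is never evaluated.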

attr :id

Signature

attribute Integer

The ID assigned by the supervisor.

attr :process_id

Signature

attribute Integer

The process ID of the worker.

attr_accessor :state

Signature

attribute Hash

State associated with this worker (e.g., service name).

attr :utilization_schema

Signature

attribute Hash | Nil

Utilization schema definition.

attr :utilization_registry

Signature

attribute Registry

The utilization registry for this worker.

def setup_utilization_observer(path, size, offset)

Set up the utilization observer for this worker.

Maps the shared memory file and configures the utilization registry to write metrics to it. Called by the supervisor (via WorkerController) to inform the worker of the shared memory file path and allocated offset.

Signature

parameter path String

Path to the shared memory file that the worker should map.

parameter size Integer

Size of the shared memory region to map.

parameter offset Integer

Offset into the shared memory buffer allocated for this worker.

returns Array

Array of [key, type, offset] tuples describing the utilization schema. Returns an empty array if no utilization schema is configured.

Implementation

def setup_utilization_observer(path, size, offset)
	return [] unless @utilization_schema
	
	schema = Async::Utilization::Schema.build(@utilization_schema)
	observer = Async::Utilization::Observer.open(schema, path, size, offset)
	@utilization_registry.observer = observer
	
	# Pass the schema back to the supervisor so it can be used to aggregate the metrics:
	observer.schema.to_a
end
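
To illustrate how the returned `[key, type, offset]` tuples might be consumed, the sketch below reads one worker's metrics out of a packed buffer. Everything in it is an assumption made for illustration: the type names (`:u32`, `:u64`), the little-endian layout, the metric keys, and the `read_metrics` helper are hypothetical and are not taken from `Async::Utilization`.

```ruby
# Hypothetical mapping from type names to String#unpack directives and byte sizes.
FORMATS = {
	u32: ["L<", 4],
	u64: ["Q<", 8],
}

# Read each field at (worker offset + field offset) from the shared buffer.
def read_metrics(buffer, worker_offset, schema)
	schema.to_h do |key, type, field_offset|
		directive, size = FORMATS.fetch(type)
		bytes = buffer.byteslice(worker_offset + field_offset, size)
		[key, bytes.unpack1(directive)]
	end
end

# Illustrative schema, shaped like the return value described above.
schema = [[:connections_active, :u32, 0], [:requests_total, :u64, 8]]

# Two 16-byte worker segments; the second worker starts at offset 16.
segment = ->(active, total) {[active, 0, total].pack("L<L<Q<")}
buffer = segment.call(1, 10) + segment.call(3, 250)

p read_metrics(buffer, 16, schema)
```

Keeping the field offsets relative to the worker's own offset means the same schema can describe every worker's segment of the buffer.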

def make_controller

Create the worker controller for this worker.

Override this method to provide a custom worker controller that exposes additional RPCs to the supervisor.

Signature

returns WorkerController

The worker controller instance.

Implementation

def make_controller
	WorkerController.new(self)
end
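
A custom controller can be supplied by subclassing the worker and overriding `make_controller`. The sketch below uses stand-in `Worker` and `WorkerController` classes inside a `Sketch` module so that it runs on its own; `CacheController`, `CacheWorker`, and the `cache_size` method are hypothetical names chosen for illustration and are not part of the library.

```ruby
module Sketch
	# Stand-ins for the real classes, reduced to what this example needs.
	class WorkerController
		def initialize(worker)
			@worker = worker
		end
	end
	
	class Worker
		def make_controller
			WorkerController.new(self)
		end
	end
	
	# Hypothetical controller exposing one additional method to the supervisor.
	class CacheController < WorkerController
		def cache_size
			@worker.cache.size
		end
	end
	
	# Hypothetical worker that returns the custom controller.
	class CacheWorker < Worker
		attr :cache
		
		def initialize
			@cache = {}
		end
		
		def make_controller
			CacheController.new(self)
		end
	end
end

worker = Sketch::CacheWorker.new
worker.cache[:greeting] = "hello"
puts worker.make_controller.cache_size
```

Deriving the custom controller from `WorkerController` keeps the built-in operations available alongside the new one.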