class UtilizationMonitor
Monitors worker utilization metrics aggregated by service name.
Uses shared memory to efficiently collect utilization metrics from workers and aggregates them by service name for monitoring and reporting.
Nested
Definitions
def initialize(path: "utilization.shm", interval: 10, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512)
Initialize a new utilization monitor.
Signature
-
parameter
pathString Path to the shared memory file.
-
parameter
intervalInteger Interval in seconds to aggregate and update metrics.
-
parameter
sizeInteger Total size of the shared memory buffer.
-
parameter
segment_sizeInteger Size of each allocation segment (default: 512 bytes).
Implementation
def initialize(path: "utilization.shm", interval: 10, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512)
@path = path
@interval = interval
@segment_size = segment_size
@allocator = SegmentAllocator.new(path, size: size, segment_size: segment_size)
# Track workers: worker_id => supervisor_controller
@workers = {}
@guard = Mutex.new
end
def register(supervisor_controller)
Register a worker with the utilization monitor.
Allocates a segment of shared memory and instructs the worker to map the shared memory file and expose utilization information at the allocated offset. The worker maps the file independently and returns its schema.
Signature
-
parameter
supervisor_controllerSupervisorController The supervisor controller for the worker.
Implementation
def register(supervisor_controller)
@guard.synchronize do
worker_id = supervisor_controller.id
return unless worker_id
# Allocate a segment first (we'll get schema from worker)
offset = @allocator.allocate(worker_id, [])
unless offset
Console.warn(self, "Failed to allocate utilization segment", worker_id: worker_id)
return
end
# Inform worker of the shared memory path, size, and allocated offset
# The worker will map the file itself and return its schema
begin
worker = supervisor_controller.worker
if worker
# Pass the segment size - observer will handle page alignment and file mapping
schema = worker.setup_utilization_observer(@path, @segment_size, offset)
# Update the allocation with the actual schema
if schema && !schema.empty?
@allocator.update_schema(worker_id, schema)
@workers[worker_id] = supervisor_controller
Console.info(self, "Registered worker utilization", worker_id: worker_id, offset: offset, schema: schema)
else
# Worker didn't provide schema, free the allocation
@allocator.free(worker_id)
Console.info(self, "Worker did not provide utilization schema", worker_id: worker_id)
end
end
rescue => error
Console.error(self, "Error setting up worker utilization", worker_id: worker_id, exception: error)
@allocator.free(worker_id)
end
end
end
def remove(supervisor_controller)
Remove a worker from the utilization monitor.
Returns the allocated segment back to the free list.
Signature
-
parameter
supervisor_controllerSupervisorController The supervisor controller for the worker.
Implementation
def remove(supervisor_controller)
@guard.synchronize do
worker_id = supervisor_controller.id
return unless worker_id
@workers.delete(worker_id)
@allocator.free(worker_id)
Console.debug(self, "Freed utilization segment", worker_id: worker_id)
end
end
def self.monitor_type
The key used when this monitor's status is aggregated with others.
Implementation
def self.monitor_type
:utilization_monitor
end
def as_json
Serialize utilization data for JSON.
Signature
-
returns
Hash Hash mapping service names to aggregated utilization metrics.
Implementation
def as_json
@guard.synchronize do
aggregated = {}
@workers.each do |worker_id, supervisor_controller|
service_name = supervisor_controller.state[:name] || "unknown"
data = @allocator.read(worker_id)
next unless data
# Initialize service aggregation if needed
aggregated[service_name] ||= {}
# Sum up all numeric fields
data.each do |key, value|
if value.is_a?(Numeric)
aggregated[service_name][key] ||= 0
aggregated[service_name][key] += value
else
# For non-numeric values, we could handle differently
# For now, just store the last value
aggregated[service_name][key] = value
end
end
# Count workers per service (for utilization denominator)
aggregated[service_name][:worker_count] = (aggregated[service_name][:worker_count] || 0) + 1
end
aggregated
end
end
def to_json(...)
Serialize to JSON string.
Implementation
def to_json(...)
as_json.to_json(...)
end
def status
Get aggregated utilization status by service name.
Reads utilization data from all registered workers and aggregates it by service name (from supervisor_controller.state[:name]).
Signature
-
returns
Hash Hash with type and data keys.
Implementation
def status
{type: self.class.monitor_type, data: as_json}
end
def emit(metrics)
Emit the utilization metrics.
Signature
-
parameter
statusHash The utilization metrics.
Implementation
def emit(metrics)
Console.info(self, "Utilization:", metrics: metrics)
end
def run
Run the utilization monitor.
Periodically aggregates utilization data from all workers.
Signature
-
returns
Async::Task The task that is running the utilization monitor.
Implementation
def run
Async do
Loop.run(interval: @interval) do
self.emit(self.as_json)
end
end
end