Async::Service::Supervisor::UtilizationMonitor

class UtilizationMonitor

Monitors worker utilization metrics aggregated by service name.

Uses shared memory to efficiently collect utilization metrics from workers and aggregates them by service name for monitoring and reporting.

Nested

Definitions

def initialize(path: "utilization.shm", interval: 10, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512)

Initialize a new utilization monitor.

Signature

parameter path String

Path to the shared memory file.

parameter interval Integer

Interval in seconds to aggregate and update metrics.

parameter size Integer

Total size of the shared memory buffer.

parameter segment_size Integer

Size of each allocation segment (default: 512 bytes).

Implementation

# Initialize a new utilization monitor.
#
# @parameter path [String] Path to the shared memory file.
# @parameter interval [Integer] Interval in seconds to aggregate and update metrics.
# @parameter size [Integer] Total size of the shared memory buffer.
# @parameter segment_size [Integer] Size of each allocation segment (default: 512 bytes).
def initialize(path: "utilization.shm", interval: 10, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512)
	super(interval: interval)
	
	# Held while the worker table and allocator are accessed by the other methods.
	@guard = Mutex.new
	
	# Registered workers: worker_id => supervisor_controller
	@workers = {}
	
	# Retained so they can be handed to each worker during registration.
	@path = path
	@segment_size = segment_size
	
	@allocator = SegmentAllocator.new(path, size: size, segment_size: segment_size)
end

def register(supervisor_controller)

Register a worker with the utilization monitor.

Allocates a segment of shared memory and instructs the worker to map the shared memory file and expose utilization information at the allocated offset. The worker maps the file independently and returns its schema.

Signature

parameter supervisor_controller SupervisorController

The supervisor controller for the worker.

Implementation

# Register a worker with the utilization monitor.
#
# Allocates a segment of shared memory and asks the worker to map the shared memory file and expose its utilization at the allocated offset. The worker is only tracked once it returns a non-empty schema; in every other outcome (no worker, no schema, or an error) the segment is released again.
#
# @parameter supervisor_controller [SupervisorController] The supervisor controller for the worker.
def register(supervisor_controller)
	@guard.synchronize do
		worker_id = supervisor_controller.id
		return unless worker_id
		
		# Reserve the segment with an empty schema; the real schema is supplied by the worker below.
		offset = @allocator.allocate(worker_id, [])
		
		unless offset
			Console.warn(self, "Failed to allocate utilization segment", worker_id: worker_id)
			return
		end
		
		# Inform worker of the shared memory path, size, and allocated offset.
		# The worker will map the file itself and return its schema.
		begin
			worker = supervisor_controller.worker
			
			unless worker
				# Without a worker nothing is set up to use the segment, so hand it back instead of leaving it allocated but untracked.
				@allocator.free(worker_id)
				Console.debug(self, "Worker unavailable for utilization setup", worker_id: worker_id)
				return
			end
			
			# Pass the segment size - observer will handle page alignment and file mapping
			schema = worker.setup_utilization_observer(@path, @segment_size, offset)
			
			if schema && !schema.empty?
				# Update the allocation with the actual schema and start tracking the worker.
				@allocator.update_schema(worker_id, schema)
				@workers[worker_id] = supervisor_controller
				
				Console.info(self, "Registered worker utilization", worker_id: worker_id, offset: offset, schema: schema)
			else
				# Worker didn't provide schema, free the allocation
				@allocator.free(worker_id)
				Console.info(self, "Worker did not provide utilization schema", worker_id: worker_id)
			end
		rescue => error
			Console.error(self, "Error setting up worker utilization", worker_id: worker_id, exception: error)
			@allocator.free(worker_id)
		end
	end
end

def remove(supervisor_controller)

Remove a worker from the utilization monitor.

Returns the allocated segment back to the free list.

Signature

parameter supervisor_controller SupervisorController

The supervisor controller for the worker.

Implementation

# Remove a worker from the utilization monitor.
#
# Stops tracking the worker and returns its allocated segment to the allocator. Does nothing when the controller has no id.
#
# @parameter supervisor_controller [SupervisorController] The supervisor controller for the worker.
def remove(supervisor_controller)
	@guard.synchronize do
		id = supervisor_controller.id
		
		if id
			@workers.delete(id)
			@allocator.free(id)
			
			Console.debug(self, "Freed utilization segment", worker_id: id)
		end
	end
end

def self.monitor_type

The key used when this monitor's status is aggregated with others.

Implementation

# The key used when this monitor's status is aggregated with others.
#
# @returns [Symbol] Always `:utilization_monitor`.
def self.monitor_type
	:utilization_monitor
end

def as_json

Serialize utilization data for JSON.

Signature

returns Hash

Hash mapping service names to aggregated utilization metrics.

Implementation

# Serialize utilization data for JSON.
#
# Reads each tracked worker's data from the allocator and folds it into one entry per service name (taken from the controller's `state[:name]`, or "unknown" when absent). Workers with no readable data are skipped.
#
# @returns [Hash] Hash mapping service names to aggregated utilization metrics.
def as_json
	@guard.synchronize do
		@workers.each_with_object({}) do |(worker_id, controller), totals|
			name = controller.state[:name] || "unknown"
			
			sample = @allocator.read(worker_id)
			next unless sample
			
			# One bucket per service, created on first use.
			bucket = (totals[name] ||= {})
			
			sample.each do |field, value|
				# Numeric fields accumulate across workers; anything else keeps the most recent value.
				bucket[field] = value.is_a?(Numeric) ? (bucket[field] || 0) + value : value
			end
			
			# Number of contributing workers per service (for utilization denominator).
			bucket[:worker_count] = (bucket[:worker_count] || 0) + 1
		end
	end
end

def emit(metrics)

Emit the utilization metrics.

Signature

parameter metrics Hash

The utilization metrics.

Implementation

# Emit the utilization metrics by logging them at info level through `Console`.
#
# @parameter metrics [Hash] The utilization metrics.
def emit(metrics)
	Console.info(self, "Utilization:", metrics: metrics)
end

def run_once

Run one iteration of the utilization monitor.

Implementation

# Run one iteration of the utilization monitor.
#
# Collects the current aggregated metrics and passes them to {emit}.
def run_once
	metrics = as_json
	
	emit(metrics)
end