Async::Service::SupervisorSourceAsyncServiceSupervisorUtilizationMonitor

class UtilizationMonitor

Monitors worker utilization metrics aggregated by service name.

Uses shared memory to efficiently collect utilization metrics from workers and aggregates them by service name for monitoring and reporting.

Nested

Definitions

def initialize(path: "utilization.shm", interval: 10, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512)

Initialize a new utilization monitor.

Signature

parameter path String

Path to the shared memory file.

parameter interval Integer

Interval in seconds to aggregate and update metrics.

parameter size Integer

Total size of the shared memory buffer.

parameter segment_size Integer

Size of each allocation segment (default: 512 bytes).

Implementation

def initialize(path: "utilization.shm", interval: 10, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512)
	@path = path
	@interval = interval
	@segment_size = segment_size
	
	@allocator = SegmentAllocator.new(path, size: size, segment_size: segment_size)
	
	# Track workers: worker_id => supervisor_controller
	@workers = {}
	
	@guard = Mutex.new
end

def register(supervisor_controller)

Register a worker with the utilization monitor.

Allocates a segment of shared memory and instructs the worker to map the shared memory file and expose utilization information at the allocated offset. The worker maps the file independently and returns its schema.

Signature

parameter supervisor_controller SupervisorController

The supervisor controller for the worker.

Implementation

def register(supervisor_controller)
	@guard.synchronize do
		worker_id = supervisor_controller.id
		return unless worker_id
		
		# Allocate a segment first (we'll get schema from worker)
		offset = @allocator.allocate(worker_id, [])
		
		unless offset
			Console.warn(self, "Failed to allocate utilization segment", worker_id: worker_id)
			return
		end
		
		# Inform worker of the shared memory path, size, and allocated offset
		# The worker will map the file itself and return its schema
		begin
			worker = supervisor_controller.worker
			
			if worker
				# Pass the segment size - observer will handle page alignment and file mapping
				schema = worker.setup_utilization_observer(@path, @segment_size, offset)
				
				# Update the allocation with the actual schema
				if schema && !schema.empty?
					@allocator.update_schema(worker_id, schema)
					@workers[worker_id] = supervisor_controller
					
					Console.info(self, "Registered worker utilization", worker_id: worker_id, offset: offset, schema: schema)
				else
					# Worker didn't provide schema, free the allocation
					@allocator.free(worker_id)
					Console.info(self, "Worker did not provide utilization schema", worker_id: worker_id)
				end
			end
		rescue => error
			Console.error(self, "Error setting up worker utilization", worker_id: worker_id, exception: error)
			@allocator.free(worker_id)
		end
	end
end

def remove(supervisor_controller)

Remove a worker from the utilization monitor.

Returns the allocated segment back to the free list.

Signature

parameter supervisor_controller SupervisorController

The supervisor controller for the worker.

Implementation

def remove(supervisor_controller)
	@guard.synchronize do
		worker_id = supervisor_controller.id
		return unless worker_id
		
		@workers.delete(worker_id)
		@allocator.free(worker_id)
		
		Console.debug(self, "Freed utilization segment", worker_id: worker_id)
	end
end

def self.monitor_type

The key used when this monitor's status is aggregated with others.

Implementation

def self.monitor_type
	:utilization_monitor
end

def as_json

Serialize utilization data for JSON.

Signature

returns Hash

Hash mapping service names to aggregated utilization metrics.

Implementation

def as_json
	@guard.synchronize do
		aggregated = {}
		
		@workers.each do |worker_id, supervisor_controller|
			service_name = supervisor_controller.state[:name] || "unknown"
			
			data = @allocator.read(worker_id)
			next unless data
			
			# Initialize service aggregation if needed
			aggregated[service_name] ||= {}
			
			# Sum up all numeric fields
			data.each do |key, value|
				if value.is_a?(Numeric)
					aggregated[service_name][key] ||= 0
					aggregated[service_name][key] += value
				else
					# For non-numeric values, we could handle differently
					# For now, just store the last value
					aggregated[service_name][key] = value
				end
			end
			
			# Count workers per service (for utilization denominator)
			aggregated[service_name][:worker_count] = (aggregated[service_name][:worker_count] || 0) + 1
		end
		
		aggregated
	end
end

def to_json(...)

Serialize to JSON string.

Implementation

def to_json(...)
	as_json.to_json(...)
end

def status

Get aggregated utilization status by service name.

Reads utilization data from all registered workers and aggregates it by service name (from supervisor_controller.state[:name]).

Signature

returns Hash

Hash with type and data keys.

Implementation

def status
	{type: self.class.monitor_type, data: as_json}
end

def emit(metrics)

Emit the utilization metrics.

Signature

parameter status Hash

The utilization metrics.

Implementation

def emit(metrics)
	Console.info(self, "Utilization:", metrics: metrics)
end

def run

Run the utilization monitor.

Periodically aggregates utilization data from all workers.

Signature

returns Async::Task

The task that is running the utilization monitor.

Implementation

def run
	Async do
		Loop.run(interval: @interval) do
			self.emit(self.as_json)
		end
	end
end