Async::UtilizationSourceAsyncUtilizationRegistry

class Registry

Registry for emitting utilization metrics.

The registry tracks values directly and notifies a registered observer when values change. The observer (like Observer) can write to its backend.

Registries should be explicitly created and passed to components that need them. In service contexts, registries are typically created via the evaluator and shared across components within the same service instance.

When an observer is added, it is immediately notified of all current values so it can sync its state. When values change, the observer is notified.

Example: Create a registry and emit metrics:

registry = Async::Utilization::Registry.new

# Emit metrics - values tracked in registry
registry.increment(:total_requests)
registry.track(:active_requests) do
	# Handle request - auto-decrements when block completes
end

# Add shared memory observer when supervisor connects
# Observer will be notified of all current values automatically
schema = Async::Utilization::Schema.build(
	total_requests: :u64,
	active_requests: :u32
)
observer = Async::Utilization::Observer.open(schema, "/path/to/shm", 4096, 0)
registry.observer = observer

Signature

Definitions

def initialize

Initialize a new registry.

Implementation

def initialize
	@observer = nil
	@metrics = {}
	
	@guard = Mutex.new
end

attr :observer

Signature

attribute Object | Nil

The registered observer.

attr :guard

Signature

attribute Mutex

The mutex for thread safety.

def values

Get the current values for all metrics.

Signature

returns Hash

Hash mapping field names to their current values.

Implementation

def values
	@metrics.transform_values do |metric|
		metric.guard.synchronize{metric.value}
	end
end

def observer=(observer)

Set the observer for the registry.

When an observer is set, all cached metrics are updated so they write directly to the observer's buffer. The observer must expose schema and buffer attributes.

Signature

parameter observer Observer | Nil

The observer to set.

Implementation

def observer=(observer)
	@guard.synchronize do
		@observer = observer
		
		# Invalidate all cached metrics with new observer (or nil)
		@metrics.each_value do |metric|
			metric.observer = observer
		end
		
		# Console.info(self, "Observer assigned", observer: observer, metric_count: @metrics.size)
	end
	
end

def set(field, value)

Set a field value.

Delegates to the metric instance for the given field.

Signature

parameter field Symbol

The field name to set.

parameter value Numeric

The value to set.

Implementation

def set(field, value)
	metric(field).set(value)
end

def increment(field)

Increment a field value.

Delegates to the metric instance for the given field.

Signature

parameter field Symbol

The field name to increment.

returns Integer

The new value of the field.

Implementation

def increment(field)
	metric(field).increment
end

def track(field, &block)

Track an operation: increment before the block, decrement after it completes.

Delegates to the metric instance for the given field.

Signature

parameter field Symbol

The field name to track.

returns Object

The block's return value.

Implementation

def track(field, &block)
	metric(field).track(&block)
end

def decrement(field)

Decrement a field value.

Delegates to the metric instance for the given field.

Signature

parameter field Symbol

The field name to decrement.

returns Integer

The new value of the field.

Implementation

def decrement(field)
	metric(field).decrement
end

def metric(field)

Get a cached metric reference for a field.

Returns a class Async::Utilization::Metric instance that caches all details needed for fast writes. Metrics are cached per field and invalidated when the observer changes.

Signature

parameter field Symbol

The field name to get a metric for.

returns Metric

A metric instance for the given field.

Implementation

def metric(field)
	field = field.to_sym
	
	@guard.synchronize do
		@metrics[field] ||= Metric.for(field, @observer)
	end
end