Async::Utilization › Source › Async › Utilization › Registry

class Registry

Registry for emitting utilization metrics.

The registry tracks values directly and notifies a registered observer when values change. The observer (for example, Async::Utilization::Observer) can then write those values to its backend.

Registries should be explicitly created and passed to components that need them. In service contexts, registries are typically created via the evaluator and shared across components within the same service instance.

When an observer is added, it is immediately notified of all current values so it can sync its state. When values change, the observer is notified.

Example: Create a registry and emit metrics:

registry = Async::Utilization::Registry.new

# Emit metrics. The registry tracks the values itself, so this works
# before any observer has been attached.
registry.increment(:total_requests)
registry.increment(:active_requests) do
	# Handle the request here. The block form auto-decrements the field
	# once the block completes.
end

# Attach a shared-memory observer once the supervisor connects.
# On assignment the observer is notified of every current value, so it
# can sync its state without the metrics being re-emitted.
schema = Async::Utilization::Schema.build(
	total_requests: :u64,
	active_requests: :u32
)
# NOTE(review): the trailing arguments (4096, 0) look like a size and an
# offset for the shared-memory segment — confirm against Observer.open.
observer = Async::Utilization::Observer.open(schema, "/path/to/shm", 4096, 0)
registry.observer = observer

Signature

Definitions

def initialize

Initialize a new registry.

Implementation

# Initialize a new registry.
#
# The registry starts with an empty metric cache and no observer attached.
def initialize
	# Serializes changes to the metric cache and to the observer reference.
	@guard = Mutex.new
	
	# Metric instances keyed by field name; filled lazily by {metric}.
	@metrics = Hash.new
	@observer = nil
end

attr :observer

Signature

attribute Object | Nil

The registered observer.

attr :guard

Signature

attribute Mutex

The mutex for thread safety.

def values

Get the current values for all metrics.

Signature

returns Hash

Hash mapping field names to their current values.

Implementation

# Get the current values for all metrics.
#
# Each value is read while holding that metric's own guard.
#
# @returns [Hash] Hash mapping field names to their current values.
def values
	@metrics.each_with_object({}) do |(name, metric), snapshot|
		snapshot[name] = metric.guard.synchronize{metric.value}
	end
end

def observer=(observer)

Set the observer for the registry.

When an observer is set, it is notified of all current metric values so it can sync its state. The observer must implement set(field, value). All cached metrics are invalidated when the observer changes.

Signature

parameter observer #set

The observer to set.

Implementation

# Set the observer for the registry.
#
# All cached metrics are invalidated, then the new observer (if any) is
# notified of every current metric value so it can sync its state.
# Passing `nil` detaches the current observer and sends no notifications.
#
# @parameter observer [#set | Nil] The observer to set; must implement `set(field, value)` unless `nil`.
def observer=(observer)
	# Invalidate, swap and snapshot under a single lock. The snapshot lets
	# us notify outside the guard (avoiding deadlock) without iterating the
	# live hash, which would raise if a metric were registered meanwhile.
	snapshot = @guard.synchronize do
		@metrics.each_value(&:invalidate)
		@observer = observer
		@metrics.to_a
	end
	
	# Nothing to sync when the observer is being cleared.
	return unless observer
	
	snapshot.each do |name, metric|
		value = metric.guard.synchronize{metric.value}
		observer.set(name, value)
	end
end

def set(field, value)

Set a field value.

Delegates to the metric instance for the given field.

Signature

parameter field Symbol

The field name to set.

parameter value Numeric

The value to set.

Implementation

# Set a field value.
#
# Looks up the metric for the field and hands the value to it.
#
# @parameter field [Symbol] The field name to set.
# @parameter value [Numeric] The value to set.
def set(field, value)
	target = metric(field)
	target.set(value)
end

def increment(field, &block)

Increment a field value, optionally with a block that auto-decrements.

Delegates to the metric instance for the given field.

Signature

parameter field Symbol

The field name to increment.

returns Integer

The new value of the field.

Implementation

# Increment a field value, optionally with a block that auto-decrements.
#
# Looks up the metric for the field and forwards the call, including any
# block, to it.
#
# @parameter field [Symbol] The field name to increment.
# @returns [Integer] The new value of the field.
def increment(field, &block)
	counter = metric(field)
	counter.increment(&block)
end

def decrement(field)

Decrement a field value.

Delegates to the metric instance for the given field.

Signature

parameter field Symbol

The field name to decrement.

returns Integer

The new value of the field.

Implementation

# Decrement a field value.
#
# Looks up the metric for the field and forwards the call to it.
#
# @parameter field [Symbol] The field name to decrement.
# @returns [Integer] The new value of the field.
def decrement(field)
	counter = metric(field)
	counter.decrement
end

def metric(field)

Get a cached metric reference for a field.

Returns an Async::Utilization::Metric instance that caches all details needed for fast writes. Metrics are cached per field and invalidated when the observer changes.

Signature

parameter field Symbol

The field name to get a metric for.

returns Metric

A metric instance for the given field.

Implementation

# Get a cached metric reference for a field.
#
# The field name is normalized with `to_sym`. The lookup and any creation
# happen while holding the registry guard, and the created instance is
# stored so later calls for the same field return it.
#
# @parameter field [Symbol] The field name to get a metric for.
# @returns [Metric] A metric instance for the given field.
def metric(field)
	key = field.to_sym
	
	@guard.synchronize do
		existing = @metrics[key]
		existing || (@metrics[key] = Metric.new(key, self))
	end
end