class Registry
Registry for emitting utilization metrics.
The registry tracks values directly and notifies a registered observer when values change. The observer (like Observer) can write to its backend.
Registries should be explicitly created and passed to components that need them. In service contexts, registries are typically created via the evaluator and shared across components within the same service instance.
When an observer is added, it is immediately notified of all current values so it can sync its state. When values change, the observer is notified.
Example: Create a registry and emit metrics:
registry = Async::Utilization::Registry.new
# Emit metrics - values tracked in registry
registry.increment(:total_requests)
registry.increment(:active_requests) do
# Handle request - auto-decrements when block completes
end
# Add shared memory observer when supervisor connects
# Observer will be notified of all current values automatically
schema = Async::Utilization::Schema.build(
total_requests: :u64,
active_requests: :u32
)
observer = Async::Utilization::Observer.open(schema, "/path/to/shm", 4096, 0)
registry.observer = observer
Signature
Definitions
def initialize
Initialize a new registry.
Implementation
def initialize
@observer = nil
@metrics = {}
@guard = Mutex.new
end
attr :observer
Signature
-
attribute
Object | Nil The registered observer.
attr :guard
Signature
-
attribute
Mutex The mutex for thread safety.
def values
Get the current values for all metrics.
Signature
-
returns
Hash Hash mapping field names to their current values.
Implementation
def values
@metrics.transform_values do |metric|
metric.guard.synchronize{metric.value}
end
end
def observer=(observer)
Set the observer for the registry.
When an observer is set, it is notified of all current metric values
so it can sync its state. The observer must implement set(field, value).
All cached metrics are invalidated when the observer changes.
Signature
-
parameter
observer#set The observer to set.
Implementation
def observer=(observer)
@guard.synchronize do
# Invalidate all cached metrics
@metrics.each_value do |metric|
metric.invalidate
end
@observer = observer
end
# Notify observer of all current metric values (outside guard to avoid deadlock)
@metrics.each do |name, metric|
value = metric.guard.synchronize{metric.value}
observer.set(name, value)
end
end
def set(field, value)
Set a field value.
Delegates to the metric instance for the given field.
Signature
-
parameter
fieldSymbol The field name to set.
-
parameter
valueNumeric The value to set.
Implementation
def set(field, value)
metric(field).set(value)
end
def increment(field, &block)
Increment a field value, optionally with a block that auto-decrements.
Delegates to the metric instance for the given field.
Signature
-
parameter
fieldSymbol The field name to increment.
-
returns
Integer The new value of the field.
Implementation
def increment(field, &block)
metric(field).increment(&block)
end
def decrement(field)
Decrement a field value.
Delegates to the metric instance for the given field.
Signature
-
parameter
fieldSymbol The field name to decrement.
-
returns
Integer The new value of the field.
Implementation
def decrement(field)
metric(field).decrement
end
def metric(field)
Get a cached metric reference for a field.
Returns a class Async::Utilization::Metric instance that caches all details needed for fast writes.
Metrics are cached per field and invalidated when the observer changes.
Signature
-
parameter
fieldSymbol The field name to get a metric for.
-
returns
Metric A metric instance for the given field.
Implementation
def metric(field)
field = field.to_sym
@guard.synchronize do
@metrics[field] ||= Metric.new(field, self)
end
end