class SegmentAllocator
Allocates and manages shared memory segments for worker utilization data.
Manages a shared memory file that workers can write utilization metrics to. Allocates segments to workers and maintains a free list for reuse. Each process (supervisor and workers) maps the shared memory file independently.
Definitions
def initialize(path, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512, growth_factor: 2)
Initialize a new shared memory manager.
Creates and maps the shared memory file. Workers will map the same file independently using the provided path.
Signature
-
parameter
path (String): Path to the shared memory file.
-
parameter
size (Integer): Total size of the shared memory buffer.
-
parameter
segment_size (Integer): Size of each allocation segment (default: 512 bytes).
-
parameter
growth_factor (Integer, Float): Factor to grow by when resizing (default: 2, which doubles the size). Can be less than 2 or a floating point value; the resulting size is rounded up to a whole number of pages.
Implementation
# Initialize a new shared memory manager.
#
# Opens (creating or truncating) the file at `path`, sizes it to `size` bytes, maps it into
# this process and marks every whole segment as free.
#
# @parameter path [String] Path to the shared memory file.
# @parameter size [Integer] Total size of the shared memory buffer.
# @parameter segment_size [Integer] Size of each allocation segment.
# @parameter growth_factor [Integer | Float] Factor applied to the size when the file must grow.
def initialize(path, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512, growth_factor: 2)
  @path, @size = path, size
  @segment_size, @growth_factor = segment_size, growth_factor

  # Bookkeeping: worker_id => {offset: Integer, schema: Array}
  @allocations = {}

  # Back the segments with a file of exactly `size` bytes and map it into this process.
  @file = File.open(path, "w+b")
  @file.truncate(size)
  @buffer = IO::Buffer.map(@file, size)

  # Every whole segment starts out free; offsets are multiples of the segment size.
  segment_count = @size / @segment_size
  @free_list = (0...segment_count).map { |index| index * @segment_size }
end
def allocate(worker_id, schema)
Allocate a segment for a worker.
Automatically resizes the shared memory file if no segments are available.
Signature
-
parameter
worker_id (Integer): The ID of the worker.
-
parameter
schema (Array): Array of [key, type, offset] tuples describing the data layout.
-
returns
Integer The offset into the shared memory buffer, or nil if allocation fails.
Implementation
# Allocate a segment for a worker.
#
# If the worker already holds a segment, that segment is returned to the free list first, so
# allocating twice for the same worker cannot leak a segment. When no free segment remains,
# the shared memory file is grown by the configured growth factor.
#
# @parameter worker_id [Integer] The ID of the worker.
# @parameter schema [Array] Array of [key, type, offset] tuples describing the data layout.
# @returns [Integer | Nil] The offset into the shared memory buffer, or nil if allocation fails.
def allocate(worker_id, schema)
  # Recycle any segment this worker already owns; overwriting the entry would orphan it.
  if existing = @allocations.delete(worker_id)
    @free_list << existing[:offset]
  end

  if @free_list.empty?
    # The growth factor may be a Float, so round up to a whole number of bytes before resizing.
    return nil unless resize((@size * @growth_factor).ceil)
  end

  offset = @free_list.shift
  # A resize that added no whole segment leaves nothing to hand out; never record a nil offset.
  return nil unless offset

  @allocations[worker_id] = {offset: offset, schema: schema}

  return offset
end
def free(worker_id)
Free a segment allocated to a worker.
Signature
-
parameter
worker_id (Integer): The ID of the worker.
Implementation
# Free a segment allocated to a worker.
#
# Removes the worker's allocation record and puts its offset back on the free list. Does
# nothing when the worker has no allocation.
#
# @parameter worker_id [Integer] The ID of the worker.
def free(worker_id)
  released = @allocations.delete(worker_id)
  @free_list.push(released[:offset]) if released
end
def allocation(worker_id)
Get the allocation information for a worker.
Signature
-
parameter
worker_id (Integer): The ID of the worker.
-
returns
Hash Allocation info with :offset and :schema, or nil if not allocated.
Implementation
# Get the allocation information for a worker.
#
# @parameter worker_id [Integer] The ID of the worker.
# @returns [Hash | Nil] Allocation info with :offset and :schema, or nil if not allocated.
def allocation(worker_id)
  return @allocations[worker_id]
end
def size
Get the current size of the shared memory file.
Signature
-
returns
Integer The current size of the shared memory file.
Implementation
# Get the current size of the shared memory file.
#
# @returns [Integer] The size recorded at construction or by the last successful resize.
def size
  return @size
end
def update_schema(worker_id, schema)
Update the schema for an existing allocation.
Signature
-
parameter
worker_id (Integer): The ID of the worker.
-
parameter
schema (Array): Array of [key, type, offset] tuples describing the data layout.
Implementation
# Update the schema for an existing allocation.
#
# Replaces the stored schema of the worker's allocation in place; the segment offset is left
# unchanged. Does nothing when the worker has no allocation.
#
# @parameter worker_id [Integer] The ID of the worker.
# @parameter schema [Array] Array of [key, type, offset] tuples describing the data layout.
def update_schema(worker_id, schema)
  entry = @allocations[worker_id]
  entry[:schema] = schema if entry
end
def read(worker_id)
Read utilization data from a worker's allocated segment.
Signature
-
parameter
worker_id (Integer): The ID of the worker.
-
returns
Hash Hash mapping keys to their values, or nil if not allocated.
Implementation
# Read utilization data from a worker's allocated segment.
#
# Each schema entry is read from the mapped buffer at the segment offset plus the entry's
# field offset. A field whose read raises is logged as a warning and left out of the result.
#
# @parameter worker_id [Integer] The ID of the worker.
# @returns [Hash | Nil] Hash mapping keys to their values, or nil if not allocated.
def read(worker_id)
  entry = @allocations[worker_id]
  return nil unless entry

  base, layout = entry.values_at(:offset, :schema)

  layout.each_with_object({}) do |(name, kind, relative), values|
    position = base + relative

    # The type symbol (i32, u32, i64, u64, f32, f64, ...) is handed to IO::Buffer unchanged.
    begin
      values[name] = @buffer.get_value(kind, position)
    rescue => error
      Console.warn(self, "Failed to read value", type: kind, key: name, offset: position, exception: error)
    end
  end
end
def resize(new_size)
Resize the shared memory file.
Extends the file to the new size, remaps the buffer, and adds new segments to the free list. The new size must be larger than the current size and should be page-aligned for optimal performance.
Signature
-
parameter
new_size (Integer): The new size for the shared memory file.
-
returns
Boolean True if resize was successful, false otherwise.
Implementation
# Resize the shared memory file.
#
# Extends the file, remaps the buffer and adds the newly available segments to the free list.
# The requested size is rounded up to a whole number of pages; fractional sizes (for example
# the product of the current size and a Float growth factor) are rounded up to whole bytes
# first so the alignment uses integer arithmetic.
#
# @parameter new_size [Integer | Float] The requested size; must be larger than the current size.
# @returns [Boolean] True if the resize was successful, false otherwise.
def resize(new_size)
  old_size = @size
  return false if new_size <= old_size

  # Integer division is what rounds down to a page count; with a Float operand `/` would not
  # truncate and the result would not be page-aligned, hence the `ceil` to whole bytes.
  page_size = IO::Buffer::PAGE_SIZE
  new_size = ((new_size.ceil + page_size - 1) / page_size) * page_size

  begin
    # Extend the file:
    @file.truncate(new_size)

    # Map the larger file before releasing the old mapping, so that a failed map leaves
    # the existing buffer usable:
    new_buffer = IO::Buffer.map(@file, new_size)
    @buffer&.free
    @buffer = new_buffer

    # Segments that lie beyond the old size become available:
    old_segment_count = old_size / @segment_size
    new_segment_count = new_size / @segment_size
    (old_segment_count...new_segment_count).each do |segment_index|
      @free_list << (segment_index * @segment_size)
    end

    @size = new_size

    Console.info(self, "Resized shared memory", old_size: old_size, new_size: new_size, segments_added: new_segment_count - old_segment_count)

    return true
  rescue => error
    Console.error(self, "Failed to resize shared memory", old_size: old_size, new_size: new_size, exception: error)

    return false
  end
end
def close
Close the shared memory file.
Implementation
# Close the shared memory file.
#
# Releases this process's mapping of the file and then closes the file handle. Safe to call
# when the file or buffer is already absent.
#
# @returns [Nil]
def close
  begin
    # Unmap explicitly rather than leaving the mapping alive until garbage collection.
    @buffer&.free
  ensure
    # Drop the reference and close the handle even if releasing the mapping raised.
    @buffer = nil
    @file&.close
  end

  return nil
end