Async::Service::Supervisor › Source › Async › Service › Supervisor › UtilizationMonitor › SegmentAllocator

class SegmentAllocator

Allocates and manages shared memory segments for worker utilization data.

Manages a shared memory file that workers can write utilization metrics to. Allocates segments to workers and maintains a free list for reuse. Each process (supervisor and workers) maps the shared memory file independently.

Definitions

def initialize(path, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512, growth_factor: 2)

Initialize a new shared memory manager.

Creates and maps the shared memory file. Workers will map the same file independently using the provided path.

Signature

parameter path String

Path to the shared memory file.

parameter size Integer

Total size of the shared memory buffer.

parameter segment_size Integer

Size of each allocation segment (default: 512 bytes).

parameter growth_factor Integer, Float

Factor to grow by when resizing (default: 2, doubles the size). Can be less than 2 or a floating point value; the result will be page-aligned to an integer.

Implementation

# Create the backing file, map it into this process, and mark every segment as free.
#
# @parameter path [String] Path to the shared memory file.
# @parameter size [Integer] Total size of the shared memory buffer.
# @parameter segment_size [Integer] Size of each allocation segment.
# @parameter growth_factor [Integer, Float] Multiplier applied to the current size when growing.
def initialize(path, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512, growth_factor: 2)
	@path = path
	@size = size
	@segment_size = segment_size
	@growth_factor = growth_factor
	
	# "w+b" creates the file when missing and discards any previous content.
	@file = File.open(path, "w+b")
	@file.truncate(size)
	
	# This process's own mapping of the file.
	@buffer = IO::Buffer.map(@file, size)
	
	# worker_id => {offset: Integer, schema: Array}
	@allocations = {}
	
	# Offsets of all whole segments, in ascending order; partial trailing space is ignored.
	segment_count = @size / @segment_size
	@free_list = (0...segment_count).map {|segment_index| segment_index * @segment_size}
end

def allocate(worker_id, schema)

Allocate a segment for a worker.

Automatically resizes the shared memory file if no segments are available.

Signature

parameter worker_id Integer

The ID of the worker.

parameter schema Array

Array of [key, type, offset] tuples describing the data layout.

returns Integer

The offset into the shared memory buffer, or nil if allocation fails.

Implementation

# Allocate a segment for a worker.
#
# A worker that already owns a segment keeps it and only its schema is replaced. Handing out a second segment would overwrite the only record of the first offset, so that segment could never be returned to the free list.
#
# Otherwise the next free segment is taken; when none is free, the shared memory file is grown by the growth factor first.
#
# @parameter worker_id [Integer] The ID of the worker.
# @parameter schema [Array] Array of [key, type, offset] tuples describing the data layout.
# @returns [Integer | Nil] The offset into the shared memory buffer, or nil if the file could not be grown.
def allocate(worker_id, schema)
	if existing = @allocations[worker_id]
		# Store a fresh record (as a first allocation would) but reuse the segment.
		@allocations[worker_id] = {offset: existing[:offset], schema: schema}
		return existing[:offset]
	end
	
	# Try to resize if we're out of segments:
	if @free_list.empty?
		return nil unless resize(@size * @growth_factor)
	end
	
	offset = @free_list.shift
	@allocations[worker_id] = {offset: offset, schema: schema}
	
	return offset
end

def free(worker_id)

Free a segment allocated to a worker.

Signature

parameter worker_id Integer

The ID of the worker.

Implementation

# Release the segment owned by a worker, making its offset available again.
#
# Does nothing when the worker has no allocation.
#
# @parameter worker_id [Integer] The ID of the worker.
def free(worker_id)
	released = @allocations.delete(worker_id)
	return unless released
	
	# Recycled offsets go to the back of the free list.
	@free_list.push(released[:offset])
end

def allocation(worker_id)

Get the allocation information for a worker.

Signature

parameter worker_id Integer

The ID of the worker.

returns Hash

Allocation info with :offset and :schema, or nil if not allocated.

Implementation

# Look up the allocation record for a worker.
#
# @parameter worker_id [Integer] The ID of the worker.
# @returns [Hash | Nil] The record with :offset and :schema, or nil if the worker has no allocation.
def allocation(worker_id)
	return @allocations[worker_id]
end

def size

Get the current size of the shared memory file.

Signature

returns Integer

The current size of the shared memory file.

Implementation

# The size currently recorded for the shared memory file.
#
# @returns [Integer] The current size of the shared memory file.
def size
	return @size
end

def update_schema(worker_id, schema)

Update the schema for an existing allocation.

Signature

parameter worker_id Integer

The ID of the worker.

parameter schema Array

Array of [key, type, offset] tuples describing the data layout.

Implementation

# Replace the schema stored for a worker's existing allocation.
#
# Does nothing when the worker has no allocation.
#
# @parameter worker_id [Integer] The ID of the worker.
# @parameter schema [Array] Array of [key, type, offset] tuples describing the data layout.
def update_schema(worker_id, schema)
	record = @allocations[worker_id]
	return unless record
	
	record[:schema] = schema
end

def read(worker_id)

Read utilization data from a worker's allocated segment.

Signature

parameter worker_id Integer

The ID of the worker.

returns Hash

Hash mapping keys to their values, or nil if not allocated.

Implementation

# Decode the values a worker has written into its segment.
#
# Each schema entry is read at the segment offset plus the entry's field offset. An entry that cannot be read is logged and left out of the result; the remaining entries are still returned.
#
# @parameter worker_id [Integer] The ID of the worker.
# @returns [Hash | Nil] Hash mapping keys to their values, or nil if the worker has no allocation.
def read(worker_id)
	allocation = @allocations[worker_id]
	return nil unless allocation
	
	base = allocation[:offset]
	fields = allocation[:schema]
	
	values = fields.each_with_object({}) do |(key, type, field_offset), decoded|
		position = base + field_offset
		
		# The schema's type symbol is handed to IO::Buffer#get_value unchanged.
		begin
			decoded[key] = @buffer.get_value(type, position)
		rescue => error
			Console.warn(self, "Failed to read value", type: type, key: key, offset: position, exception: error)
		end
	end
	
	return values
end

def resize(new_size)

Resize the shared memory file.

Extends the file to the new size, remaps the buffer, and adds the new segments to the free list. The new size must be larger than the current size; it is rounded up to the next multiple of the page size before the file is extended.

Signature

parameter new_size Integer

The new size for the shared memory file.

returns Boolean

True if resize was successful, false otherwise.

Implementation

# Grow the shared memory file and make the added space available as free segments.
#
# The requested size is rounded up to a whole number of bytes and then to the next multiple of the page size. Requests that are not larger than the current size are refused.
#
# @parameter new_size [Integer, Float] The requested size for the shared memory file.
# @returns [Boolean] True if the file was resized, false if the request was refused or an error occurred.
def resize(new_size)
	old_size = @size
	return false if new_size <= old_size
	
	# Convert to an Integer first: with a Float (e.g. size * 1.5) the division below would be
	# float division and the result would not land on a page boundary.
	page_size = IO::Buffer::PAGE_SIZE
	new_size = ((new_size.ceil + page_size - 1) / page_size) * page_size
	
	begin
		# Extend the file:
		@file.truncate(new_size)
		
		# Map the larger region before releasing the old mapping, so that a failed map
		# leaves the current buffer intact:
		remapped = IO::Buffer.map(@file, new_size)
		@buffer&.free
		@buffer = remapped
		
		# Calculate new segments to add to free list:
		old_segment_count = old_size / @segment_size
		new_segment_count = new_size / @segment_size
		
		# Add new segments to free list:
		(old_segment_count...new_segment_count).each do |segment_index|
			@free_list << (segment_index * @segment_size)
		end
		
		@size = new_size
		
		Console.info(self, "Resized shared memory", old_size: old_size, new_size: new_size, segments_added: new_segment_count - old_segment_count)
		
		return true
	rescue => error
		Console.error(self, "Failed to resize shared memory", old_size: old_size, new_size: new_size, exception: error)
		return false
	end
end

def close

Close the shared memory file.

Implementation

# Close the shared memory file and release this process's mapping of it.
#
# Safe to call more than once.
def close
	@file&.close
	
	# Free the mapping explicitly; clearing the reference alone would keep it alive
	# until the buffer object is garbage collected.
	@buffer&.free
	@buffer = nil
end