Async::Safe / Source / Async / Safe / Monitor

class Monitor

The core monitoring implementation using TracePoint.

This class tracks object ownership across fibers, detecting when an object is accessed from a different fiber than the one that originally created or accessed it.

The monitor uses a TracePoint on :call events to observe calls to Ruby-defined methods (methods implemented in C emit :c_call instead and are not observed), and maintains a registry of which fiber "owns" each object. It uses weak references to avoid preventing garbage collection of tracked objects.

Definitions

def initialize

Initialize a new monitor instance.

Implementation

# Initialize a new monitor instance.
#
# The monitor starts out disabled: no trace point exists until {enable!} is called.
def initialize
	# Guards every read and write of the ownership registry:
	@mutex = Thread::Mutex.new
	
	# Maps each tracked object to its owning fiber without keeping the object alive:
	@owners = ObjectSpace::WeakMap.new
	
	# No trace point is active yet:
	@trace_point = nil
end

def enable!

Enable the monitor by activating the TracePoint.

Implementation

# Enable the monitor by activating the TracePoint.
#
# Calling this while a trace point is already held is a no-op that returns the existing one.
def enable!
	# Already enabled - reuse the existing trace point:
	return @trace_point if @trace_point
	
	# Create and immediately activate a trace point that routes every :call event to {check_access}:
	@trace_point = TracePoint.trace(:call, &method(:check_access))
end

def disable!

Disable the monitor by deactivating the TracePoint.

Implementation

# Disable the monitor by deactivating the TracePoint.
#
# Does nothing (and returns nil) when no trace point is currently held.
def disable!
	active = @trace_point
	return unless active
	
	# Forget the trace point first, then deactivate it:
	@trace_point = nil
	active.disable
end

def transfer(*objects)

Explicitly transfer ownership of objects to the current fiber.

Also recursively transfers ownership of any tracked instance variables and objects contained in collections (Array, Hash, Set).

Signature

parameter objects Array(Object)

The objects to transfer.

Implementation

# Explicitly transfer ownership of objects to the current fiber.
#
# Every object collected by `traverse_objects` that already has a recorded owner is re-assigned to the current fiber. Objects without a recorded owner are left untracked.
#
# @parameter objects [Array(Object)] The objects to transfer.
def transfer(*objects)
	current = Fiber.current
	
	# `@owners` is an `ObjectSpace::WeakMap`, which keys by object identity, so the visited set must do the same. With value equality, distinct objects that are `eql?` (e.g. two equal strings) would collapse into one entry and only one of them would be transferred:
	visited = Set.new.compare_by_identity
	
	# Disable tracking during traversal to avoid deadlock. Remember the previous state so that a nested transfer does not re-enable tracking for the outer one:
	previous = current.async_safe_transfer
	current.async_safe_transfer = true
	
	begin
		# Traverse object graph:
		objects.each do |object|
			traverse_objects(object, visited)
		end
		
		# Transfer all visited objects that are already tracked:
		@mutex.synchronize do
			visited.each do |object|
				@owners[object] = current if @owners.key?(object)
			end
		end
	ensure
		current.async_safe_transfer = previous
	end
end

def check_access(trace_point)

Check if the current access is allowed or constitutes a violation.

Signature

parameter trace_point TracePoint

The trace point containing access information.

Implementation

# Check if the current access is allowed or constitutes a violation.
#
# The first fiber seen calling a tracked method on an object is recorded as that object's owner. A later call from a different fiber raises.
#
# @parameter trace_point [TracePoint] The trace point containing access information.
# @raises [ViolationError] If the receiver is owned by a different fiber.
def check_access(trace_point)
	current = Fiber.current
	
	# Skip if we're in a transfer operation:
	return if current.async_safe_transfer
	
	object = trace_point.self
	
	# Skip tracking class/module methods:
	return if object.is_a?(Module)
	
	# Skip frozen objects:
	return if object.frozen?
	
	method_name = trace_point.method_id
	
	# Consult the receiver's actual class (not the class that defined the method) to see whether the class or method is marked as async-safe:
	return if object.class.async_safe?(method_name)
	
	# Track ownership:
	@mutex.synchronize do
		if owner = @owners[object]
			# Violation if accessed from different fiber:
			if owner != current
				raise ViolationError.new(
					target: object,
					method: method_name,
					owner: owner,
					current: current,
				)
			end
		else
			# First access - record owner:
			@owners[object] = current
		end
	end
end