Async::Container::SupervisorSourceAsyncContainerSupervisorMemoryMonitor

class MemoryMonitor

Definitions

def initialize(interval: 10, total_size_limit: nil, **options)

Create a new memory monitor.

Signature

parameter interval Integer

The interval at which to check for memory leaks.

parameter total_size_limit Integer

The total size limit of all processes, or nil for no limit.

parameter options Hash

Options to pass to the cluster when adding processes.

Implementation

def initialize(interval: 10, total_size_limit: nil, **options)
	@interval = interval
	@cluster = Memory::Leak::Cluster.new(total_size_limit: total_size_limit)
	
	# We use these options when adding processes to the cluster:
	@options = options
	
	@processes = Hash.new{|hash, key| hash[key] = Set.new.compare_by_identity}
end

def add(process_id)

Add a process to the memory monitor. You may override this to control how processes are added to the cluster.

Signature

parameter process_id Integer

The process ID to add.

Implementation

def add(process_id)
	@cluster.add(process_id, **@options)
end

def register(connection)

Register the connection (worker) with the memory monitor.

Implementation

def register(connection)
	Console.debug(self, "Registering connection:", connection: connection, state: connection.state)
	if process_id = connection.state[:process_id]
		connections = @processes[process_id]
		
		if connections.empty?
			Console.debug(self, "Registering process:", process_id: process_id)
			self.add(process_id)
		end
		
		connections.add(connection)
	end
end

def remove(connection)

Remove the connection (worker) from the memory monitor.

Implementation

def remove(connection)
	if process_id = connection.state[:process_id]
		connections = @processes[process_id]
		
		connections.delete(connection)
		
		if connections.empty?
			Console.debug(self, "Removing process:", process_id: process_id)
			@cluster.remove(process_id)
		end
	end
end

def status(call)

Dump the current status of the memory monitor.

Signature

parameter call Connection::Call

The call to respond to.

Implementation

def status(call)
	call.push(memory_monitor: @cluster)
end

def memory_leak_detected(process_id, monitor)

Invoked when a memory leak is detected.

Signature

parameter process_id Integer

The process ID of the process that has a memory leak.

parameter monitor Memory::Leak::Monitor

The monitor that detected the memory leak.

returns Boolean

True if the process was killed.

Implementation

def memory_leak_detected(process_id, monitor)
	Console.info(self, "Killing process:", process_id: process_id)
	Process.kill(:INT, process_id)
	
	true
end

def run

Run the memory monitor.

Signature

returns Async::Task

The task that is running the memory monitor.

Implementation

def run
	Async do
		while true
			# This block must return true if the process was killed.
			@cluster.check! do |process_id, monitor|
				Console.error(self, "Memory leak detected in process:", process_id: process_id, monitor: monitor)
				memory_leak_detected(process_id, monitor)
			end
			
			sleep(@interval)
		end
	end
end