Async::Service::Supervisor › Source › Async › Service › Supervisor › MemoryMonitor

class MemoryMonitor

Monitors worker memory usage and restarts workers that exceed limits.

Uses the memory gem to track process memory and detect leaks.

Definitions

def initialize(interval: 10, total_size_limit: nil, free_size_minimum: nil, **options)

Create a new memory monitor.

Signature

parameter interval Integer

The interval at which to check for memory leaks.

parameter total_size_limit Integer

The total size limit of all processes, or nil for no limit.

parameter free_size_minimum Integer

The minimum free memory threshold, or nil for no threshold.

parameter options Hash

Options to pass to the cluster when adding processes.

Implementation

# Create a new memory monitor.
#
# @parameter interval [Integer] The interval at which to check for memory leaks.
# @parameter total_size_limit [Integer | Nil] The total size limit of all processes, or nil for no limit.
# @parameter free_size_minimum [Integer | Nil] The minimum free memory threshold, or nil for no threshold.
# @parameter options [Hash] Options to pass to the cluster when adding processes.
def initialize(interval: 10, total_size_limit: nil, free_size_minimum: nil, **options)
	# Mutex used to serialize modifications of the cluster and the process table:
	@guard = Mutex.new
	
	# Each process ID lazily maps to the set of controllers (compared by identity) registered for it:
	@processes = Hash.new do |table, process_id|
		table[process_id] = Set.new.compare_by_identity
	end
	
	# Forwarded as keyword arguments whenever a process is added to the cluster:
	@options = options
	
	limits = {total_size_limit: total_size_limit, free_size_minimum: free_size_minimum}
	@cluster = Memory::Leak::Cluster.new(**limits)
	
	@interval = interval
end

# @attribute [Memory::Leak::Cluster] The cluster of processes being monitored.
attr_reader :cluster

Signature

attribute Memory::Leak::Cluster

The cluster of processes being monitored.

def add(process_id)

Add a process to the memory monitor. You may override this to control how processes are added to the cluster.

Signature

parameter process_id Integer

The process ID to add.

Implementation

# Add a process to the memory monitor. You may override this to control how processes are added to the cluster.
#
# @parameter process_id [Integer] The process ID to add.
def add(process_id)
	# The options captured at construction time are forwarded as keyword arguments:
	cluster_options = @options
	
	@cluster.add(process_id, **cluster_options)
end

def register(supervisor_controller)

Register a worker with the memory monitor.

Signature

parameter supervisor_controller SupervisorController

The supervisor controller for the worker.

Implementation

# Register a worker with the memory monitor.
#
# The process is added to the cluster only when its first controller is registered.
#
# @parameter supervisor_controller [SupervisorController] The supervisor controller for the worker.
def register(supervisor_controller)
	pid = supervisor_controller.process_id
	
	# A controller without a process ID cannot be tracked:
	if pid
		Console.debug(self, "Registering worker.", supervisor_controller: supervisor_controller, process_id: pid)
		
		@guard.synchronize do
			workers = @processes[pid]
			
			# No controllers yet means the cluster is not tracking this process:
			if workers.empty?
				Console.debug(self, "Registering process.", child: {process_id: pid})
				self.add(pid)
			end
			
			workers.add(supervisor_controller)
		end
	end
end

def remove(supervisor_controller)

Remove a worker from the memory monitor.

Signature

parameter supervisor_controller SupervisorController

The supervisor controller for the worker.

Implementation

# Remove a worker from the memory monitor.
#
# The process is removed from the cluster once its last controller is gone.
#
# @parameter supervisor_controller [SupervisorController] The supervisor controller for the worker.
def remove(supervisor_controller)
	pid = supervisor_controller.process_id
	
	# A controller without a process ID was never tracked:
	if pid
		@guard.synchronize do
			workers = @processes[pid]
			workers.delete(supervisor_controller)
			
			# Other controllers still refer to this process, so keep monitoring it:
			next unless workers.empty?
			
			Console.debug(self, "Removing process.", child: {process_id: pid})
			@cluster.remove(pid)
			@processes.delete(pid)
		end
	end
end

def status

Get status for the memory monitor.

Signature

returns Hash

Status including the memory cluster.

Implementation

# Get status for the memory monitor.
#
# @returns [Hash] Status including the memory cluster, under the `:memory_monitor` key.
def status
	snapshot = @cluster.as_json
	
	{memory_monitor: snapshot}
end

def memory_leak_detected(process_id, monitor)

Invoked when a memory leak is detected.

Signature

parameter process_id Integer

The process ID of the process that has a memory leak.

parameter monitor Memory::Leak::Monitor

The monitor that detected the memory leak.

returns Boolean

True if the process was killed.

Implementation

# Invoked when a memory leak is detected. Sends `SIGINT` to the leaking process.
#
# @parameter process_id [Integer] The process ID of the process that has a memory leak.
# @parameter monitor [Memory::Leak::Monitor] The monitor that detected the memory leak.
# @returns [Boolean] True if the process was signalled or no longer exists, false if the signal could not be delivered.
def memory_leak_detected(process_id, monitor)
	Console.warn(self, "Memory leak detected!", child: {process_id: process_id}, monitor: monitor)
	
	# Kill the process gently:
	begin
		Console.info(self, "Killing process!", child: {process_id: process_id})
		Process.kill(:INT, process_id)
		
		true
	rescue Errno::ESRCH
		# No such process - it is already gone, which counts as killed:
		true
	rescue => error
		Console.warn(self, "Failed to kill process!", child: {process_id: process_id}, exception: error)
		
		# The signal was not delivered, so don't report the process as killed:
		false
	end
end

def run

Run the memory monitor.

Signature

returns Async::Task

The task that is running the memory monitor.

Implementation

# Run the memory monitor.
#
# Checks the cluster every `@interval`, invoking {memory_leak_detected} for each process the cluster reports.
#
# @returns [Async::Task] The task that is running the memory monitor.
def run
	Async do
		Loop.run(interval: @interval) do
			# Hold the guard so registrations and removals cannot interleave with a check:
			@guard.synchronize do
				# This block must return true if the process was killed.
				@cluster.check! do |process_id, monitor|
					memory_leak_detected(process_id, monitor)
				rescue => error
					# A failing handler is logged rather than aborting the rest of the check:
					Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
				end
			end
		end
	end
end