class MemoryMonitor
Monitors worker memory usage and restarts workers that exceed limits.
Uses the memory gem to track process memory and detect leaks.
Definitions
def initialize(interval: 10, total_size_limit: nil, free_size_minimum: nil, **options)
Create a new memory monitor.
Signature
-
parameter
`interval` (Integer) — The interval at which to check for memory leaks.
-
parameter
`total_size_limit` (Integer) — The total memory size limit across all monitored processes, or nil for no limit.
-
parameter
`free_size_minimum` (Integer) — The minimum free memory threshold, or nil for no threshold.
-
parameter
`options` (Hash) — Options to pass to the cluster when adding processes.
Implementation
# Create a new memory monitor.
#
# @parameter interval [Integer] The interval at which to check for memory leaks.
# @parameter total_size_limit [Integer | Nil] The total size limit of all processes, or nil for no limit.
# @parameter free_size_minimum [Integer | Nil] The minimum free memory threshold, or nil for no threshold.
# @parameter options [Hash] Options to pass to the cluster when adding processes.
def initialize(interval: 10, total_size_limit: nil, free_size_minimum: nil, **options)
  @interval = interval
  
  @cluster = Memory::Leak::Cluster.new(
    total_size_limit: total_size_limit,
    free_size_minimum: free_size_minimum
  )
  
  # Process ID => set of supervisor controllers attached to that process (compared by identity, not equality):
  @processes = Hash.new do |table, pid|
    table[pid] = Set.new.compare_by_identity
  end
  
  # Extra keyword options forwarded to the cluster every time a process is added:
  @options = options
  
  # Mutex serializing access to @processes and @cluster, so registration, removal and periodic checks cannot race:
  @guard = Mutex.new
end
# @attribute [Memory::Leak::Cluster] The cluster of processes being monitored (constructed in `initialize`).
attr_reader :cluster
Signature
-
attribute
Memory::Leak::Cluster The cluster of processes being monitored.
def add(process_id)
Add a process to the memory monitor. You may override this to control how processes are added to the cluster.
Signature
-
parameter
`process_id` (Integer) — The process ID to add.
Implementation
# Add a process to the monitored cluster, forwarding the extra options given to the constructor.
# You may override this to control how processes are added to the cluster.
#
# @parameter process_id [Integer] The process ID to add.
def add(process_id)
  cluster_options = @options
  @cluster.add(process_id, **cluster_options)
end
def register(supervisor_controller)
Register a worker with the memory monitor.
Signature
-
parameter
`supervisor_controller` (SupervisorController) — The supervisor controller for the worker.
Implementation
# Register a worker with the memory monitor.
#
# The first controller registered for a given process ID causes that process to be added to the
# cluster (via `add`); later controllers for the same process ID are only recorded.
# Controllers without a process ID are ignored.
#
# @parameter supervisor_controller [SupervisorController] The supervisor controller for the worker.
def register(supervisor_controller)
  pid = supervisor_controller.process_id
  return unless pid
  
  Console.debug(self, "Registering worker.", supervisor_controller: supervisor_controller, process_id: pid)
  
  @guard.synchronize do
    attached = @processes[pid]
    
    # Only the first controller attached to a process adds it to the cluster:
    if attached.empty?
      Console.debug(self, "Registering process.", child: {process_id: pid})
      add(pid)
    end
    
    attached.add(supervisor_controller)
  end
end
def remove(supervisor_controller)
Remove a worker from the memory monitor.
Signature
-
parameter
`supervisor_controller` (SupervisorController) — The supervisor controller for the worker.
Implementation
# Remove a worker from the memory monitor.
#
# When the last controller attached to a process is removed, the process is removed from the
# cluster and from the internal process table. Removing a controller whose process was never
# registered is a no-op.
#
# @parameter supervisor_controller [SupervisorController] The supervisor controller for the worker.
def remove(supervisor_controller)
  process_id = supervisor_controller.process_id
  return unless process_id
  
  @guard.synchronize do
    # Use `fetch` rather than `[]`: `[]` would invoke the default proc and create an empty entry
    # (and then a spurious cluster removal) for a process that was never registered.
    controllers = @processes.fetch(process_id, nil)
    next unless controllers
    
    controllers.delete(supervisor_controller)
    
    if controllers.empty?
      Console.debug(self, "Removing process.", child: {process_id: process_id})
      @cluster.remove(process_id)
      @processes.delete(process_id)
    end
  end
end
def status
Get status for the memory monitor.
Signature
-
returns
Hash Status including the memory cluster.
Implementation
# Get status for the memory monitor.
#
# @returns [Hash] The cluster's JSON representation under the `:memory_monitor` key.
def status
  cluster_state = @cluster.as_json
  {memory_monitor: cluster_state}
end
def memory_leak_detected(process_id, monitor)
Invoked when a memory leak is detected.
Signature
-
parameter
`process_id` (Integer) — The process ID of the process that has a memory leak.
-
parameter
`monitor` (Memory::Leak::Monitor) — The monitor that detected the memory leak.
-
returns
Boolean True if the process was killed.
Implementation
# Invoked when a memory leak is detected. Sends `SIGINT` to the offending process.
#
# @parameter process_id [Integer] The process ID of the process that has a memory leak.
# @parameter monitor [Memory::Leak::Monitor] The monitor that detected the memory leak.
# @returns [Boolean] True if the process was killed (or no longer exists); false if the signal could not be sent.
def memory_leak_detected(process_id, monitor)
  Console.warn(self, "Memory leak detected!", child: {process_id: process_id}, monitor: monitor)
  
  # Kill the process gently:
  begin
    Console.info(self, "Killing process!", child: {process_id: process_id})
    Process.kill(:INT, process_id)
  rescue Errno::ESRCH
    # No such process - he's dead Jim.
  rescue => error
    Console.warn(self, "Failed to kill process!", child: {process_id: process_id}, exception: error)
    # The signal was not delivered, so don't claim the process was killed:
    return false
  end
  
  true
end
def run
Run the memory monitor.
Signature
-
returns
Async::Task The task that is running the memory monitor.
Implementation
# Run the memory monitor.
#
# Periodically (via `Loop.run` with the configured interval) checks the cluster for leaking
# processes while holding the guard, so registration and removal cannot race with a check.
#
# @returns [Async::Task] The task that is running the memory monitor.
def run
  Async do
    Loop.run(interval: @interval) do
      @guard.synchronize do
        # This block must return true if the process was killed.
        @cluster.check! do |process_id, monitor|
          begin
            memory_leak_detected(process_id, monitor)
          rescue => error
            Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
            # We can't tell whether the process was killed, so report that it wasn't,
            # rather than returning whatever the logger happens to return:
            false
          end
        end
      end
    end
  end
end