class MemoryMonitor
Definitions
def initialize(interval: 10, total_size_limit: nil, **options)
Create a new memory monitor.
Signature
- parameter `interval` (Integer): The interval, in seconds, at which to check for memory leaks.
- parameter `total_size_limit` (Integer): The total size limit of all processes, or nil for no limit.
- parameter `options` (Hash): Options to pass to the cluster when adding processes.
Implementation
# Create a new memory monitor.
#
# @parameter interval [Integer] The interval at which to check for memory leaks.
# @parameter total_size_limit [Integer | Nil] The total size limit of all processes, or nil for no limit.
# @parameter options [Hash] Options to pass to the cluster when adding processes.
def initialize(interval: 10, total_size_limit: nil, **options)
  @interval = interval

  # Retained so they can be handed to the cluster whenever a process is added:
  @options = options

  @cluster = Memory::Leak::Cluster.new(total_size_limit: total_size_limit)

  # Maps a process ID to the set of connections seen for it. Sets are created on
  # first access and compare members by identity rather than by `==`.
  @processes = Hash.new do |table, process_id|
    table[process_id] = Set.new.compare_by_identity
  end
end
def add(process_id)
Add a process to the memory monitor. You may override this to control how processes are added to the cluster.
Signature
- parameter `process_id` (Integer): The process ID to add.
Implementation
# Add a process to the memory monitor. Subclasses may override this to control
# how processes are added to the cluster.
#
# @parameter process_id [Integer] The process ID to add.
def add(process_id)
  # The options captured at construction time are forwarded as keyword arguments:
  cluster_options = @options

  @cluster.add(process_id, **cluster_options)
end
def register(connection)
Register the connection (worker) with the memory monitor.
Implementation
# Register the connection (worker) with the memory monitor.
#
# Connections whose state carries no `:process_id` are logged and otherwise ignored.
#
# @parameter connection [Object] The connection to register; must respond to `state`.
def register(connection)
  Console.debug(self, "Registering connection:", connection: connection, state: connection.state)

  process_id = connection.state[:process_id]
  return unless process_id

  connections = @processes[process_id]

  # The first connection seen for a process is what adds that process to the cluster:
  if connections.empty?
    Console.debug(self, "Registering process:", process_id: process_id)
    self.add(process_id)
  end

  connections.add(connection)
end
def remove(connection)
Remove the connection (worker) from the memory monitor.
Implementation
# Remove the connection (worker) from the memory monitor.
#
# When the last connection for a process is removed, the process is removed from
# the cluster and its bookkeeping entry is discarded.
#
# @parameter connection [Object] The connection to remove; must respond to `state`.
def remove(connection)
  process_id = connection.state[:process_id]
  return unless process_id

  connections = @processes[process_id]
  connections.delete(connection)

  if connections.empty?
    Console.debug(self, "Removing process:", process_id: process_id)

    # Discard the empty set as well. The default block of @processes creates an
    # entry on every lookup, so without this the table would keep one stale
    # entry for every process ID ever seen.
    @processes.delete(process_id)

    @cluster.remove(process_id)
  end
end
def status(call)
Dump the current status of the memory monitor.
Signature
- parameter `call` (Connection::Call): The call to respond to.
Implementation
# Dump the current status of the memory monitor.
#
# @parameter call [Connection::Call] The call to respond to.
def status(call)
  # The cluster itself is pushed under the `memory_monitor` key:
  cluster = @cluster

  call.push(memory_monitor: cluster)
end
def memory_leak_detected(process_id, monitor)
Invoked when a memory leak is detected.
Signature
-
parameter
process_id
Integer
The process ID of the process that has a memory leak.
-
parameter
monitor
Memory::Leak::Monitor
The monitor that detected the memory leak.
-
returns
Boolean
True if the process was killed.
Implementation
# Invoked when a memory leak is detected. Sends `INT` to the leaking process.
#
# @parameter process_id [Integer] The process ID of the process that has a memory leak.
# @parameter monitor [Memory::Leak::Monitor] The monitor that detected the memory leak.
# @returns [Boolean] True if the process was killed, false if it no longer exists.
def memory_leak_detected(process_id, monitor)
  Console.info(self, "Killing process:", process_id: process_id)
  Process.kill(:INT, process_id)

  true
rescue Errno::ESRCH
  # The process exited before the signal could be delivered. Nothing was killed,
  # so report false instead of letting the exception escape to the caller.
  Console.warn(self, "Process already exited:", process_id: process_id)

  false
end
def run
Run the memory monitor.
Signature
- returns (Async::Task): The task that is running the memory monitor.
Implementation
# Run the memory monitor.
#
# Repeatedly asks the cluster to check its processes, then sleeps for the
# configured interval before checking again.
#
# @returns [Async::Task] The task that is running the memory monitor.
def run
  # Handler given to the cluster for each reported process. Its result is that
  # of `memory_leak_detected`, which must be true if the process was killed.
  handle_leak = proc do |leaking_process_id, leak_monitor|
    Console.error(self, "Memory leak detected in process:", process_id: leaking_process_id, monitor: leak_monitor)
    memory_leak_detected(leaking_process_id, leak_monitor)
  end

  Async do
    while true
      @cluster.check!(&handle_leak)
      sleep(@interval)
    end
  end
end