class Cluster
Detects memory leaks in a cluster of processes.
This class is used to manage a cluster of processes and detect memory leaks in each process. It can also apply a memory limit to the cluster, and terminate processes if the memory limit is exceeded.
Definitions
def initialize(total_size_limit: nil)
Create a new cluster.
Signature
-
parameter
total_size_limit
Numeric | Nil
The total memory limit for the cluster.
Implementation
# Create a new cluster with no tracked processes.
#
# @parameter total_size_limit [Numeric | Nil] The total memory limit for the cluster.
def initialize(total_size_limit: nil)
# No total is known yet; it is computed by `apply_limit!`:
@total_size = nil
@total_size_limit = total_size_limit
# Monitors, keyed by process ID:
@processes = {}
end
def as_json(...)
Signature
-
returns
Hash
A serializable representation of the cluster.
Implementation
# Build a plain-hash snapshot of the cluster.
#
# Any arguments are accepted but not used.
#
# @returns [Hash] A serializable representation of the cluster.
def as_json(...)
  # Each monitor contributes its own serializable representation, keyed by process ID:
  serialized_processes = @processes.transform_values { |monitor| monitor.as_json }

  {
    total_size: @total_size,
    total_size_limit: @total_size_limit,
    processes: serialized_processes,
  }
end
def to_json(...)
Signature
-
returns
String
The JSON representation of the cluster.
Implementation
# Serialize the cluster to JSON.
#
# All arguments are forwarded to `to_json` on the representation returned by `as_json`.
#
# @returns [String] The JSON representation of the cluster.
def to_json(...)
  representation = as_json

  representation.to_json(...)
end
attr :total_size
Signature
-
attribute
Numeric | Nil
The total size of the cluster.
attr_accessor :total_size_limit
Signature
-
attribute
Numeric | Nil
The total size limit for the cluster, in bytes. If this limit is exceeded, the cluster will terminate processes.
attr :processes
Signature
-
attribute
Hash(Integer, Monitor)
The process IDs and monitors in the cluster.
def add(process_id, **options)
Add a new process ID to the cluster.
Implementation
# Add a new process ID to the cluster.
#
# Any monitor already stored under the same process ID is replaced.
#
# @parameter process_id [Integer] The process ID to track.
# @parameter options [Hash] Keyword options forwarded to `Monitor.new`.
# @returns [Monitor] The newly created monitor.
def add(process_id, **options)
  monitor = Monitor.new(process_id, **options)

  @processes[process_id] = monitor
end
def remove(process_id)
Remove a process ID from the cluster.
Implementation
# Remove a process ID from the cluster.
#
# @parameter process_id [Integer] The process ID to stop tracking.
# @returns [Monitor | Nil] The monitor that was stored for the process ID, or nil if there was none.
def remove(process_id)
@processes.delete(process_id)
end
def apply_limit!(total_size_limit = @total_size_limit)
Apply the memory limit to the cluster. If the total memory usage exceeds the limit, yields each process ID and monitor in order of maximum memory usage, so that they could be terminated and/or removed.
Signature
-
yields
{|process_id, monitor, total_size| ...}
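Each process ID and monitor, in descending order of memory usage, together with the running total size. Return true if the process was terminated to adjust memory usage; note that the implementation currently subtracts the process's size from the running total regardless of the return value.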
Implementation
# Apply the memory limit to the cluster.
#
# Recomputes the total size from every monitor's current size. If the total exceeds the limit, yields each process ID and monitor, largest current size first, until the running total no longer exceeds the limit, so that they can be terminated and/or removed.
#
# @parameter total_size_limit [Numeric | Nil] The limit to enforce; defaults to the cluster's own limit. When nil, nothing is enforced.
# @yields {|process_id, monitor, total_size| ...} each process ID and monitor in order of maximum memory usage, with the running total at the time of the call. The block's return value is not used.
# @returns [Boolean | Array | Nil] `false` when there is no limit or it is not exceeded. Otherwise nil if iteration stopped early, or the sorted process list if every process was yielded.
def apply_limit!(total_size_limit = @total_size_limit)
  @total_size = @processes.values.sum(&:current_size)

  # Without a limit there is nothing to enforce, and comparing against nil would raise:
  return false if total_size_limit.nil?
  return false unless @total_size > total_size_limit

  Console.warn(self, "Total memory usage exceeded limit.", total_size: @total_size, total_size_limit: total_size_limit)

  # `sort_by` returns a new array, so the block may remove processes from the cluster without disturbing this iteration:
  largest_first = @processes.sort_by { |_process_id, monitor| -monitor.current_size }

  largest_first.each do |process_id, monitor|
    break unless @total_size > total_size_limit

    yield(process_id, monitor, @total_size)

    # For the sake of the calculation, we assume that the process has been terminated:
    @total_size -= monitor.current_size
  end
end
def sample!
Sample the memory usage of all processes in the cluster.
Implementation
# Sample the memory usage of all processes in the cluster.
#
# Readings reported for process IDs that are not tracked by the cluster are ignored.
def sample!
  tracked_ids = @processes.keys

  System.memory_usages(tracked_ids) do |process_id, memory_usage|
    monitor = @processes[process_id]
    next unless monitor

    monitor.current_size = memory_usage
  end
end
def check!(&block)
Check all processes in the cluster for memory leaks.
Signature
-
yields
{|process_id, monitor| ...}
each process ID and monitor that is leaking or exceeds the memory limit.
Implementation
# Check all processes in the cluster for memory leaks.
#
# Samples every process first, then yields each leaking process, and finally applies the cluster-wide limit (if one is set) using the same block.
#
# @yields {|process_id, monitor| ...} each process ID and monitor that is leaking or exceeds the memory limit.
# @returns [Enumerator] If no block is given.
def check!(&block)
  return to_enum(__method__) unless block_given?

  sample!

  # Collect (and log) every leaking process before yielding any of them:
  leaking = @processes.filter_map do |process_id, monitor|
    next unless monitor.leaking?

    Console.debug(self, "Memory Leak Detected!", process_id: process_id, monitor: monitor)

    [process_id, monitor]
  end

  leaking.each(&block)

  # Finally, apply any per-cluster memory limits:
  return unless @total_size_limit

  apply_limit!(@total_size_limit, &block)
end