Memory::Leak › Source › Memory › Leak › Cluster

class Cluster

Detects memory leaks in a cluster of processes.

This class is used to manage a cluster of processes and detect memory leaks in each process. It can also apply a memory limit to the cluster, and terminate processes if the memory limit is exceeded.

Definitions

def initialize(total_size_limit: nil)

Create a new cluster.

Signature

parameter total_size_limit Numeric | Nil

The total memory limit for the cluster.

Implementation

# Create a new cluster.
#
# @parameter total_size_limit [Numeric | Nil] The total memory limit for the cluster.
def initialize(total_size_limit: nil)
	# The limit is stored exactly as given; `nil` means no limit has been configured.
	@total_size_limit = total_size_limit
	
	# No total has been computed yet:
	@total_size = nil
	
	# Monitors, keyed by process ID:
	@processes = {}
end

def as_json(...)

Signature

returns Hash

A serializable representation of the cluster.

Implementation

# Build a serializable representation of the cluster.
#
# Any arguments are accepted and ignored.
#
# @returns [Hash] The total size, the total size limit, and the `as_json` of each monitor keyed by process ID.
def as_json(...)
	processes = @processes.transform_values {|monitor| monitor.as_json}
	
	{
		total_size: @total_size,
		total_size_limit: @total_size_limit,
		processes: processes,
	}
end

def to_json(...)

Signature

returns String

The JSON representation of the cluster.

Implementation

# Serialize the cluster to JSON.
#
# All arguments are forwarded to the `to_json` of the serializable representation.
#
# @returns [String] The JSON representation of the cluster.
def to_json(...)
	serializable = as_json
	
	serializable.to_json(...)
end

attr :total_size

Signature

attribute Numeric | Nil

The total size of the cluster.

attr_accessor :total_size_limit

Signature

attribute Numeric | Nil

The total size limit for the cluster, in bytes. If it is exceeded, the cluster yields processes so that they can be terminated.

attr :processes

Signature

attribute Hash(Integer, Monitor)

The process IDs and monitors in the cluster.

def add(process_id, **options)

Add a new process ID to the cluster.

Implementation

# Add a new process ID to the cluster.
#
# Any monitor already registered under the same process ID is replaced.
#
# @parameter process_id [Integer] The process ID to track.
# @parameter options [Hash] Keyword options forwarded to `Monitor.new`.
# @returns [Monitor] The newly created monitor.
def add(process_id, **options)
	monitor = Monitor.new(process_id, **options)
	
	@processes[process_id] = monitor
end

def remove(process_id)

Remove a process ID from the cluster.

Implementation

# Remove a process ID from the cluster.
#
# @parameter process_id [Integer] The process ID to stop tracking.
# @returns [Monitor | Nil] The monitor that was removed, or `nil` if the process ID was not being tracked.
def remove(process_id)
	removed = @processes.delete(process_id)
	
	return removed
end

def apply_limit!(total_size_limit = @total_size_limit)

Apply the memory limit to the cluster. If the total memory usage exceeds the limit, yields each process ID and monitor in order of maximum memory usage, so that they could be terminated and/or removed.

Signature

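yields {|process_id, monitor, total_size| ...}

Each process ID and monitor, in order of decreasing memory usage, together with the cluster's total size before that process is discounted. The block's return value is ignored: every yielded process is assumed to have been terminated when adjusting the total memory usage.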

Implementation

# Apply the memory limit to the cluster.
#
# Recomputes the total size from the current size of every monitor. If the total exceeds the limit, yields each process ID and monitor in order of decreasing memory usage, so that they can be terminated and/or removed, stopping as soon as the remaining total is within the limit.
#
# @parameter total_size_limit [Numeric | Nil] The limit to enforce, defaulting to the cluster's own limit. If `nil`, no limit is enforced.
# @yields {|process_id, monitor, total_size| ...} Each process ID and monitor, largest first, together with the total size before that process is discounted. The block's return value is ignored.
# @returns [Boolean] Whether the total memory usage exceeded the limit.
def apply_limit!(total_size_limit = @total_size_limit)
	@total_size = @processes.values.sum(&:current_size)
	
	# Without a limit there is nothing to enforce (comparing against `nil` would raise):
	return false if total_size_limit.nil?
	return false unless @total_size > total_size_limit
	
	Console.warn(self, "Total memory usage exceeded limit.", total_size: @total_size, total_size_limit: total_size_limit)
	
	largest_first = @processes.sort_by {|_process_id, monitor| -monitor.current_size}
	
	largest_first.each do |process_id, monitor|
		break unless @total_size > total_size_limit
		
		yield(process_id, monitor, @total_size)
		
		# For the sake of the calculation, we assume that the process has been terminated:
		@total_size -= monitor.current_size
	end
	
	# Report the outcome consistently, whether or not the loop above stopped early:
	return true
end

def sample!

Sample the memory usage of all processes in the cluster.

Implementation

# Sample the memory usage of all processes in the cluster.
#
# Asks `System.memory_usages` for the usage of every tracked process ID and stores each reported value as the matching monitor's current size.
def sample!
	process_ids = @processes.keys
	
	System.memory_usages(process_ids) do |process_id, memory_usage|
		monitor = @processes[process_id]
		
		# Ignore any reported process ID that is not tracked:
		monitor.current_size = memory_usage if monitor
	end
end

def check!(&block)

Check all processes in the cluster for memory leaks.

Signature

yields {|process_id, monitor| ...}

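Each process ID and monitor that is leaking, followed by each process ID and monitor selected while applying the memory limit (these are also passed the cluster's total size as a third argument).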

Implementation

# Check all processes in the cluster for memory leaks.
#
# Samples every process, yields each leaking process, and finally applies the cluster-wide memory limit if one is configured.
#
# @yields {|process_id, monitor| ...} Each process ID and monitor that is leaking; processes yielded while applying the memory limit are also passed the total size as a third argument.
# @returns [Enumerator] If no block is given.
def check!(&block)
	return to_enum(__method__) unless block_given?
	
	sample!
	
	# Collect the leaking processes first, so the block only runs after the iteration over `@processes` has finished:
	leaking = @processes.filter_map do |process_id, monitor|
		next unless monitor.leaking?
		
		Console.debug(self, "Memory Leak Detected!", process_id: process_id, monitor: monitor)
		
		[process_id, monitor]
	end
	
	leaking.each(&block)
	
	# Finally, apply any per-cluster memory limits:
	return unless @total_size_limit
	
	apply_limit!(@total_size_limit, &block)
end