Async::Job › Source › Async › Job › Processor › Aggregate

class Aggregate

Represents an aggregate processor that batches jobs for efficient processing. This processor collects jobs in a buffer and processes them in batches, reducing the overhead of individual job processing and improving throughput.

Definitions

def initialize(delegate, parent: nil)

Initialize a new aggregate processor.

Signature

parameter delegate Object

The delegate object that will handle job execution.

option parent Async::Task

The parent task for managing the background processing (defaults to Async::Task.current).

Implementation

# Initialize a new aggregate processor with empty job buffers and no background task.
#
# NOTE(review): `parent` is accepted here but is not referenced anywhere in this method body — confirm whether it is meant to be stored or forwarded.
#
# @parameter delegate [Object] The delegate object that will handle job execution; forwarded to the superclass initializer.
# @option parent [Async::Task] The parent task for managing the background processing.
def initialize(delegate, parent: nil)
	super(delegate)
	
	# Two buffers: `call` appends to `@pending`, while `run` swaps the buffers and flushes `@processing`.
	@pending = []
	@processing = []
	
	# Waited on by `run` while there is nothing pending; signalled by `call`.
	@ready = Async::Condition.new
	
	# Holds the background task (if any) so that `stop` can stop it.
	@task = nil
end

def flush(jobs)

Process a batch of jobs by delegating each job to the configured delegate.

Signature

parameter jobs Array

The array of jobs to process.

Implementation

# Process a batch of jobs by delegating each job to the configured delegate.
#
# Jobs are removed from the front of the array as they are handed to the delegate, so the array is drained in place. If the delegate raises a `StandardError`, the error is logged and processing of this batch stops; the job that raised has already been removed, and any jobs after it remain in the array.
#
# @parameter jobs [Array] The array of jobs to process.
def flush(jobs)
	# Loop on emptiness rather than on the shifted value, so that a `nil` or `false` job cannot end the batch early and strand the jobs queued behind it:
	until jobs.empty?
		@delegate.call(jobs.shift)
	end
rescue => error
	# `jobs.size` is the number of jobs still left in the array at the point of failure.
	Console.error(self, "Could not flush #{jobs.size} jobs.", exception: error)
end

def run(task)

Run the background processing loop that continuously processes job batches.

Signature

parameter task Async::Task

The task that manages the background processing.

Implementation

# Run the background processing loop that continuously processes job batches.
#
# Each iteration waits until at least one job is pending, then swaps the two buffers and flushes the batch inside `task.defer_stop`. The loop has no exit condition of its own.
#
# @parameter task [Async::Task] The task that manages the background processing.
def run(task)
	while true
		# Re-check after every wake-up; only proceed once something is actually pending.
		@ready.wait while @pending.empty?
		
		task.defer_stop do
			# Exchange the buffers so new jobs accumulate in the other array while this batch is flushed:
			batch = @pending
			@pending = @processing
			@processing = batch
			
			flush(batch)
		end
	end
end

def call(job)

Enqueue a job into the pending buffer. Start the background processing task if it is not already running.

Signature

parameter job Object

The job to be enqueued.

Implementation

# Enqueue a job into the pending buffer, then either start background processing or signal it.
#
# `start!` is invoked on every call; when it returns a truthy value that value is returned and no signal is sent. Otherwise the ready condition is signalled and the result of the signal is returned.
#
# @parameter job [Object] The job to be enqueued.
def call(job)
	@pending.push(job)
	
	started = start!
	return started if started
	
	# `start!` reported falsy, so signal the ready condition instead:
	@ready.signal
end

def start

Start the processor and the background processing task.

Implementation

# Start the processor and the background processing task.
#
# Invokes the superclass `start` first, then `start!`; the result of `start!` is returned.
def start
	super
	
	start!
end

def stop

Stop the processor and the background processing task.

Implementation

# Stop the processor and the background processing task.
#
# Stops the background task if one is set, then invokes the superclass `stop` and returns its result.
def stop
	@task.stop unless @task.nil?
	
	super
end