class Aggregate
Represents an aggregate processor that batches jobs for efficient processing. This processor collects jobs in a buffer and processes them in batches, reducing the overhead of individual job processing and improving throughput.
Definitions
def initialize(delegate, parent: nil)
Initialize a new aggregate processor.
Signature
-
parameter
delegate
Object
The delegate object that will handle job execution.
-
option
parent
Async::Task
The parent task for managing the background processing (defaults to Async::Task.current).
Implementation
# Initialize a new aggregate processor with empty job buffers and no background task.
#
# @parameter delegate [Object] The delegate object that will handle job execution; forwarded to the superclass initializer.
# @option parent [Async::Task] Part of the signature, but not referenced by this initializer.
def initialize(delegate, parent: nil)
  super(delegate)

  # No background task yet; `stop` tolerates this being nil.
  @task = nil

  # Two buffers: `call` appends to @pending, `run` swaps them and flushes @processing.
  @pending = []
  @processing = []

  # `run` waits on this condition while @pending is empty; `call` may signal it.
  @ready = Async::Condition.new
end
def flush(jobs)
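Process a batch of jobs by delegating each job, in order, to the configured delegate. If the delegate raises an error, the error is logged and any jobs not yet delivered remain in the array.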
Signature
-
parameter
jobs
Array
The array of jobs to process.
Implementation
# Deliver each job in the batch to the delegate, consuming the array from the front.
#
# Delivery stops at the first nil/false entry or when the array is exhausted.
# If a StandardError is raised, it is reported through Console.error and the method
# returns; the failing job has already been removed, and undelivered jobs stay in `jobs`.
#
# @parameter jobs [Array] The array of jobs to process; it is mutated in place.
def flush(jobs)
  current = jobs.shift

  while current
    @delegate.call(current)
    current = jobs.shift
  end
rescue => error
  # jobs.size is the count of jobs still undelivered (the failing job is not included).
  Console.error(self, "Could not flush #{jobs.size} jobs.", exception: error)
end
def run(task)
Run the background processing loop that continuously processes job batches.
Signature
-
parameter
task
Async::Task
The task that manages the background processing.
Implementation
# Run the background processing loop: wait until jobs are pending, then flush them as one batch.
#
# The loop has no exit condition of its own; it only ends when something raises or the
# surrounding task is stopped.
#
# @parameter task [Async::Task] The task that manages the background processing; used here for `defer_stop`.
def run(task)
  while true
    # Block until at least one job has been queued.
    @ready.wait while @pending.empty?

    # NOTE(review): presumably defer_stop postpones a stop request until the block
    # finishes, so a batch is not abandoned midway — confirm against Async::Task#defer_stop.
    task.defer_stop do
      # Swap the buffers: the queued jobs become the batch being processed, and the
      # other array becomes the new pending buffer.
      batch = @pending
      @pending = @processing
      @processing = batch

      flush(batch)
    end
  end
end
def call(job)
Enqueue a job into the pending buffer. Start the background processing task if it is not already running.
Signature
-
parameter
job
Object
The job to be enqueued.
Implementation
# Enqueue a job into the pending buffer, then make sure the processing loop will pick it up.
#
# `start!` is invoked first; only when it returns a falsy value is @ready signalled.
# NOTE(review): `start!` is defined outside this excerpt — presumably it returns a truthy
# value only when it launched a new background task; confirm.
#
# @parameter job [Object] The job to be enqueued.
# @returns [Object] The truthy result of `start!`, otherwise the result of `@ready.signal`.
def call(job)
  @pending.push(job)

  start! || @ready.signal
end
def start
Start the processor and the background processing task.
Implementation
# Start the processor: run the superclass start logic first, then invoke `start!`.
#
# NOTE(review): `start!` is defined outside this excerpt — presumably it launches the
# background processing task; confirm.
#
# @returns [Object] Whatever `start!` returns.
def start
  super

  start!
end
def stop
Stop the processor and the background processing task.
Implementation
# Stop the processor: stop the background task if one is set, then run the superclass stop logic.
#
# @returns [Object] Whatever the superclass `stop` returns.
def stop
  # @task is nil when no background task has been assigned.
  @task.stop unless @task.nil?

  super
end