class Server
Redis-backed job processor server. Manages job queues using Redis for distributed job processing across multiple workers. Handles immediate jobs, delayed jobs, and job retry/recovery mechanisms.
Definitions
def initialize(delegate, client, prefix: "async-job", coder: Coder::DEFAULT, resolution: 10, parent: nil)
Initialize a new Redis job processor server.
Signature
-
parameter
delegate
Object
The delegate object that will process jobs.
-
parameter
client
Async::Redis::Client
The Redis client instance.
-
parameter
prefix
String
The Redis key prefix for job data.
-
parameter
coder
Async::Job::Coder
The job serialization codec.
-
parameter
resolution
Integer
The resolution in seconds for delayed job processing.
-
parameter
parent
Async::Task
The parent task for background processing.
Implementation
# Initialize a new Redis job processor server.
#
# @parameter delegate [Object] The delegate object that will process jobs.
# @parameter client [Async::Redis::Client] The Redis client instance.
# @parameter prefix [String] The Redis key prefix for job data.
# @parameter coder [Async::Job::Coder] The job serialization codec.
# @parameter resolution [Integer] The resolution in seconds for delayed job processing.
# @parameter parent [Async::Task] The parent task for background processing.
def initialize(delegate, client, prefix: "async-job", coder: Coder::DEFAULT, resolution: 10, parent: nil)
  super(delegate)

  # Identifier generated per server instance; it is handed to the processing list below.
  @id = SecureRandom.uuid

  @client, @prefix, @coder, @resolution = client, prefix, coder, resolution

  # Each collaborator is constructed with the shared client and a key derived from the prefix.
  @job_store = JobStore.new(@client, "#{@prefix}:jobs")
  @delayed_jobs = DelayedJobs.new(@client, "#{@prefix}:delayed")
  @ready_list = ReadyList.new(@client, "#{@prefix}:ready")
  @processing_list = ProcessingList.new(@client, "#{@prefix}:processing", @id, @ready_list, @job_store)

  # When no parent is supplied, background work is scheduled through a new Async::Idler.
  @parent = parent || Async::Idler.new
end
def start!
Start the job processing loop immediately.
Signature
-
returns
Async::Task | false
The processing task or false if already started.
Implementation
# Start the job processing loop immediately.
#
# @returns [Async::Task | false] The processing task or false if already started.
def start!
  # A truthy @task means the loop is running, or is about to be scheduled.
  return false if @task

  # Reserve the slot before scheduling so that a second call is rejected by the guard above.
  @task = true

  @parent.async(transient: true, annotation: self.class.name) do |worker|
    @task = worker

    # Keep dequeuing until the task is stopped or an error escapes.
    dequeue(worker) while true
  ensure
    # Release the slot so the loop can be started again.
    @task = nil
  end
end
def start
Start the server and all background processing tasks. Initializes delayed job processing, abandoned job recovery, and the main processing loop.
Implementation
# Start the server and all background processing tasks: delayed job processing,
# abandoned job recovery, and finally the main processing loop.
def start
  super

  # Delayed jobs are moved onto the ready list once they become due:
  @delayed_jobs.start(@ready_list, resolution: @resolution)

  # Abandoned jobs in the processing list are moved back onto the ready list:
  @processing_list.start

  # Launch the processing loop last; its result is the return value.
  start!
end
def stop
Stop the server and all background processing tasks.
Implementation
# Stop the server and all background processing tasks.
#
# `start!` stores the placeholder `true` in `@task` before the parent has scheduled the
# processing loop. In that window there is no task to stop, so only objects that actually
# respond to `stop` are stopped; the superclass is always asked to stop afterwards.
#
# NOTE(review): a `start!` that is still waiting on the parent is not cancelled here, so the
# loop may still begin afterwards - confirm whether that needs handling.
def stop
  task = @task

  # `@task` may be nil (not started), true (start pending) or the running task.
  task.stop if task.respond_to?(:stop)

  super
end
def status_string
Generates a human-readable string representing the current statistics.
e.g. `R=3.42K D=1.23K P=7/2.34K`
This can be interpreted as:
- R: Number of jobs in the ready list
- D: Number of jobs in the delayed queue
- P: Number of jobs currently being processed / total number of completed jobs.
Signature
-
returns
String
A string representing the current statistics.
Implementation
# Generates a human-readable string representing the current statistics,
# e.g. `R=3.42K D=1.23K P=7/2.34K`:
#
# - R: Number of jobs in the ready list.
# - D: Number of jobs in the delayed queue.
# - P: Number of jobs currently being processed / total number of completed jobs.
#
# @returns [String] A string representing the current statistics.
def status_string
  ready = format_count(@ready_list.size)
  delayed = format_count(@delayed_jobs.size)
  in_flight = format_count(@processing_list.size)
  completed = format_count(@processing_list.complete_count)

  "R=#{ready} D=#{delayed} P=#{in_flight}/#{completed}"
end
def call(job)
Submit a new job for processing. Jobs with a scheduled_at time are queued for delayed processing, while immediate jobs are added to the ready queue.
Signature
-
parameter
job
Hash
The job data to process.
Implementation
# Submit a new job for processing. Jobs with a scheduled_at time are queued for delayed
# processing, while immediate jobs are added to the ready queue.
#
# @parameter job [Hash] The job data to process.
def call(job)
  run_at = Coder::Time(job["scheduled_at"])
  payload = @coder.dump(job)

  # No schedule time: the job goes straight onto the ready list.
  return @ready_list.add(payload, @job_store) unless run_at

  @delayed_jobs.add(payload, run_at, @job_store)
end
def dequeue(parent)
Dequeue a job from the ready list and process it.
If the job fails for any reason, it will be retried.
If you do not desire this behavior, you should catch exceptions in the delegate.
Implementation
# Dequeue a job from the ready list and process it.
#
# If the job fails for any reason, it will be retried. If you do not desire this behavior,
# you should catch exceptions in the delegate.
#
# @parameter parent [Async::Task] The task used to run the job asynchronously.
def dequeue(parent)
  # Until the child task claims it, this method remains responsible for the fetched id.
  pending_id = @processing_list.fetch

  parent.async do
    # Claim the id so the outer ensure does not retry a job that is being processed.
    id = pending_id
    pending_id = nil

    begin
      payload = @job_store.get(id)
      @delegate.call(@coder.load(payload))
      @processing_list.complete(id)
    rescue => error
      Console.error(self, "Job failed with error!", id: id, exception: error)
      @processing_list.retry(id)
    end
  end
ensure
  # Still unclaimed here means the child task never ran, so hand the job back.
  @processing_list.retry(pending_id) if pending_id
end