Async::Job::Processor::Redis::Server

class Server

Redis-backed job processor server. Manages job queues using Redis for distributed job processing across multiple workers. Handles immediate jobs, delayed jobs, and job retry/recovery mechanisms.

Definitions

def initialize(delegate, client, prefix: "async-job", coder: Coder::DEFAULT, resolution: 10, parent: nil)

Initialize a new Redis job processor server.

Signature

parameter delegate Object

The delegate object that will process jobs.

parameter client Async::Redis::Client

The Redis client instance.

parameter prefix String

The Redis key prefix for job data.

parameter coder Async::Job::Coder

The job serialization codec.

parameter resolution Integer

The resolution in seconds for delayed job processing.

parameter parent Async::Task

The parent task for background processing.

Implementation

# Initialize a new Redis job processor server.
# @parameter delegate [Object] The delegate object that will process jobs.
# @parameter client [Async::Redis::Client] The Redis client instance.
# @parameter prefix [String] The Redis key prefix for job data.
# @parameter coder [Async::Job::Coder] The job serialization codec.
# @parameter resolution [Integer] The resolution in seconds for delayed job processing.
# @parameter parent [Async::Task] The parent task for background processing.
def initialize(delegate, client, prefix: "async-job", coder: Coder::DEFAULT, resolution: 10, parent: nil)
	super(delegate)
	
	# Plain configuration:
	@client, @prefix, @coder, @resolution = client, prefix, coder, resolution
	
	# Identifier generated per server instance; it is passed to the processing list below:
	@id = SecureRandom.uuid
	
	# Every Redis-backed structure lives under "<prefix>:<name>":
	key_for = ->(name) {"#{@prefix}:#{name}"}
	
	@job_store = JobStore.new(@client, key_for.call("jobs"))
	@delayed_jobs = DelayedJobs.new(@client, key_for.call("delayed"))
	@ready_list = ReadyList.new(@client, key_for.call("ready"))
	@processing_list = ProcessingList.new(@client, key_for.call("processing"), @id, @ready_list, @job_store)
	
	# Without an explicit parent, background work is scheduled through a fresh idler:
	parent ||= Async::Idler.new
	@parent = parent
end

def start!

Start the job processing loop immediately.

Signature

returns Async::Task | false

The processing task or false if already started.

Implementation

# Start the job processing loop immediately.
# @returns [Async::Task | false] The processing task or false if already started.
def start!
	if @task
		return false
	end
	
	# Mark the server as started before spawning, so that another call returns false even before the block below has assigned the real task:
	@task = true
	
	@parent.async(transient: true, annotation: self.class.name) do |task|
		@task = task
		
		begin
			# Keep pulling jobs until the task is stopped or an error escapes:
			dequeue(task) while true
		ensure
			# Allow the loop to be started again once this task has finished:
			@task = nil
		end
	end
end

def start

Start the server and all background processing tasks. Initializes delayed job processing, abandoned job recovery, and the main processing loop.

Implementation

# Start the server and all background processing tasks.
#
# Initializes delayed job processing, abandoned job recovery, and the main processing loop.
def start
	super
	
	# Delayed jobs: moved onto the ready list once they become ready.
	@delayed_jobs.start(@ready_list, resolution: @resolution)
	
	# Processing list: moves abandoned jobs back onto the ready list.
	@processing_list.start
	
	# Finally, begin the main processing loop:
	start!
end

def stop

Stop the server and all background processing tasks.

Implementation

# Stop the server and all background processing tasks.
def stop
	# There is only something to stop if the processing loop has been started:
	@task.stop unless @task.nil?
	
	super
end

def status_string

Generates a human-readable string representing the current statistics.

e.g. `R=3.42K D=1.23K P=7/2.34K`

This can be interpreted as:

  • R: Number of jobs in the ready list
  • D: Number of jobs in the delayed queue
  • P: Number of jobs currently being processed / total number of completed jobs.

Signature

returns String

A string representing the current statistics.

Implementation

# Generates a human-readable string representing the current statistics.
#
# e.g. `R=3.42K D=1.23K P=7/2.34K`
#
# - R: Number of jobs in the ready list.
# - D: Number of jobs in the delayed queue.
# - P: Number of jobs currently being processed / total number of completed jobs.
#
# @returns [String] A string representing the current statistics.
def status_string
	ready = format_count(@ready_list.size)
	delayed = format_count(@delayed_jobs.size)
	processing = format_count(@processing_list.size)
	completed = format_count(@processing_list.complete_count)
	
	"R=#{ready} D=#{delayed} P=#{processing}/#{completed}"
end

def call(job)

Submit a new job for processing. Jobs with a scheduled_at time are queued for delayed processing, while immediate jobs are added to the ready queue.

Signature

parameter job Hash

The job data to process.

Implementation

# Submit a new job for processing.
#
# Jobs with a scheduled_at time are queued for delayed processing, while immediate jobs are added to the ready queue.
#
# @parameter job [Hash] The job data to process.
def call(job)
	scheduled_at = Coder::Time(job["scheduled_at"])
	payload = @coder.dump(job)
	
	# No schedule: the job goes straight onto the ready list.
	return @ready_list.add(payload, @job_store) unless scheduled_at
	
	@delayed_jobs.add(payload, scheduled_at, @job_store)
end

def dequeue(parent)

Dequeue a job from the ready list and process it.

If the job fails for any reason, it will be retried.

If you do not desire this behavior, you should catch exceptions in the delegate.

Implementation

# Dequeue a job from the ready list and process it.
#
# If the job fails for any reason, it will be retried. If you do not desire this behavior, you should catch exceptions in the delegate.
#
# @parameter parent [Object] The task used to run the job via `async`; `start!` passes its own task here.
def dequeue(parent)
	# Fetch the next job id via the processing list. Until the child task below takes it over, this method is responsible for handing it back:
	_id = @processing_list.fetch
	
	parent.async do
		# Take over the id and clear the outer variable, so the `ensure` at the bottom no longer retries it:
		id = _id; _id = nil
		
		# Load the stored job, pass it to the delegate, then mark it complete:
		job = @coder.load(@job_store.get(id))
		@delegate.call(job)
		@processing_list.complete(id)
	rescue => error
		# Any StandardError raised while loading, processing or completing is logged and the job is handed back for retry:
		Console.error(self, "Job failed with error!", id: id, exception: error)
		@processing_list.retry(id)
	end
ensure
	# If the block above has not cleared `_id` by the time this method exits, hand the id back for retry:
	@processing_list.retry(_id) if _id
end