Async::Job::Processor::RedisSourceAsyncJobProcessorRedisProcessingList

class ProcessingList

Manages jobs currently being processed and handles abandoned job recovery. Maintains heartbeats for active workers and automatically requeues jobs from workers that have stopped responding.

Definitions

def initialize(client, key, id, ready_list, job_store)

Initialize a new processing list manager.

Signature

parameter client Async::Redis::Client

The Redis client instance.

parameter key String

The base Redis key for processing data.

parameter id String

The unique server/worker ID.

parameter ready_list ReadyList

The ready job queue.

parameter job_store JobStore

The job data store.

Implementation

def initialize(client, key, id, ready_list, job_store)
	@client = client
	@key = key
	@id = id
	
	@ready_list = ready_list
	@job_store = job_store
	
	@pending_key = "#{@key}:#{@id}:pending"
	@heartbeat_key = "#{@key}:#{@id}"
	
	@requeue = @client.script(:load, REQUEUE)
	@retry = @client.script(:load, RETRY)
	@complete = @client.script(:load, COMPLETE)
	
	@complete_count = 0
end

attr :key

Signature

attribute String

The base Redis key for this processing list.

attr :heartbeat_key

Signature

attribute String

The Redis key for this worker's heartbeat.

attr :complete_count

Signature

attribute Integer

The total count of all jobs completed by this worker.

def size

Signature

returns Integer

The number of jobs currently being processed by this worker.

Implementation

def size
	@client.llen(@pending_key)
end

def fetch

Fetch the next job from the ready queue, moving it to this worker's pending list. This is a blocking operation that waits until a job is available.

Signature

returns String, nil

The job ID, or nil if no job is available.

Implementation

def fetch
	@client.brpoplpush(@ready_list.key, @pending_key, 0)
end

def complete(id)

Mark a job as completed, removing it from the pending list and job store.

Signature

parameter id String

The job ID to complete.

Implementation

def complete(id)
	@complete_count += 1
	
	@client.evalsha(@complete, 2, @pending_key, @job_store.key, id)
end

def retry(id)

Retry a failed job by moving it back to the ready queue.

Signature

parameter id String

The job ID to retry.

Implementation

def retry(id)
	Console.warn(self, "Retrying job: #{id}")
	@client.evalsha(@retry, 2, @pending_key, @ready_list.key, id)
end

def requeue(start_time, delay, factor)

Update heartbeat and requeue any abandoned jobs from inactive workers.

Signature

parameter start_time Float

The start time for calculating uptime.

parameter delay Numeric

The heartbeat update interval.

parameter factor Numeric

The heartbeat expiration factor.

returns Integer

The number of jobs requeued from abandoned workers.

Implementation

def requeue(start_time, delay, factor)
	uptime = (Time.now.to_f - start_time).round(2)
	expiry = (delay*factor).ceil
	@client.set(@heartbeat_key, JSON.dump(uptime: uptime), seconds: expiry)
	
	# Requeue any jobs that have been abandoned:
	count = @client.evalsha(@requeue, 2, @key, @ready_list.key)
	
	return count
end

def start(delay: 5, factor: 2, parent: Async::Task.current)

Start the background heartbeat and abandoned job recovery task.

Signature

parameter delay Integer

The heartbeat update interval in seconds.

parameter factor Integer

The heartbeat expiration factor.

parameter parent Async::Task

The parent task to run the background loop in.

returns Async::Task

The background processing task.

Implementation

def start(delay: 5, factor: 2, parent: Async::Task.current)
	start_time = Time.now.to_f
	
	parent.async do |task|
		while true
			task.defer_stop do
				count = self.requeue(start_time, delay, factor)
				
				if count > 0
					Console.warn(self, "Requeued #{count} abandoned jobs.")
				end
			end
			
			sleep(delay)
		end
	end
end