class ProcessingList
Manages jobs currently being processed and handles abandoned job recovery. Maintains heartbeats for active workers and automatically requeues jobs from workers that have stopped responding.
Definitions
def initialize(client, key, id, ready_list, job_store)
Initialize a new processing list manager.
Signature
-
parameter
client
Async::Redis::Client
The Redis client instance.
-
parameter
key
String
The base Redis key for processing data.
-
parameter
id
String
The unique server/worker ID.
-
parameter
ready_list
ReadyList
The ready job queue.
-
parameter
job_store
JobStore
The job data store.
Implementation
def initialize(client, key, id, ready_list, job_store)
@client = client
@key = key
@id = id
@ready_list = ready_list
@job_store = job_store
@pending_key = "#{@key}:#{@id}:pending"
@heartbeat_key = "#{@key}:#{@id}"
@requeue = @client.script(:load, REQUEUE)
@retry = @client.script(:load, RETRY)
@complete = @client.script(:load, COMPLETE)
@complete_count = 0
end
attr :key
Signature
-
attribute
String
The base Redis key for this processing list.
attr :heartbeat_key
Signature
-
attribute
String
The Redis key for this worker's heartbeat.
attr :complete_count
Signature
-
attribute
Integer
The total count of all jobs completed by this worker.
def size
Signature
-
returns
Integer
The number of jobs currently being processed by this worker.
Implementation
def size
@client.llen(@pending_key)
end
def fetch
Fetch the next job from the ready queue, moving it to this worker's pending list. This is a blocking operation that waits until a job is available.
Signature
-
returns
String, nil
The job ID, or nil if no job is available.
Implementation
def fetch
@client.brpoplpush(@ready_list.key, @pending_key, 0)
end
def complete(id)
Mark a job as completed, removing it from the pending list and job store.
Signature
-
parameter
id
String
The job ID to complete.
Implementation
def complete(id)
@complete_count += 1
@client.evalsha(@complete, 2, @pending_key, @job_store.key, id)
end
def retry(id)
Retry a failed job by moving it back to the ready queue.
Signature
-
parameter
id
String
The job ID to retry.
Implementation
def retry(id)
Console.warn(self, "Retrying job: #{id}")
@client.evalsha(@retry, 2, @pending_key, @ready_list.key, id)
end
def requeue(start_time, delay, factor)
Update heartbeat and requeue any abandoned jobs from inactive workers.
Signature
-
parameter
start_time
Float
The start time for calculating uptime.
-
parameter
delay
Numeric
The heartbeat update interval.
-
parameter
factor
Numeric
The heartbeat expiration factor.
-
returns
Integer
The number of jobs requeued from abandoned workers.
Implementation
def requeue(start_time, delay, factor)
uptime = (Time.now.to_f - start_time).round(2)
expiry = (delay*factor).ceil
@client.set(@heartbeat_key, JSON.dump(uptime: uptime), seconds: expiry)
# Requeue any jobs that have been abandoned:
count = @client.evalsha(@requeue, 2, @key, @ready_list.key)
return count
end
def start(delay: 5, factor: 2, parent: Async::Task.current)
Start the background heartbeat and abandoned job recovery task.
Signature
-
parameter
delay
Integer
The heartbeat update interval in seconds.
-
parameter
factor
Integer
The heartbeat expiration factor.
-
parameter
parent
Async::Task
The parent task to run the background loop in.
-
returns
Async::Task
The background processing task.
Implementation
def start(delay: 5, factor: 2, parent: Async::Task.current)
start_time = Time.now.to_f
parent.async do |task|
while true
task.defer_stop do
count = self.requeue(start_time, delay, factor)
if count > 0
Console.warn(self, "Requeued #{count} abandoned jobs.")
end
end
sleep(delay)
end
end
end