class DelayedJobs
Manages delayed job scheduling using Redis sorted sets. Jobs are stored with their execution timestamps and automatically moved to the ready queue when their scheduled time arrives.
Definitions
def initialize(client, key)
Initialize a new delayed jobs manager.
Signature
-
parameter
client Async::Redis::Client The Redis client instance.
-
parameter
key String The Redis key for the delayed jobs sorted set.
Implementation
# Initialize a new delayed jobs manager.
#
# Both scripts are registered through `script(:load, ...)` at construction time;
# the values returned are stored and later handed to `evalsha` by {add} and {move}.
# `ADD` and `MOVE` are defined outside this block (presumably the script sources).
# @parameter client [Async::Redis::Client] The Redis client instance.
# @parameter key [String] The Redis key for the delayed jobs sorted set.
def initialize(client, key)
  @client, @key = client, key

  # Load order is unchanged: ADD first, then MOVE.
  @add = client.script(:load, ADD)
  @move = client.script(:load, MOVE)
end
def size
Signature
-
returns
Integer The number of jobs currently in the delayed queue.
Implementation
# Count the entries held under this queue's key by delegating to the client's `zcard`.
# @returns [Integer] The number of jobs currently in the delayed queue.
def size
  return @client.zcard(@key)
end
def start(ready_list, resolution: 10, parent: Async::Task.current)
Start the background task that moves ready delayed jobs to the ready queue.
Signature
-
parameter
ready_list ReadyList The ready list to move jobs to.
-
parameter
resolution Integer The check interval in seconds.
-
parameter
parent Async::Task The parent task to run the background loop in.
-
returns
Async::Task The background processing task.
Implementation
# Start the background task that moves ready delayed jobs to the ready queue.
#
# The task never finishes on its own. Each pass calls {move} with the ready list's
# key, emits a debug log line when at least one job was moved, and then sleeps for
# `resolution` seconds before checking again.
# @parameter ready_list [ReadyList] The ready list to move jobs to.
# @parameter resolution [Integer] The check interval in seconds.
# @parameter parent [Async::Task] The parent task to run the background loop in.
# @returns [Async::Task] The background processing task.
def start(ready_list, resolution: 10, parent: Async::Task.current)
  parent.async do
    while true
      # The key is re-read from the ready list on every pass.
      count = move(destination: ready_list.key)
      Console.debug(self, "Moved #{count} delayed jobs to ready list.") if count > 0

      sleep(resolution)
    end
  end
end
attr :key
Signature
-
attribute
String The Redis key for this delayed jobs queue.
def add(job, timestamp, job_store)
Add a job to the delayed queue with a specified execution time.
Signature
-
parameter
job String The serialized job data.
-
parameter
timestamp Time When the job should be executed.
-
parameter
job_store JobStore The job store to save the job data.
-
returns
String The unique job ID.
Implementation
# Add a job to the delayed queue with a specified execution time.
#
# A fresh UUID is generated for the job, then the preloaded ADD script is invoked
# with two keys (the job store's key and this queue's key) followed by the id, the
# job payload and the timestamp converted to a float.
# NOTE(review): the script body is not visible here; presumably it stores the
# payload in the job store and schedules the id in the sorted set -- confirm.
# @parameter job [String] The serialized job data.
# @parameter timestamp [Time] When the job should be executed.
# @parameter job_store [JobStore] The job store to save the job data.
# @returns [String] The unique job ID.
def add(job, timestamp, job_store)
  SecureRandom.uuid.tap do |id|
    @client.evalsha(@add, 2, job_store.key, @key, id, job, timestamp.to_f)
  end
end
def move(destination:, now: Time.now.to_f)
Move jobs that are ready to be processed from the delayed queue to the destination.
Signature
-
parameter
destination String The Redis key of the destination queue.
-
parameter
now Float The current time, as a Unix timestamp, to check against. Defaults to Time.now.to_f.
-
returns
Integer The number of jobs moved.
Implementation
# Move jobs that are ready to be processed from the delayed queue to the destination.
#
# Invokes the preloaded MOVE script with this queue's key and the destination key,
# passing `now` as the single script argument; whatever the script returns is
# returned to the caller.
# @parameter destination [String] The Redis key of the destination queue.
# @parameter now [Float] The current timestamp to check against.
# @returns [Integer] The number of jobs moved.
def move(destination:, now: Time.now.to_f)
  keys = [@key, destination]

  @client.evalsha(@move, keys.size, *keys, now)
end