class DelayedJobs
Manages delayed job scheduling using Redis sorted sets. Jobs are stored with their execution timestamps, and a periodic background task moves them to the ready queue once their scheduled time has passed.
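The scheduling idea can be illustrated without Redis. The following is a minimal in-memory sketch, not the library's implementation: the class name `InMemoryDelayedJobs` is hypothetical, a plain Hash stands in for the sorted set, and the "due" test (`score <= now`) is an assumption, since the Lua scripts are not shown here.

```ruby
# Hypothetical in-memory stand-in for the Redis sorted set.
class InMemoryDelayedJobs
  def initialize
    @scores = {} # job id => execution timestamp (the sorted set score)
  end

  # Number of jobs still waiting, analogous to ZCARD.
  def size
    @scores.size
  end

  # Store a job id with its execution time as the score.
  def add(id, timestamp)
    @scores[id] = timestamp.to_f
  end

  # Move every job whose score is <= now (assumed boundary) into `ready`,
  # earliest first, and return how many were moved.
  def move(ready, now: Time.now.to_f)
    due = @scores.select { |_id, score| score <= now }.sort_by { |_id, score| score }
    due.each do |id, _score|
      @scores.delete(id)
      ready << id
    end
    due.size
  end
end

delayed = InMemoryDelayedJobs.new
delayed.add("a", 100)
delayed.add("b", 50)
delayed.add("c", 200)

ready = []
moved = delayed.move(ready, now: 150)
```

With `now: 150`, jobs "b" and "a" are moved in timestamp order and "c" stays in the delayed set.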
Definitions
def initialize(client, key)
Initialize a new delayed jobs manager.
Signature
-
parameter
client
Async::Redis::Client
The Redis client instance.
-
parameter
key
String
The Redis key for the delayed jobs sorted set.
Implementation
def initialize(client, key)
  @client = client
  @key = key
  @add = @client.script(:load, ADD)
  @move = @client.script(:load, MOVE)
end
def size
Signature
-
returns
Integer
The number of jobs currently in the delayed queue.
Implementation
def size
  @client.zcard(@key)
end
def start(ready_list, resolution: 10, parent: Async::Task.current)
Start the background task that moves ready delayed jobs to the ready queue.
Signature
-
parameter
ready_list
ReadyList
The ready list to move jobs to.
-
parameter
resolution
Integer
The check interval in seconds.
-
parameter
parent
Async::Task
The parent task to run the background loop in.
-
returns
Async::Task
The background processing task.
Implementation
def start(ready_list, resolution: 10, parent: Async::Task.current)
  parent.async do
    while true
      count = move(destination: ready_list.key)
      if count > 0
        Console.debug(self, "Moved #{count} delayed jobs to ready list.")
      end
      sleep(resolution)
    end
  end
end
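The polling behaviour of the loop above can be sketched in isolation. This is a hedged illustration only: `poll`, `mover`, `iterations`, and `sleeper` are hypothetical names, the loop is bounded so that it terminates, and the sleep call is injectable so the example does not actually wait.

```ruby
# Hypothetical bounded version of the polling loop.
# `mover` is any callable that returns the number of jobs it moved.
def poll(mover, resolution:, iterations:, sleeper: method(:sleep))
  total = 0
  iterations.times do
    count = mover.call
    total += count if count > 0
    sleeper.call(resolution) # wait one interval before checking again
  end
  total
end

slept = []
counts = [0, 3, 0, 2] # pretend results of four successive move calls
total = poll(
  -> { counts.shift },
  resolution: 10,
  iterations: 4,
  sleeper: ->(seconds) { slept << seconds }
)
```

Because the loop sleeps for `resolution` seconds between checks, a job can wait up to roughly that long after its scheduled time before it reaches the ready list.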
attr :key
Signature
-
attribute
String
The Redis key for this delayed jobs queue.
def add(job, timestamp, job_store)
Add a job to the delayed queue with a specified execution time.
Signature
-
parameter
job
String
The serialized job data.
-
parameter
timestamp
Time
When the job should be executed.
-
parameter
job_store
JobStore
The job store to save the job data.
-
returns
String
The unique job ID.
Implementation
def add(job, timestamp, job_store)
  id = SecureRandom.uuid
  @client.evalsha(@add, 2, job_store.key, @key, id, job, timestamp.to_f)
  return id
end
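As a small illustration of the values `add` works with, the sketch below builds a job ID and converts a `Time` into the floating-point value passed to the script. The fixed instant and the 90 second delay are arbitrary values chosen for the example.

```ruby
require "securerandom"

# A random UUID, generated the same way as the job ID in the method above.
id = SecureRandom.uuid

# A Time 90 seconds after a fixed instant (arbitrary example values).
timestamp = Time.at(1_700_000_000) + 90

# The value sent to Redis is the Time as Unix seconds in floating point.
score = timestamp.to_f
```

In real use the timestamp would more likely be relative to the current time, for example `Time.now + 90`.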
def move(destination:, now: Time.now.to_f)
Move jobs that are ready to be processed from the delayed queue to the destination.
Signature
-
parameter
destination
String
The Redis key of the destination queue.
-
parameter
now
Float
The current timestamp to check against.
-
returns
Integer
The number of jobs moved.
Implementation
def move(destination:, now: Time.now.to_f)
  @client.evalsha(@move, 2, @key, destination, now)
end
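Because `now` is a keyword argument, a fixed time can be supplied, which is convenient in tests. The sketch below shows the arguments that would reach `evalsha`, under stated assumptions: `RecordingClient` is a hypothetical stand-in for the Redis client, the free-standing `move` method mirrors the body above with the script SHA passed in explicitly, and `"sha-placeholder"` is not a real script digest.

```ruby
# Hypothetical stand-in for the Redis client; it only records calls.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def evalsha(*arguments)
    @calls << arguments
    0 # pretend that no jobs were due
  end
end

# Mirrors the documented method body, with client, SHA, and key as arguments.
def move(client, sha, key, destination:, now: Time.now.to_f)
  client.evalsha(sha, 2, key, destination, now)
end

client = RecordingClient.new
result = move(client, "sha-placeholder", "delayed", destination: "ready", now: 1_700_000_000.0)
```

The `2` tells Redis that the next two arguments are keys (the delayed set and the destination), and the remaining argument is the timestamp to compare against.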