Async::Job::Processor::RedisSourceAsyncJobProcessorRedisDelayedJobs

class DelayedJobs

Manages delayed job scheduling using Redis sorted sets. Jobs are stored with their execution timestamps and automatically moved to the ready queue when their scheduled time arrives.

Definitions

def initialize(client, key)

Initialize a new delayed jobs manager.

Signature

parameter client Async::Redis::Client

The Redis client instance.

parameter key String

The Redis key for the delayed jobs sorted set.

Implementation

def initialize(client, key)
	@client = client
	@key = key
	
	@add = @client.script(:load, ADD)
	@move = @client.script(:load, MOVE)
end

def size

Signature

returns Integer

The number of jobs currently in the delayed queue.

Implementation

def size
	@client.zcard(@key)
end

def start(ready_list, resolution: 10, parent: Async::Task.current)

Start the background task that moves ready delayed jobs to the ready queue.

Signature

parameter ready_list ReadyList

The ready list to move jobs to.

parameter resolution Integer

The check interval in seconds.

parameter parent Async::Task

The parent task to run the background loop in.

returns Async::Task

The background processing task.

Implementation

def start(ready_list, resolution: 10, parent: Async::Task.current)
	parent.async do
		while true
			count = move(destination: ready_list.key)
			
			if count > 0
				Console.debug(self, "Moved #{count} delayed jobs to ready list.")
			end
			
			sleep(resolution)
		end
	end
end

attr :key

Signature

attribute String

The Redis key for this delayed jobs queue.

def add(job, timestamp, job_store)

Add a job to the delayed queue with a specified execution time.

Signature

parameter job String

The serialized job data.

parameter timestamp Time

When the job should be executed.

parameter job_store JobStore

The job store to save the job data.

returns String

The unique job ID.

Implementation

def add(job, timestamp, job_store)
	id = SecureRandom.uuid
	
	@client.evalsha(@add, 2, job_store.key, @key, id, job, timestamp.to_f)
	
	return id
end

def move(destination:, now: Time.now.to_f)

Move jobs that are ready to be processed from the delayed queue to the destination.

Signature

parameter destination String

The Redis key of the destination queue.

parameter now Integer

The current timestamp to check against.

returns Integer

The number of jobs moved.

Implementation

def move(destination:, now: Time.now.to_f)
	@client.evalsha(@move, 2, @key, destination, now)
end