Queued Limiter

This guide explains the Async::Limiter::Queued class, which provides priority-based task scheduling with optional resource management. Its key feature is priority-based acquisition: tasks with higher priority get access first. It can also distribute specific resources from a pre-populated queue.

Usage

Use a queue of specific resources that tasks can acquire:

require "async"
require "async/limiter"
require "async/queue"

Async do
	# Pre-populate queue with database connections
	queue = Async::Queue.new
	queue.push("connection_1")
	queue.push("connection_2")
	queue.push("connection_3")
	
	limiter = Async::Limiter::Queued.new(queue)
	
	5.times do |i|
		limiter.async do |task|
			# Automatically gets an available connection
			limiter.acquire do |connection|
				puts "Task #{i} using #{connection}"
				sleep 1 # Kernel#sleep yields to other tasks; Task#sleep is deprecated in Async 2
				# Connection automatically returned to queue
			end
		end
	end
end

# The output shows five tasks sharing the three available connections.
# Tasks 3 and 4 wait until connections are returned to the queue.
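
The checkout-and-return behavior described above can be modeled without the library. The following is a minimal sketch using only Ruby's standard `Thread::Queue`; it is not how async-limiter is implemented, and the names `pool`, `in_use`, and `max_in_use` are hypothetical. It shows why the number of pre-populated resources caps concurrency: a worker cannot proceed until it pops a resource, and each resource is pushed back when the work finishes.

```ruby
# Stand-in for the pre-populated resource queue (stdlib only, threads instead of Async tasks).
pool = Thread::Queue.new
3.times { |i| pool.push("connection_#{i + 1}") }

lock = Mutex.new
in_use = 0
max_in_use = 0
completed = []

threads = 5.times.map do |i|
	Thread.new do
		# Blocks while all three connections are checked out.
		connection = pool.pop
		begin
			lock.synchronize do
				in_use += 1
				max_in_use = in_use if in_use > max_in_use
			end
			sleep 0.01 # Simulate work with the connection.
			lock.synchronize { completed << [i, connection] }
		ensure
			lock.synchronize { in_use -= 1 }
			# Return the connection so a waiting worker can continue.
			pool.push(connection)
		end
	end
end

threads.each(&:join)
puts "Peak concurrent connections: #{max_in_use}"
```

Because only three items ever exist in the queue, the peak can never exceed three, regardless of how many workers are started.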

Manual Resource Management

For fine-grained control over the resource lifecycle:

# Run this inside an Async block, with the requires from the example above.
queue = Async::Queue.new
3.times { |i| queue.push("resource_#{i}") }

limiter = Async::Limiter::Queued.new(queue)

# Acquire with automatic return
limiter.acquire do |resource|
	puts "Using #{resource}"
	# Resource automatically returned when block exits
end

# Manual acquire/release pattern
resource = limiter.acquire
begin
	puts "Using #{resource}"
	# Do work with resource
ensure
	limiter.release(resource)
end
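
To illustrate how the two patterns relate, here is a minimal sketch of a block form layered on top of a manual acquire/release pair. `TinyPool` is a hypothetical class written for this guide with the standard library only; it is an assumption about the general shape of such an API, not async-limiter's source. The point is the `ensure` clause, which returns the resource even when the block raises.

```ruby
# Hypothetical pool used only for illustration.
class TinyPool
	def initialize(resources)
		@queue = Thread::Queue.new
		resources.each { |resource| @queue.push(resource) }
	end

	# Without a block: return the resource; the caller must release it.
	# With a block: yield the resource and always return it afterwards.
	def acquire
		resource = @queue.pop
		return resource unless block_given?

		begin
			yield resource
		ensure
			release(resource)
		end
	end

	def release(resource)
		@queue.push(resource)
	end

	def available
		@queue.size
	end
end

pool = TinyPool.new(["resource_0", "resource_1"])

# Block form: the block's value is returned and the resource goes back.
result = pool.acquire { |resource| "used #{resource}" }

# An exception inside the block does not leak the resource.
begin
	pool.acquire { |_resource| raise "boom" }
rescue RuntimeError
	# The ensure clause has already returned the resource.
end
available_after_error = pool.available

puts result
puts available_after_error
```

With the manual pattern, the same guarantee depends on the caller writing the `begin`/`ensure` pair, which is why the block form is usually the safer default.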

Priority-Based Resource Allocation

Tasks with higher priority values get resources first:

Async do
	queue = Async::PriorityQueue.new
	limiter = Async::Limiter::Queued.new(queue)
	results = []

	# Start tasks with different priorities
	tasks = [
		Async do
			result = limiter.acquire(priority: 1, timeout: 1.0) do |worker|
				"Low priority task used #{worker}"
			end
			results << result
		end,
		
		Async do
			result = limiter.acquire(priority: 10, timeout: 1.0) do |worker|
				"High priority task used #{worker}"
			end
			results << result
		end,
		
		Async do
			result = limiter.acquire(priority: 5, timeout: 1.0) do |worker|
				"Medium priority task used #{worker}"
			end
			results << result
		end
	]
	
	# Add some "workers":
	2.times do |i|
		limiter.release("worker_#{i}")
	end

	tasks.each(&:wait)

	puts results
	# The high-priority task gets a resource first, then the medium-priority task, then the low-priority task.
end
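
The ordering in the example above can be reasoned about with a small deterministic model. The sketch below is a simplified, single-threaded illustration and not the library's internals: `PriorityHandoff`, `wait`, and `served` are hypothetical names, and the first-come-first-served tie-break for equal priorities is an assumption. Each released resource is handed to the waiter with the highest priority value.

```ruby
# Hypothetical model of priority-ordered hand-off (no concurrency involved).
class PriorityHandoff
	Waiter = Struct.new(:name, :priority, :sequence)

	attr_reader :served

	def initialize
		@waiters = []
		@sequence = 0
		@served = []
	end

	# Register a waiter; the sequence number records arrival order.
	def wait(name, priority:)
		@sequence += 1
		@waiters << Waiter.new(name, priority, @sequence)
	end

	# Hand the resource to the highest-priority waiter.
	# Ties go to the earliest arrival (an assumption of this sketch).
	def release(resource)
		return if @waiters.empty?

		waiter = @waiters.max_by { |w| [w.priority, -w.sequence] }
		@waiters.delete(waiter)
		@served << [waiter.name, resource]
	end

	def waiting
		@waiters.map(&:name)
	end
end

handoff = PriorityHandoff.new
handoff.wait("low", priority: 1)
handoff.wait("high", priority: 10)
handoff.wait("medium", priority: 5)

2.times { |i| handoff.release("worker_#{i}") }

p handoff.served  # => [["high", "worker_0"], ["medium", "worker_1"]]
p handoff.waiting # => ["low"]
```

In this model the low-priority waiter is still waiting after two releases. In the guide's example it eventually runs because the block form of `acquire` returns each worker to the limiter when the block exits.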