Async › Source › Async › Waiter

class Waiter

A synchronization primitive, which allows you to wait for tasks to complete in order of completion. This is useful for implementing a task pool, where you want to wait for the first task to complete, and then cancel the rest.

If you try to wait for more things than you have added, you will deadlock.

Example

require 'async'
require 'async/semaphore'
require 'async/barrier'
require 'async/waiter'

Sync do
	# The waiter creates its tasks within the barrier, so that stopping the barrier stops every task which is still running.
	barrier = Async::Barrier.new
	waiter = Async::Waiter.new(parent: barrier)
	semaphore = Async::Semaphore.new(2, parent: waiter)
	
	# Sleep sort the numbers:
	generator = Async do
		loop do
			semaphore.async do
				number = rand(1..10)
				sleep(number)
				
				# Return the number itself: `sleep` returns the elapsed time rounded to whole seconds, which is not guaranteed to equal `number`.
				number
			end
		end
	end
	
	numbers = []
	
	4.times do
		# Wait for the next task to complete, and collect its result:
		numbers << waiter.wait
	end
	
	# Don't generate any more numbers:
	generator.stop
	
	# Stop all tasks which we don't care about:
	barrier.stop
	
	Console.info("Smallest", numbers)
end

Output

0.0s     info: Smallest
             | [3, 3, 1, 2]

Definitions

def initialize(parent: nil, finished: Async::Condition.new)

Create a waiter instance.

Signature

parameter parent Interface(:async) | Nil

The parent task to use for asynchronous operations.

parameter finished Async::Condition

The condition to signal when a task completes.

Implementation

# Create a waiter instance.
# @parameter parent [Interface(:async) | Nil] The parent task to use for asynchronous operations.
# @parameter finished [Async::Condition] The condition to signal when a task completes.
def initialize(parent: nil, finished: Async::Condition.new)
	# Signalled each time a task created by this waiter finishes.
	@finished = finished
	# Tasks which have finished but have not yet been taken by a caller.
	@done = []
	# Used as the default parent for tasks created by this waiter.
	@parent = parent
end

def async(parent: (@parent or Task.current), **options, &block)

  • asynchronous

Execute a child task and add it to the waiter.

Signature

asynchronous

Executes the given block concurrently.

Implementation

# Execute a child task and add it to the waiter.
# @asynchronous Executes the given block concurrently.
# @parameter parent [Interface(:async)] Where to create the task; defaults to the waiter's parent, or the current task if there is none.
# @parameter options [Hash] Passed through unchanged to `parent.async`.
# @yields {|task| ...} The work to perform within the child task.
def async(parent: (@parent || Task.current), **options, &block)
	parent.async(**options) do |child|
		begin
			yield(child)
		ensure
			# Runs whether the block returned or raised: record the task as done, then signal the condition that `first` waits on.
			@done.push(child)
			@finished.signal
		end
	end
end

def first(count = nil)

Wait for the first count tasks to complete.

Signature

parameter count Integer | Nil

The number of tasks to wait for.

returns Array(Async::Task)

If an integer is given, the tasks which have completed.

returns Async::Task

Otherwise, the first task to complete.

Implementation

# Wait for the first `count` tasks to complete.
# @parameter count [Integer | Nil] The number of tasks to wait for.
# @returns [Array(Async::Task)] If an integer is given, the tasks which have completed.
# @returns [Async::Task] Otherwise, the first task to complete.
def first(count = nil)
	# Without an explicit count, a single completed task is required.
	required = count || 1
	
	# Wait on the condition until enough tasks have been recorded as done.
	@finished.wait until @done.size >= required
	
	# Remove the tasks being returned, so that a later call does not see them again.
	count.nil? ? @done.shift : @done.shift(count)
end

def wait(count = nil)

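Wait for the first `count` tasks to complete, then call `wait` on each of them and return the results. If no count is given, wait for the first task to complete and return the result of its `wait`.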

Signature

parameter count Integer | Nil

The number of tasks to wait for.

Implementation

# Wait for the first `count` tasks to complete, and call `wait` on each of them.
# @parameter count [Integer | Nil] The number of tasks to wait for.
# @returns [Array] If a count is given, the result of calling `wait` on each completed task.
# @returns [Object] Otherwise, the result of calling `wait` on the first task to complete.
def wait(count = nil)
	return first.wait unless count
	
	first(count).map { |task| task.wait }
end

Discussion