AsyncSourceAsyncBarrier

class Barrier

A synchronization primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with class Async::Semaphore.

Example

require 'async'
require 'async/barrier'

barrier = Async::Barrier.new
Sync do
	Console.info("Barrier Example: sleep sort.")
	
	# Generate an array of 10 numbers:
	numbers = 10.times.map{rand(10)}
	sorted = []
	
	# Sleep sort the numbers:
	numbers.each do |number|
		barrier.async do |task|
			sleep(number)
			sorted << number
		end
	end
	
	# Wait for all the numbers to be sorted:
	barrier.wait
	
	Console.info("Sorted", sorted)
ensure
	# Ensure all the tasks are stopped when we exit:
	barrier.stop
end

Output

0.0s     info: Barrier Example: sleep sort.
9.0s     info: Sorted
             | [3, 3, 3, 4, 4, 5, 5, 5, 8, 9]

Signature

public

Since Async v1.

Nested

Definitions

def initialize(parent: nil)

  • public

Initialize the barrier.

Signature

parameter parent Task | Semaphore | Nil

The parent for holding any children tasks.

public

Since Async v1.

Implementation

def initialize(parent: nil)
	@tasks = List.new
	@finished = Queue.new
	
	@parent = parent
end

def size

Number of tasks being held by the barrier.

Implementation

def size
	@tasks.size
end

attr :tasks

All tasks which have been invoked into the barrier.

def async(*arguments, parent: (@parent or Task.current), **options, &block)

  • asynchronous

Execute a child task and add it to the barrier.

Signature

asynchronous

Executes the given block concurrently.

Implementation

def async(*arguments, parent: (@parent or Task.current), **options, &block)
	waiting = nil
	
	parent.async(*arguments, **options) do |task, *arguments|
		waiting = TaskNode.new(task)
		@tasks.append(waiting)
		block.call(task, *arguments)
	ensure
		@finished.signal(waiting)
	end
end

def empty?

Whether there are any tasks being held by the barrier.

Signature

returns Boolean

Implementation

def empty?
	@tasks.empty?
end

def wait

  • asynchronous

Wait for all tasks to complete by invoking Async::Task#wait on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier.

Signature

yields {|task| ...}

If a block is given, the unwaited task is yielded. You must invoke Async::Task#wait yourself. In addition, you may break if you have captured enough results.

asynchronous

Will wait for tasks to finish executing.

Implementation

def wait
	while !@tasks.empty?
		# Wait for a task to finish (we get the task node):
		return unless waiting = @finished.wait
		
		# Remove the task as it is now finishing:
		@tasks.remove?(waiting)
		
		# Get the task:
		task = waiting.task
		
		# If a block is given, the user can implement their own behaviour:
		if block_given?
			yield task
		else
			# Wait for it to either complete or raise an error:
			task.wait
		end
	end
end

def stop

  • asynchronous

Stop all tasks held by the barrier.

Signature

asynchronous

May wait for tasks to finish executing.

Implementation

def stop
	@tasks.each do |waiting|
		waiting.task.stop
	end
	
	@finished.close
end

Discussion