ProjectSource

Async::Barrier

A synchronization primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with class Async::Semaphore.

Example

require 'async'
require 'async/barrier'

Sync do
	barrier = Async::Barrier.new
	
	# Generate an array of 10 numbers:
	numbers = 10.times.map{rand(10)}
	sorted = []
	
	# Sleep sort the numbers:
	numbers.each do |number|
		barrier.async do |task|
			task.sleep(number)
			sorted << number
		end
	end
	
	# Wait for all the numbers to be sorted:
	barrier.wait
	
	Console.logger.info("Sorted", sorted)
end

Output

0.0s     info: Sorted [ec=0x104] [pid=50291]
						 | [0, 0, 0, 0, 1, 2, 2, 3, 6, 6]

Signature

public

Since stable-v1.

Definitions

def initialize(parent: nil)

  • public

Initialize the barrier.

Signature

parameter parent Task | Semaphore | Nil

The parent for holding any children tasks.

public

Since stable-v1.

Implementation

def initialize(parent: nil)
	@tasks = []
	
	@parent = parent
end

attr :tasks

All tasks which have been invoked into the barrier.

def size

The number of tasks currently held by the barrier.

Implementation

def size
	@tasks.size
end

def async(*arguments, parent: (@parent or Task.current), **options, &block)

  • asynchronous

Execute a child task and add it to the barrier.

Signature

asynchronous

Executes the given block concurrently.

Implementation

def async(*arguments, parent: (@parent or Task.current), **options, &block)
	task = parent.async(*arguments, **options, &block)
	
	@tasks << task
	
	return task
end

def empty?

Whether there are any tasks being held by the barrier.

Signature

returns Boolean

Implementation

def empty?
	@tasks.empty?
end

def wait

  • asynchronous

Wait for all tasks.

Signature

asynchronous

Will wait for tasks to finish executing.

Implementation

def wait
	# TODO: This would be better with linked list.
	while @tasks.any?
		task = @tasks.first
		
		begin
			task.wait
		ensure
			# We don't know for sure that the exception was due to the task completion.
			unless task.running?
				# Remove the task from the waiting list if it's finished:
				@tasks.shift if @tasks.first == task
			end
		end
	end
end

def stop

  • asynchronous

Stop all tasks held by the barrier.

Signature

asynchronous

May wait for tasks to finish executing.

Implementation

def stop
	# We have to be careful to avoid enumerating tasks while adding/removing to it:
	tasks = @tasks.dup
	tasks.each(&:stop)
end

Discussion