Async SourceAsyncBarrier

class Barrier

A synchronization primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with class Async::Semaphore.

Example

require 'async'
require 'async/barrier'

barrier = Async::Barrier.new
Sync do
	Console.logger.info("Barrier Example: sleep sort.")
	
	# Generate an array of 10 numbers:
	numbers = 10.times.map{rand(10)}
	sorted = []
	
	# Sleep sort the numbers:
	numbers.each do |number|
		barrier.async do |task|
			task.sleep(number)
			sorted << number
		end
	end
	
	# Wait for all the numbers to be sorted:
	barrier.wait
	
	Console.logger.info("Sorted", sorted)
ensure
	# Ensure all the tasks are stopped when we exit:
	barrier.stop
end

Output

0.0s     info: Barrier Example: sleep sort.
9.0s     info: Sorted
             | [3, 3, 3, 4, 4, 5, 5, 5, 8, 9]

Signature

public

Since stable-v1.

Nested

Definitions

def initialize(parent: nil)

  • public

Initialize the barrier.

Signature

parameter parent Task | Semaphore | Nil

The parent for holding any children tasks.

public

Since stable-v1.

Implementation

def initialize(parent: nil)
	@tasks = List.new
	
	@parent = parent
end

def size

Number of tasks being held by the barrier.

Implementation

def size
	@tasks.size
end

attr :tasks

All tasks which have been invoked into the barrier.

def async(*arguments, parent: (@parent or Task.current), **options, &block)

  • asynchronous

Execute a child task and add it to the barrier.

Signature

asynchronous

Executes the given block concurrently.

Implementation

def async(*arguments, parent: (@parent or Task.current), **options, &block)
	task = parent.async(*arguments, **options, &block)
	
	@tasks.append(TaskNode.new(task))
	
	return task
end

def empty?

Whether there are any tasks being held by the barrier.

Signature

returns Boolean

Implementation

def empty?
	@tasks.empty?
end

def wait

  • asynchronous

Wait for all tasks to complete by invoking Async::Task#wait on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier.

Signature

asynchronous

Will wait for tasks to finish executing.

Implementation

def wait
	@tasks.each do |waiting|
		task = waiting.task
		begin
			task.wait
		ensure
			@tasks.remove?(waiting) unless task.alive?
		end
	end
end

def stop

  • asynchronous

Stop all tasks held by the barrier.

Signature

asynchronous

May wait for tasks to finish executing.

Implementation

def stop
	@tasks.each do |waiting|
		waiting.task.stop
	end
end

Discussion