Async › Source › Async › Barrier

class Barrier

A synchronization primitive that allows one task to wait for a number of other tasks to complete. It can be used in conjunction with the Async::Semaphore class.

Example

require 'async'
require 'async/barrier'

barrier = Async::Barrier.new
Sync do
	Console.info("Barrier Example: sleep sort.")
	
	# Ten random integers, each in the range 0...10:
	numbers = Array.new(10) {rand(10)}
	sorted = []
	
	# Start one task per number. Each task sleeps for `number` seconds and then appends it, so smaller numbers are intended to be appended first:
	numbers.each do |number|
		barrier.async do
			sleep(number)
			sorted << number
		end
	end
	
	# Block here until every task started through the barrier has finished:
	barrier.wait
	
	Console.info("Sorted", sorted)
ensure
	# Whether we finished normally or not, stop anything still held by the barrier:
	barrier.stop
end

Output

0.0s     info: Barrier Example: sleep sort.
9.0s     info: Sorted
             | [3, 3, 3, 4, 4, 5, 5, 5, 8, 9]

Signature

public

Since Async v1.

Nested

Definitions

def initialize(parent: nil)

  • public

Initialize the barrier.

Signature

parameter parent Task | Semaphore | Nil

The parent used to hold any child tasks.

public

Since Async v1.

Implementation

# Set up an empty barrier.
# @parameter parent [Task | Semaphore | Nil] The parent for holding any children tasks.
def initialize(parent: nil)
	# Default parent used by `async` when the caller does not supply one:
	@parent = parent
	
	# Nodes for the tasks currently held by the barrier:
	@tasks = List.new
	
	# Queue through which finished task nodes are handed to `wait`:
	@finished = Queue.new
	
	# Signalled by each child task once it has appended itself to `@tasks`:
	@condition = Condition.new
end

def size

Number of tasks being held by the barrier.

Implementation

# Number of tasks being held by the barrier.
# @returns The result of `@tasks.size`.
def size
	return @tasks.size
end

attr :tasks

All tasks which have been added to the barrier.

def async(*arguments, parent: (@parent or Task.current), **options, &block)

  • asynchronous

Execute a child task and add it to the barrier.

Signature

asynchronous

Executes the given block concurrently.

returns Task

The task which was created to execute the block.

Implementation

# Execute a child task and add it to the barrier.
#
# The child is started via `parent.async`; once running, it registers itself in `@tasks` and, when it ends (normally or by error), pushes its node onto `@finished` unless that queue has been closed.
#
# @asynchronous Executes the given block concurrently.
# @parameter arguments [Array] Positional arguments forwarded to `parent.async`.
# @parameter parent [Object] The object whose `async` method starts the child; defaults to `@parent`, or `Task.current` when `@parent` is not set.
# @parameter options [Hash] Keyword options forwarded to `parent.async`.
# @yields {|task, *arguments| ...} The block to execute inside the child task.
# @returns [Task] The value returned by `parent.async`.
# @raises [RuntimeError] If `@finished` is already closed (which `cancel` does).
def async(*arguments, parent: (@parent or Task.current), **options, &block)
	raise "Barrier is stopped!" if @finished.closed?
	
	# Set by the child block once it has registered itself; polled below.
	waiting = nil
	
	# Note: the block parameters `task` and `arguments` shadow the outer locals of the same names.
	task = parent.async(*arguments, **options) do |task, *arguments|
		# Create a new list node for the task and add it to the list of waiting tasks:
		node = TaskNode.new(task)
		@tasks.append(node)
		
		# Signal the outer async block that we have added the task to the list of waiting tasks, and that it can now wait for it to finish:
		waiting = node
		@condition.signal
		
		# Invoke the block, which may raise an error. If it does, we will still signal that the task has finished:
		block.call(task, *arguments)
	ensure
		# Signal that the task has finished, which will unblock the waiting task:
		# NOTE(review): `node` is nil here if the block was interrupted before `TaskNode.new` returned; `wait` treats a falsy value from `@finished.wait` as "stop" — confirm this is intended.
		@finished.signal(node) unless @finished.closed?
	end
	
	# `parent.async` may yield before the child block executes, so we wait here until the child has appended itself to `@tasks`, ensuring `wait` cannot return early and miss tracking it:
	# NOTE(review): if the child never reaches `waiting = node`, this loop relies on something else signalling `@condition` and would otherwise not terminate — confirm whether that can happen.
	@condition.wait while waiting.nil?
	
	return task
end

def empty?

Whether the barrier is empty, i.e. it is not holding any tasks.

Signature

returns Boolean

Implementation

# Whether the barrier currently holds no tasks.
# @returns [Boolean] The result of `@tasks.empty?`.
def empty?
	return @tasks.empty?
end

def wait

  • asynchronous

Wait for all tasks to complete by invoking Async::Task#wait on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier.

Signature

yields {|task| ...}

If a block is given, the unwaited task is yielded. You must invoke Async::Task#wait yourself. In addition, you may break if you have captured enough results.

returns Integer | Nil

The number of tasks which were waited for, or nil if there were no tasks to wait for.

asynchronous

Will wait for tasks to finish executing.

Implementation

# Wait for the tasks held by the barrier, in the order in which they finish.
#
# Each finished node is taken from `@finished`, removed from `@tasks`, and then either yielded (when a block is given) or waited on via `task.wait`, which may raise.
#
# @asynchronous Will wait for tasks to finish executing.
# @yields {|task| ...} If a block is given, the finished task is yielded instead of being waited on; the caller may `break` out early.
# @returns [Integer | Nil] The number of tasks processed, or nil if there were no tasks to wait for.
def wait
	return nil if @tasks.empty?
	
	completed = 0
	
	# Take the next finished node; a falsy result from the queue ends the loop:
	while (node = @finished.wait)
		completed += 1
		
		# The task is finishing, so the barrier no longer needs to hold it:
		@tasks.remove?(node)
		
		finished_task = node.task
		
		if block_given?
			# The caller decides what to do with the task:
			yield finished_task
		else
			# Either returns once the task completes or raises its error:
			finished_task.wait
		end
		
		# Nothing left to wait for:
		break if @tasks.empty?
	end
	
	return completed
end

def cancel

  • asynchronous

Cancel all tasks held by the barrier.

Signature

asynchronous

May wait for tasks to finish executing.

Implementation

# Cancel every task held by the barrier, then close the finished queue.
# @asynchronous May wait for tasks to finish executing.
def cancel
	@tasks.each {|node| node.task.cancel}
	
	# Once closed, `async` refuses to start further tasks:
	return @finished.close
end

def stop

  • deprecated

Backward compatibility alias for #cancel.

Signature

deprecated

Use #cancel instead.

Implementation

# Backward compatibility alias for {cancel}.
# @deprecated Use {cancel} instead.
def stop
	return cancel
end

Discussion