class Barrier
A synchronization primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with class Async::Semaphore
.
Example
require 'async'
require 'async/barrier'
barrier = Async::Barrier.new
Sync do
Console.info("Barrier Example: sleep sort.")
# Generate an array of 10 numbers:
numbers = 10.times.map{rand(10)}
sorted = []
# Sleep sort the numbers:
numbers.each do |number|
barrier.async do |task|
task.sleep(number)
sorted << number
end
end
# Wait for all the numbers to be sorted:
barrier.wait
Console.info("Sorted", sorted)
ensure
# Ensure all the tasks are stopped when we exit:
barrier.stop
end
Output
0.0s info: Barrier Example: sleep sort.
9.0s info: Sorted
| [3, 3, 3, 4, 4, 5, 5, 5, 8, 9]
Signature
- public
Since Async v1.
Nested
Definitions
def initialize(parent: nil)
- public
Initialize the barrier.
Signature
-
parameter
parent
Task | Semaphore | Nil
The parent for holding any children tasks.
- public
Since Async v1.
Implementation
def initialize(parent: nil)
@tasks = List.new
@parent = parent
end
def size
Number of tasks being held by the barrier.
Implementation
def size
@tasks.size
end
attr :tasks
All tasks which have been invoked into the barrier.
def async(*arguments, parent: (@parent or Task.current), **options, &block)
- asynchronous
Execute a child task and add it to the barrier.
Signature
- asynchronous
Executes the given block concurrently.
Implementation
def async(*arguments, parent: (@parent or Task.current), **options, &block)
task = parent.async(*arguments, **options, &block)
@tasks.append(TaskNode.new(task))
return task
end
def empty?
Whether there are any tasks being held by the barrier.
Signature
-
returns
Boolean
Implementation
def empty?
@tasks.empty?
end
def wait
- asynchronous
Wait for all tasks to complete by invoking Async::Task#wait
on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier.
Signature
- asynchronous
Will wait for tasks to finish executing.
Implementation
def wait
@tasks.each do |waiting|
task = waiting.task
begin
task.wait
ensure
@tasks.remove?(waiting) unless task.alive?
end
end
end
def stop
- asynchronous
Stop all tasks held by the barrier.
Signature
- asynchronous
May wait for tasks to finish executing.
Implementation
def stop
@tasks.each do |waiting|
waiting.task.stop
end
end