Async::Barrier
A synchronization primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with class Async::Semaphore
.
Example
require 'async'
require 'async/barrier'
Sync do
barrier = Async::Barrier.new
# Generate an array of 10 numbers:
numbers = 10.times.map{rand(10)}
sorted = []
# Sleep sort the numbers:
numbers.each do |number|
barrier.async do |task|
task.sleep(number)
sorted << number
end
end
# Wait for all the numbers to be sorted:
barrier.wait
Console.logger.info("Sorted", sorted)
end
Output
0.0s info: Sorted [ec=0x104] [pid=50291]
| [0, 0, 0, 0, 1, 2, 2, 3, 6, 6]
Signature
- public
Since
stable-v1
.
Definitions
def initialize(parent: nil)
- public
Initialize the barrier.
Signature
-
parameter
parent
Task | Semaphore | Nil
The parent for holding any children tasks.
- public
Since
stable-v1
.
Implementation
def initialize(parent: nil)
@tasks = []
@parent = parent
end
attr :tasks
All tasks which have been invoked into the barrier.
def size
The number of tasks currently held by the barrier.
Implementation
def size
@tasks.size
end
def async(*arguments, parent: (@parent or Task.current), **options, &block)
- asynchronous
Execute a child task and add it to the barrier.
Signature
- asynchronous
Executes the given block concurrently.
Implementation
def async(*arguments, parent: (@parent or Task.current), **options, &block)
task = parent.async(*arguments, **options, &block)
@tasks << task
return task
end
def empty?
Whether there are any tasks being held by the barrier.
Signature
-
returns
Boolean
Implementation
def empty?
@tasks.empty?
end
def wait
- asynchronous
Wait for all tasks.
Signature
- asynchronous
Will wait for tasks to finish executing.
Implementation
def wait
# TODO: This would be better with linked list.
while @tasks.any?
task = @tasks.first
begin
task.wait
ensure
# We don't know for sure that the exception was due to the task completion.
unless task.running?
# Remove the task from the waiting list if it's finished:
@tasks.shift if @tasks.first == task
end
end
end
end
def stop
- asynchronous
Stop all tasks held by the barrier.
Signature
- asynchronous
May wait for tasks to finish executing.
Implementation
def stop
# We have to be careful to avoid enumerating tasks while adding/removing to it:
tasks = @tasks.dup
tasks.each(&:stop)
end