Best Practices
This guide gives an overview of best practices for using Async.
Use a top-level Sync
to denote the root of your program
Async{}
has two uses: it creates an event loop if one doesn't exist, and it creates a task which runs asynchronously with respect to the parent scope. However, the top level Async{}
block will be synchronous because it creates the event loop. In some programs, you do not care about executing asynchronously, but you still want your code to run in an event loop. Sync{}
exists to do this efficiently.
require 'async'
class Packages
def initialize(urls)
@urls = urls
end
def fetch
# A common use case is to make functions which appear synchronous, but internally use asynchronous execution:
Sync do |task|
@urls.map do |url|
task.async do
fetch(url)
end
end.map(&:wait)
end
end
end
Sync{...}
is semantically equivalent to Async{}.wait
, but it is more efficient. It is the preferred way to run code in an event loop at the top level of your program or to ensure some code runs in an event loop without creating a new task. The name Sync
means "Synchronous Async", indicating that it runs synchronously with respect to the outer scope, but still allows for asynchronous execution within it.
Current Task
In some scenarios, it can be invalid to call a method outside of an event loop, for example a top level Async{...}
can block forever, which might be unexpected.
def wait(queue)
Async do
queue.pop
end
end
You can force callers of a method to only call the method within an asynchronous context by using a keyword argument parent: Async::Task.current
. If no task is present, this will raise an exception.
def wait(queue, parent: Async::Task.current)
parent.async do
queue.pop
end
end
This expresses the intent to the caller that this method should only be invoked from within an asynchonous task. In addition, it allows the caller to substitute other parent objects, like semaphores or barriers, which can be useful for managing concurrency.
Use barriers to manage unbounded concurrency
Barriers provide a way to manage an unbounded number of tasks. The top-level Barrier
method creates a barrier with built-in load management using an Async::Idler
.
Barrier do |barrier|
items.each do |item|
barrier.async do
process(item)
end
end
end
The barrier will automatically wait for all tasks to complete and stop any outstanding tasks when the block exits. By default, it uses an Async::Idler
to prevent system overload by scheduling tasks when the system load is below 80%.
If you want to process tasks in order of completion, you can explicitly call wait
with a block:
Barrier do |barrier|
items.each do |item|
barrier.async do
process(item)
end
end
# Process the tasks in order of completion:
barrier.wait do |task|
result = task.wait
# Do something with result.
# If you don't want to wait for any more tasks you can break:
break
end
end
To disable load management (not recommended for unbounded concurrency), you can pass parent: nil
:
Barrier(parent: nil) do |barrier|
# No load management - creates tasks as fast as possible
items.each do |item|
barrier.async do
process(item)
end
end
end
Use a semaphore to limit the number of concurrent tasks
Semaphores allow you to limit the level of concurrency to a fixed number of tasks. When using semaphores with barriers, the barrier should be the root of your task hierarchy, and the semaphore should be a child of the barrier:
Barrier(parent: nil) do |barrier|
semaphore = Async::Semaphore.new(4, parent: barrier)
items.each do |item|
semaphore.async do
process(item)
end
end
end
In this example, we use parent: nil
for the barrier to disable load management, since the semaphore already provides concurrency control. The semaphore limits execution to 4 concurrent tasks, and the barrier ensures all tasks are stopped when the block exits.
Idler
Idlers are like semaphores but with a limit defined by current processor utilization. In other words, an idler will schedule work up to a specific ratio of idle/busy time in the scheduler.
The top-level Barrier
method uses an idler by default, making it safe for unbounded concurrency:
Barrier do |barrier| # Uses Async::Idler.new(0.8) by default
work.each do |work|
barrier.async do
work.call
end
end
end
You can also use an idler directly without a barrier:
Async do
# Create an idler that will aim for a load average of 80%:
idler = Async::Idler.new(0.8)
# Some list of work to be done:
work.each do |work|
idler.async do
# Do the work:
work.call
end
end
end
The idler will try to schedule as much work such that the load of the scheduler stays at around 80% saturation.
Use timeouts for operations that might block forever
General timeouts can be imposed by using task.with_timeout(duration)
.
Async do |task|
# This will raise an Async::TimeoutError after 1 second:
task.with_timeout(1) do |timeout|
# Timeout#duration= can be used to adjust the duration of the timeout.
# Timeout#cancel can be used to cancel the timeout completely.
sleep 10
end
end
It can be especially important to impose timeouts when processing user-provided data.