Async::Semaphore
A synchronization primitive, which limits access to a given resource, such as a limited number of database connections, open files, or network connections.
Example
require 'async'
require 'async/semaphore'
require 'net/http'
Sync do
# Only allow two concurrent tasks at a time:
semaphore = Async::Semaphore.new(2)
# Generate an array of 10 numbers:
terms = ['ruby', 'python', 'go', 'java', 'c++']
# Search for the terms:
terms.each do |term|
semaphore.async do |task|
Console.logger.info("Searching for #{term}...")
response = Net::HTTP.get(URI "https://www.google.com/search?q=#{term}")
Console.logger.info("Got response #{response.size} bytes.")
end
end
end
Output
0.0s info: Searching for ruby... [ec=0x3c] [pid=50523]
0.04s info: Searching for python... [ec=0x21c] [pid=50523]
1.7s info: Got response 182435 bytes. [ec=0x3c] [pid=50523]
1.71s info: Searching for go... [ec=0x834] [pid=50523]
3.0s info: Got response 204854 bytes. [ec=0x21c] [pid=50523]
3.0s info: Searching for java... [ec=0xf64] [pid=50523]
4.32s info: Got response 103235 bytes. [ec=0x834] [pid=50523]
4.32s info: Searching for c++... [ec=0x12d4] [pid=50523]
4.65s info: Got response 109697 bytes. [ec=0xf64] [pid=50523]
6.64s info: Got response 87249 bytes. [ec=0x12d4] [pid=50523]
Signature
- public
Since
stable-v1
.
Definitions
def initialize(limit = 1, parent: nil)
Signature
-
parameter
limit
Integer
The maximum number of times the semaphore can be acquired before it blocks.
-
parameter
parent
Task | Semaphore | Nil
The parent for holding any children tasks.
Implementation
def initialize(limit = 1, parent: nil)
@count = 0
@limit = limit
@waiting = []
@parent = parent
end
attr :count
The current number of tasks that have acquired the semaphore.
attr :limit
The maximum number of tasks that can acquire the semaphore.
attr :waiting
The tasks waiting on this semaphore.
def empty?
Is the semaphore currently acquired?
Implementation
def empty?
@count.zero?
end
def blocking?
Whether trying to acquire this semaphore would block.
Implementation
def blocking?
@count >= @limit
end
def async(*arguments, parent: (@parent or Task.current), **options)
Run an async task. Will wait until the semaphore is ready until spawning and running the task.
Implementation
def async(*arguments, parent: (@parent or Task.current), **options)
wait
parent.async(**options) do |task|
@count += 1
begin
yield task, *arguments
ensure
self.release
end
end
end
def acquire
Acquire the semaphore, block if we are at the limit. If no block is provided, you must call release manually.
Signature
-
yields
{...}
When the semaphore can be acquired.
Implementation
def acquire
wait
@count += 1
return unless block_given?
begin
return yield
ensure
self.release
end
end
def release
Release the semaphore. Must match up with a corresponding call to acquire
. Will release waiting fibers in FIFO order.
Implementation
def release
@count -= 1
while (@limit - @count) > 0 and fiber = @waiting.shift
if fiber.alive?
Fiber.scheduler.resume(fiber)
end
end
end
def wait
Wait until the semaphore becomes available.
Implementation
def wait
fiber = Fiber.current
if blocking?
@waiting << fiber
Fiber.scheduler.transfer while blocking?
end
rescue Exception
@waiting.delete(fiber)
raise
end