class Semaphore
A synchronization primitive, which limits access to a given resource, such as a limited number of database connections, open files, or network connections.
Example
require 'async'
require 'async/semaphore'
require 'net/http'
Sync do
# Only allow two concurrent tasks at a time:
semaphore = Async::Semaphore.new(2)
# Generate an array of 10 numbers:
terms = ['ruby', 'python', 'go', 'java', 'c++']
# Search for the terms:
terms.each do |term|
semaphore.async do |task|
Console.info("Searching for #{term}...")
response = Net::HTTP.get(URI "https://www.google.com/search?q=#{term}")
Console.info("Got response #{response.size} bytes.")
end
end
end
Output
0.0s info: Searching for ruby... [ec=0x3c] [pid=50523]
0.04s info: Searching for python... [ec=0x21c] [pid=50523]
1.7s info: Got response 182435 bytes. [ec=0x3c] [pid=50523]
1.71s info: Searching for go... [ec=0x834] [pid=50523]
3.0s info: Got response 204854 bytes. [ec=0x21c] [pid=50523]
3.0s info: Searching for java... [ec=0xf64] [pid=50523]
4.32s info: Got response 103235 bytes. [ec=0x834] [pid=50523]
4.32s info: Searching for c++... [ec=0x12d4] [pid=50523]
4.65s info: Got response 109697 bytes. [ec=0xf64] [pid=50523]
6.64s info: Got response 87249 bytes. [ec=0x12d4] [pid=50523]
Signature
- public
Since Async v1.
Nested
Definitions
def initialize(limit = 1, parent: nil)
Signature
-
parameter
limit
Integer
The maximum number of times the semaphore can be acquired before it blocks.
-
parameter
parent
Task | Semaphore | Nil
The parent for holding any children tasks.
Implementation
def initialize(limit = 1, parent: nil)
@count = 0
@limit = limit
@waiting = List.new
@parent = parent
end
attr :count
The current number of tasks that have acquired the semaphore.
attr :limit
The maximum number of tasks that can acquire the semaphore.
attr :waiting
The tasks waiting on this semaphore.
def limit= limit
Allow setting the limit. This is useful for cases where the semaphore is used to limit the number of concurrent tasks, but the number of tasks is not known in advance or needs to be modified.
On increasing the limit, some tasks may be immediately resumed. On decreasing the limit, some tasks may execute until the count is < than the limit.
Signature
-
parameter
limit
Integer
The new limit.
Implementation
def limit= limit
difference = limit - @limit
@limit = limit
# We can't suspend
if difference > 0
difference.times do
break unless node = @waiting.first
node.resume
end
end
end
def empty?
Is the semaphore currently acquired?
Implementation
def empty?
@count.zero?
end
def blocking?
Whether trying to acquire this semaphore would block.
Implementation
def blocking?
@count >= @limit
end
def async(*arguments, parent: (@parent or Task.current), **options)
Run an async task. Will wait until the semaphore is ready until spawning and running the task.
Implementation
def async(*arguments, parent: (@parent or Task.current), **options)
wait
parent.async(**options) do |task|
@count += 1
begin
yield task, *arguments
ensure
self.release
end
end
end
def acquire
Acquire the semaphore, block if we are at the limit. If no block is provided, you must call release manually.
Signature
-
yields
{...}
When the semaphore can be acquired.
Implementation
def acquire
wait
@count += 1
return unless block_given?
begin
return yield
ensure
self.release
end
end
def release
Release the semaphore. Must match up with a corresponding call to acquire
. Will release waiting fibers in FIFO order.
Implementation
def release
@count -= 1
while (@limit - @count) > 0 and node = @waiting.first
node.resume
end
end
def wait
Wait until the semaphore becomes available.
Implementation
def wait
return unless blocking?
@waiting.stack(FiberNode.new(Fiber.current)) do
Fiber.scheduler.transfer while blocking?
end
end