
Async::Semaphore

A synchronization primitive that limits concurrent access to a given resource, such as a fixed pool of database connections, open files, or network connections.

Example

require 'async'
require 'async/semaphore'
require 'net/http'

Sync do
	# Only allow two concurrent tasks at a time:
	semaphore = Async::Semaphore.new(2)
	
	# The search terms to look up:
	terms = ['ruby', 'python', 'go', 'java', 'c++']
	
	# Search for the terms:
	terms.each do |term|
		semaphore.async do |task|
			Console.logger.info("Searching for #{term}...")
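			# Escape the term so characters such as "+" survive in the query string:
			response = Net::HTTP.get(URI "https://www.google.com/search?q=#{URI.encode_www_form_component(term)}")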
			Console.logger.info("Got response #{response.size} bytes.")
		end
	end
end

Output

0.0s     info: Searching for ruby... [ec=0x3c] [pid=50523]
0.04s     info: Searching for python... [ec=0x21c] [pid=50523]
1.7s     info: Got response 182435 bytes. [ec=0x3c] [pid=50523]
1.71s     info: Searching for go... [ec=0x834] [pid=50523]
3.0s     info: Got response 204854 bytes. [ec=0x21c] [pid=50523]
3.0s     info: Searching for java... [ec=0xf64] [pid=50523]
4.32s     info: Got response 103235 bytes. [ec=0x834] [pid=50523]
4.32s     info: Searching for c++... [ec=0x12d4] [pid=50523]
4.65s     info: Got response 109697 bytes. [ec=0xf64] [pid=50523]
6.64s     info: Got response 87249 bytes. [ec=0x12d4] [pid=50523]

Signature

public

Since stable-v1.

Definitions

def initialize(limit = 1, parent: nil)

Signature

parameter limit Integer

The maximum number of times the semaphore can be acquired before it blocks.

parameter parent Task | Semaphore | Nil

The parent under which any child tasks are created.

Implementation

def initialize(limit = 1, parent: nil)
	@count = 0
	@limit = limit
	@waiting = []
	
	@parent = parent
end

attr :count

The current number of tasks that have acquired the semaphore.

attr :limit

The maximum number of tasks that can acquire the semaphore.

attr :waiting

The tasks waiting on this semaphore.

def empty?

Whether the semaphore is currently unacquired, that is, no task holds it.

Implementation

def empty?
	@count.zero?
end

def blocking?

Whether trying to acquire this semaphore would block.

Implementation

def blocking?
	@count >= @limit
end
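
As a rough illustration of how these two predicates relate, the sketch below tabulates them for every possible count of a semaphore with a limit of 2. It uses plain local variables rather than Async::Semaphore itself; the names limit and states are made up for this example.

```ruby
# Illustrative only: plain values standing in for @limit and @count.
limit = 2

# For each possible count, evaluate the same expressions used by
# empty? (@count.zero?) and blocking? (@count >= @limit).
states = (0..limit).map do |count|
	{count: count, empty: count.zero?, blocking: count >= limit}
end

states.each do |state|
	puts "count=#{state[:count]} empty?=#{state[:empty]} blocking?=#{state[:blocking]}"
end
```

Only a count of 0 is empty and only a count that has reached the limit is blocking; in between, the semaphore is neither.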

def async(*arguments, parent: (@parent or Task.current), **options)

Run an async task. Waits until the semaphore is available before spawning and running the task.

Implementation

def async(*arguments, parent: (@parent or Task.current), **options)
	wait
	
	parent.async(**options) do |task|
		@count += 1
		
		begin
			yield task, *arguments
		ensure
			self.release
		end
	end
end

def acquire

Acquire the semaphore, blocking if it is already at its limit. If no block is given, you must call release manually.

Signature

yields {...}

When the semaphore can be acquired.

Implementation

def acquire
	wait
	
	@count += 1
	
	return unless block_given?
	
	begin
		return yield
	ensure
		self.release
	end
end
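
The difference between the block form and the manual form can be sketched without a reactor. The example below is a simplified stand-in, assuming a hypothetical SlotModel class that copies only the counting and begin/ensure structure shown above; where the real class would suspend the calling fiber, this model simply raises.

```ruby
# Hypothetical stand-in for illustration; this is not Async::Semaphore.
class SlotModel
	attr_reader :count
	
	def initialize(limit = 1)
		@limit = limit
		@count = 0
	end
	
	def acquire
		# The real class suspends the calling fiber here; this model just raises.
		raise "would block" if @count >= @limit
		
		@count += 1
		
		return unless block_given?
		
		begin
			return yield
		ensure
			release
		end
	end
	
	def release
		@count -= 1
	end
end

slots = SlotModel.new(1)

# Block form: the slot is released when the block returns...
result = slots.acquire { :done }
count_after_block = slots.count

# ...and also when the block raises.
begin
	slots.acquire { raise IOError, "simulated failure" }
rescue IOError
	# Swallowed on purpose; only the count matters here.
end
count_after_error = slots.count

# Manual form: without a block, the caller must release explicitly.
slots.acquire
count_while_held = slots.count
slots.release
count_after_release = slots.count
```

The block form is usually the safer choice, since the ensure clause returns the slot on every exit path. The manual form fits cases where acquisition and release happen in different methods.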

def release

Release the semaphore. Each call must be paired with a preceding call to acquire. Waiting fibers are resumed in FIFO order.

Implementation

def release
	@count -= 1
	
	while (@limit - @count) > 0 and fiber = @waiting.shift
		if fiber.alive?
			Fiber.scheduler.resume(fiber)
		end
	end
end
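
To see the FIFO ordering in isolation, here is a minimal sketch that replaces the fiber scheduler with plain Fiber.yield and Fiber#resume. FifoModel, its acquire(fiber) signature, and the driver steps are assumptions made for this example and are not part of the library.

```ruby
# Hypothetical stand-in for illustration; this is not Async::Semaphore.
# Plain Fiber.yield / Fiber#resume replace the fiber scheduler.
class FifoModel
	attr_reader :count, :waiting
	
	def initialize(limit)
		@limit = limit
		@count = 0
		@waiting = []
	end
	
	def blocking?
		@count >= @limit
	end
	
	# Must be called from inside the given fiber.
	def acquire(fiber)
		if blocking?
			@waiting << fiber
			Fiber.yield while blocking?
		end
		
		@count += 1
	end
	
	def release
		@count -= 1
		
		# Same loop shape as the implementation above: wake the oldest waiter first.
		while (@limit - @count) > 0 and fiber = @waiting.shift
			fiber.resume
		end
	end
end

model = FifoModel.new(1)
order = []
fibers = {}

%w[a b c].each do |name|
	fibers[name] = Fiber.new do
		model.acquire(fibers[name])
		order << name
		Fiber.yield # Hold the slot until the driver resumes this fiber.
		model.release
	end
end

fibers["a"].resume # a takes the only slot.
fibers["c"].resume # c queues first...
fibers["b"].resume # ...then b queues behind it.
queued = model.waiting.size

fibers["a"].resume # a releases; c is woken and takes the slot.
fibers["c"].resume # c releases; b is woken and takes the slot.
fibers["b"].resume # b releases; nobody is waiting.

puts order.inspect
```

Because c was queued before b, it receives the slot first even though b was created earlier. Queue position, not creation order, decides who runs next.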

def wait

Wait until the semaphore becomes available.

Implementation

def wait
	fiber = Fiber.current
	
	if blocking?
		@waiting << fiber
		Fiber.scheduler.transfer while blocking?
	end
rescue Exception
	@waiting.delete(fiber)
	raise
end
