Async SourceAsyncSemaphore

class Semaphore

A synchronization primitive, which limits access to a given resource, such as a limited number of database connections, open files, or network connections.

Example

require 'async'
require 'async/semaphore'
require 'net/http'

Sync do
	# Only allow two concurrent tasks at a time:
	semaphore = Async::Semaphore.new(2)
	
	# Generate an array of 10 numbers:
	terms = ['ruby', 'python', 'go', 'java', 'c++'] 
	
	# Search for the terms:
	terms.each do |term|
		semaphore.async do |task|
			Console.logger.info("Searching for #{term}...")
			response = Net::HTTP.get(URI "https://www.google.com/search?q=#{term}")
			Console.logger.info("Got response #{response.size} bytes.")
		end
	end
end

Output

0.0s     info: Searching for ruby... [ec=0x3c] [pid=50523]
0.04s     info: Searching for python... [ec=0x21c] [pid=50523]
1.7s     info: Got response 182435 bytes. [ec=0x3c] [pid=50523]
1.71s     info: Searching for go... [ec=0x834] [pid=50523]
3.0s     info: Got response 204854 bytes. [ec=0x21c] [pid=50523]
3.0s     info: Searching for java... [ec=0xf64] [pid=50523]
4.32s     info: Got response 103235 bytes. [ec=0x834] [pid=50523]
4.32s     info: Searching for c++... [ec=0x12d4] [pid=50523]
4.65s     info: Got response 109697 bytes. [ec=0xf64] [pid=50523]
6.64s     info: Got response 87249 bytes. [ec=0x12d4] [pid=50523]

Signature

public

Since stable-v1.

Nested

Definitions

def initialize(limit = 1, parent: nil)

Signature

parameter limit Integer

The maximum number of times the semaphore can be acquired before it blocks.

parameter parent Task | Semaphore | Nil

The parent for holding any children tasks.

Implementation

def initialize(limit = 1, parent: nil)
	@count = 0
	@limit = limit
	@waiting = List.new
	
	@parent = parent
end

attr :count

The current number of tasks that have acquired the semaphore.

attr :limit

The maximum number of tasks that can acquire the semaphore.

attr :waiting

The tasks waiting on this semaphore.

def limit= limit

Allow setting the limit. This is useful for cases where the semaphore is used to limit the number of concurrent tasks, but the number of tasks is not known in advance or needs to be modified.

On increasing the limit, some tasks may be immediately resumed. On decreasing the limit, some tasks may execute until the count is < than the limit.

Signature

parameter limit Integer

The new limit.

Implementation

def limit= limit
	difference = limit - @limit
	@limit = limit
	
	# We can't suspend 
	if difference > 0
		difference.times do
			break unless node = @waiting.first
			
			node.resume
		end
	end
end

def empty?

Is the semaphore currently acquired?

Implementation

def empty?
	@count.zero?
end

def blocking?

Whether trying to acquire this semaphore would block.

Implementation

def blocking?
	@count >= @limit
end

def async(*arguments, parent: (@parent or Task.current), **options)

Run an async task. Will wait until the semaphore is ready until spawning and running the task.

Implementation

def async(*arguments, parent: (@parent or Task.current), **options)
	wait
	
	parent.async(**options) do |task|
		@count += 1
		
		begin
			yield task, *arguments
		ensure
			self.release
		end
	end
end

def acquire

Acquire the semaphore, block if we are at the limit. If no block is provided, you must call release manually.

Signature

yields {...}

When the semaphore can be acquired.

Implementation

def acquire
	wait
	
	@count += 1
	
	return unless block_given?
	
	begin
		return yield
	ensure
		self.release
	end
end

def release

Release the semaphore. Must match up with a corresponding call to acquire. Will release waiting fibers in FIFO order.

Implementation

def release
	@count -= 1
	
	while (@limit - @count) > 0 and node = @waiting.first
		node.resume
	end
end

def wait

Wait until the semaphore becomes available.

Implementation

def wait
	return unless blocking?
	
	@waiting.stack(FiberNode.new(Fiber.current)) do
		Fiber.scheduler.transfer while blocking?
	end
end

Discussion