Async › Source › Async › Queue

class Queue

A queue which allows items to be processed in order.

Its interface is compatible with Async::Notification and Async::Condition, except that it can hold multiple values.

Signature

public

Since stable-v1.

Definitions

def initialize(parent: nil, available: Notification.new)

Create a new queue.

Signature

parameter parent Interface(:async) | Nil

The parent task to use for async operations.

parameter available Notification

The notification to use for signaling when items are available.

Implementation

# Create a new, empty queue.
#
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
# @parameter available [Notification] The notification to use for signaling when items are available.
def initialize(parent: nil, available: Notification.new)
	@parent = parent
	@available = available
	
	# Items that have been added but not yet dequeued.
	@items = []
end

attr :items

Signature

attribute Array

The items in the queue.

def size

Signature

returns Integer

The number of items in the queue.

Implementation

# @returns [Integer] The number of items currently held in the queue.
def size
	@items.length
end

def empty?

Signature

returns Boolean

Whether the queue is empty.

Implementation

# @returns [Boolean] Whether the queue currently holds no items.
def empty?
	@items.size.zero?
end

def push(item)

Add an item to the queue.

Implementation

# Add an item to the end of the queue and signal the availability notification.
#
# @parameter item [Object] The item to append.
# @returns [Object] Whatever `@available.signal` returns.
def push(item)
	@items << item
	
	# The queue contains at least the item appended above, so it can never be
	# empty at this point; signal unconditionally instead of re-checking.
	@available.signal
end

def enqueue(*items)

Add multiple items to the queue.

Implementation

# Add multiple items to the end of the queue, then signal the availability notification.
#
# @parameter items [Array] The items to append, in order.
def enqueue(*items)
	@items.concat(items)
	
	# Called with no arguments, the queue may still be empty; in that case
	# there is nothing to announce.
	return if self.empty?
	
	@available.signal
end

def dequeue

Remove and return the next item from the queue.

Implementation

# Remove and return the next item from the front of the queue.
#
# While the queue is empty this waits on `@available`, re-checking the queue
# after every wake-up.
#
# @returns [Object] The item removed from the front of the queue.
def dequeue
	@available.wait while @items.empty?
	
	@items.shift
end

def async(parent: (@parent or Task.current), **options, &block)

  • asynchronous

Process each item in the queue.

Signature

asynchronous

Executes the given block concurrently for each item.

parameter arguments Array

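The arguments to pass to the block. (Note: the signature shown here has no `arguments` parameter; each dequeued item is what gets passed to `parent.async`.)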

parameter parent Interface(:async) | Nil

The parent task to use for async operations.

parameter options Hash

The options to pass to the task.

yields {|task| ...}

When the system is idle, the block will be executed in a new task.

Implementation

# Process each item in the queue by handing it to `parent.async`.
#
# Items are dequeued one at a time; processing stops as soon as a dequeued
# item is `nil` or `false`.
#
# @asynchronous
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
# @parameter options [Hash] The options to pass to the task.
# @yields The block is forwarded to `parent.async` together with each item.
def async(parent: (@parent or Task.current), **options, &block)
	item = self.dequeue
	
	while item
		parent.async(item, **options, &block)
		item = self.dequeue
	end
end

def each

Enumerate each item in the queue.

Implementation

# Enumerate each item in the queue, yielding items as they are dequeued.
#
# Enumeration stops as soon as a dequeued item is `nil` or `false`.
#
# @yields {|item| ...} Each dequeued item, in order.
def each
	item = self.dequeue
	
	while item
		yield item
		item = self.dequeue
	end
end

def signal(value = nil)

Signal the queue with a value, the same as #enqueue.

Implementation

# Signal the queue with a value by delegating to #enqueue.
#
# @parameter value [Object | Nil] The value to enqueue; `nil` when omitted.
# @returns [Object] Whatever #enqueue returns.
def signal(value = nil)
	enqueue(value)
end

def wait

Wait for an item to be available, the same as #dequeue.

Implementation

# Wait for an item to be available by delegating to #dequeue.
#
# @returns [Object] Whatever #dequeue returns.
def wait
	dequeue
end

Discussion