Async › Source › Async › Node

class Node

A node in a tree, used for implementing the task hierarchy.

Definitions

def initialize(parent = nil, annotation: nil, transient: false)

Create a new node in the tree.

Signature

parameter parent Node | Nil

This node will attach to the given parent.

Implementation

# Create a new node in the tree.
#
# @parameter parent [Node | Nil] This node will attach to the given parent.
# @parameter annotation [String | Nil] A useful identifier for the current node.
# @parameter transient [Boolean] Whether the node is transient.
def initialize(parent = nil, annotation: nil, transient: false)
	# The node starts out detached with no child list; attachment is delegated to `parent.add_child` below.
	@parent = nil
	@children = nil
	
	@annotation = annotation
	# Filled in on first use by `description`.
	@object_name = nil
	
	@transient = transient
	
	@head = nil
	@tail = nil
	
	# NOTE(review): presumably `add_child` assigns `@parent` on this node — confirm against its definition.
	parent.add_child(self) if parent
end

def root

Signature

returns Node

The root node in the hierarchy.

Implementation

# Walk up the hierarchy to find the top-most node.
#
# @returns [Node] The root node in the hierarchy.
def root
	if @parent.nil?
		# No parent: this node is itself the root.
		self
	else
		@parent.root || self
	end
end

# @private
# Initialised to nil by the constructor.
# NOTE(review): presumably a link used by the list that holds this node — confirm.
attr_accessor :head

  • private

Signature

private

# @private
# Initialised to nil by the constructor.
# NOTE(review): presumably a link used by the list that holds this node — confirm.
attr_accessor :tail

  • private

Signature

private

# @attribute [Node | Nil] The parent node, or nil while the node is detached.
attr :parent

Signature

attribute Node | Nil

The parent node.

# @attribute [Children | Nil] Optional list of children.
attr :children

Signature

attribute Children | Nil

Optional list of children.

# @attribute [String | Nil] A useful identifier for the current node.
attr :annotation

Signature

attribute String | Nil

A useful identifier for the current node.

def children?

Whether this node has any children.

Signature

returns Boolean

Implementation

# Whether this node has any children.
#
# @returns [Boolean] Truthy only when a child list exists and is not empty; nil when there is no child list.
def children?
	children = @children
	return children unless children
	
	!children.empty?
end

def transient?

Represents whether a node is transient. Transient nodes are not considered when determining if a node is finished. This is useful for tasks which are internal to an object rather than explicit user concurrency. For example, a child task which is pruning a connection pool is transient, because it is not directly related to the parent task, and should not prevent the parent task from finishing.

Implementation

# Whether this node is transient. Transient nodes are not considered when determining if a node is finished.
#
# @returns [Boolean] The transient flag given at construction or assigned later via {transient=}.
def transient?
	@transient
end

def transient=(value)

Change the transient state of the node.

A transient node is not considered when determining if a node is finished, and propagates up if the parent is consumed.

Signature

parameter value Boolean

Whether the node is transient.

Implementation

# Change the transient state of the node.
#
# A transient node is not considered when determining if a node is finished, and propagates up if the parent is consumed.
#
# Assigning the current value again is a no-op: nothing is invalidated and the parent is not notified.
#
# @parameter value [Boolean] Whether the node is transient.
def transient=(value)
	if @transient != value
		@transient = value
		
		# `description` memoizes `@object_name` with the transient marker baked in, so drop the cached name to avoid reporting a stale state.
		@object_name = nil
		
		# Let the parent's child list (if any) update its transient bookkeeping.
		@parent&.children&.adjust_transient_count(value)
	end
end

def annotate(annotation)

Annotate the node with a description.

Signature

parameter annotation String

The description to annotate the node with.

Implementation

# Annotate the node with a description.
#
# Without a block the annotation is replaced permanently. With a block the annotation applies only while the block runs, and the previous annotation is restored afterwards, even if the block raises.
#
# @parameter annotation [String] The description to annotate the node with.
# @yields {...} Optional block to execute with the temporary annotation.
# @returns [Object] The result of the block if given, otherwise the annotation.
def annotate(annotation)
	return @annotation = annotation unless block_given?
	
	previous = @annotation
	@annotation = annotation
	
	begin
		yield
	ensure
		@annotation = previous
	end
end

def description

A description of the node, including the annotation and object name.

Signature

returns String

The description of the node.

Implementation

# A description of the node, including the annotation and object name.
#
# The object name (class, object id and transient marker) is computed once and cached. It is followed by the annotation if one is set, otherwise by the first backtrace line if one is available.
#
# @returns [String] The description of the node.
def description
	@object_name ||= "#{self.class}:#{format '%#018x', object_id}#{@transient ? ' transient' : nil}"
	
	# Prefer the annotation; only ask for a backtrace when there is none.
	detail = self.annotation || self.backtrace(0, 1)&.first
	
	detail ? "#{@object_name} #{detail}" : @object_name
end

def backtrace(*arguments)

Provides a backtrace for nodes that have an active execution context.

Signature

returns Array(Thread::Backtrace::Locations) | Nil

The backtrace of the node, if available.

Implementation

# Provides a backtrace for nodes that have an active execution context.
#
# This base implementation ignores its arguments and has no backtrace to offer.
#
# @returns [Array(Thread::Backtrace::Locations) | Nil] The backtrace of the node, if available.
def backtrace(*arguments)
	nil
end

def to_s

Signature

returns String

A description of the node.

Implementation

# Render the node as its description wrapped in `#<...>`.
#
# @returns [String] A description of the node.
def to_s
	"#<#{description}>"
end

def parent=(parent)

Change the parent of this node.

Signature

parameter parent Node | Nil

The parent to attach to, or nil to detach.

returns Node

Itself.

Implementation

# Change the parent of this node.
#
# The node is removed from its current parent (if any) and then added to the new one (if any). Assigning the parent it already has changes nothing.
#
# @parameter parent [Node | Nil] The parent to attach to, or nil to detach.
# @returns [Node] Itself.
def parent=(parent)
	# Already attached to this parent (or already detached): nothing to do, but still return self as documented.
	return self if @parent.equal?(parent)
	
	if @parent
		@parent.remove_child(self)
		@parent = nil
	end
	
	# NOTE(review): presumably `add_child` assigns `@parent` on this node — confirm against its definition.
	parent.add_child(self) if parent
	
	self
end

def finished?

Whether the node can be consumed (deleted) safely. By default, checks if the children set is empty.

Signature

returns Boolean

Implementation

# Whether the node can be consumed (deleted) safely.
#
# A node without a child list is finished; otherwise the child list decides.
#
# @returns [Boolean]
def finished?
	return true if @children.nil?
	
	@children.finished?
end

def consume

If the node has a parent and is finished (see Async::Node#finished?), then remove this node from the parent.

Implementation

# If the node has a parent and is finished, remove this node from the parent.
#
# Any children still held are handed off first: finished children are detached, unfinished ones are re-attached to the parent. The parent is then asked to consume itself in turn.
#
# @returns [Object | Nil] Whatever the parent's `consume` returns, or nil if this node was not consumed.
def consume
	parent = @parent
	return unless parent && finished?
	
	parent.remove_child(self)
	
	# If we have children, then we need to move them to our parent if they are not finished:
	if @children
		while (child = @children.shift)
			if child.finished?
				child.set_parent(nil)
			else
				parent.add_child(child)
			end
		end
		
		@children = nil
	end
	
	# Removing this node may have left the parent consumable too.
	parent.consume
end

def traverse(&block)

Traverse the task tree.

Signature

returns Enumerator

An enumerator which will traverse the tree if no block is given.

yields {|node, level| ...}

The node and the level relative to the given root.

Implementation

# Traverse the task tree.
#
# @returns [Enumerator] An enumerator which will traverse the tree if no block is given.
# @yields {|node, level| ...} The node and the level relative to the given root.
def traverse(&block)
	if block_given?
		self.traverse_recurse(&block)
	else
		# Defer traversal until the enumerator is iterated.
		enum_for(:traverse)
	end
end

def terminate

Immediately terminate all children tasks, including transient tasks. Internally invokes stop(false) on all children. This should be considered a last ditch effort and is used when closing the scheduler.

Implementation

# Immediately terminate all children tasks, including transient tasks. This should be considered a last ditch effort and is used when closing the scheduler.
#
# @returns [Boolean] Whether the node has no child list left afterwards.
def terminate
	# First try to stop this node (and through it, its children) right away:
	stop(false)
	
	# Whatever is still attached gets terminated recursively:
	@children&.each(&:terminate)
	
	@children.nil?
end

def stop(later = false)

Attempt to stop the current node immediately, including all non-transient children. Invokes #stop_children to stop all children.

Signature

parameter later Boolean

Whether to defer stopping until some point in the future.

Implementation

# Attempt to stop the current node immediately, including all non-transient children. Invokes `stop_children` to stop all children.
#
# @parameter later [Boolean] Whether to defer stopping until some point in the future.
def stop(later = false)
	# Overriding implementations are free to postpone this call.
	stop_children(later)
end

def stopped?

Whether the node has been stopped.

Implementation

# Whether the node has been stopped.
#
# @returns [Boolean] True when the node has no child list.
def stopped?
	@children.nil?
end

def print_hierarchy(out = $stdout, backtrace: true)

Print the hierarchy of the task tree from the given node.

Signature

parameter out IO

The output stream to write to.

parameter backtrace Boolean

Whether to print the backtrace of each node.

Implementation

# Print the hierarchy of the task tree from the given node.
#
# Each node is written on its own line, indented by one tab per level.
#
# @parameter out [IO] The output stream to write to.
# @parameter backtrace [Boolean] Whether to print the backtrace of each node.
def print_hierarchy(out = $stdout, backtrace: true)
	traverse do |node, level|
		prefix = "\t" * level
		
		out.puts "#{prefix}#{node}"
		
		next unless backtrace
		
		print_backtrace(out, prefix, node)
	end
end

Discussion