Async::Container SourceAsyncContainerGeneric

class Generic

A base class for implementing containers.

Definitions

def run(count: Container.processor_count, **options, &block)

Run multiple instances of the same block in the container.

Signature

parameter count Integer

The number of instances to start.

Implementation

def run(count: Container.processor_count, **options, &block)
	count.times do
		spawn(**options, &block)
	end
	
	return self
end

def to_s

A human readable representation of the container.

Signature

returns String

Implementation

def to_s
	"#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures."
end

def [] key

Look up a child process by key. A key could be a symbol, a file path, or something else which the child instance represents.

Implementation

def [] key
	@keyed[key]&.value
end

attr :statistics

Statistics relating to the behavior of children instances.

Signature

attribute Statistics

def failed?

Whether any failures have occurred within the container.

Signature

returns Boolean

Implementation

def failed?
	@statistics.failed?
end

def running?

Whether the container has running children instances.

Implementation

def running?
	@group.running?
end

def sleep(duration = nil)

Sleep until some state change occurs.

Signature

parameter duration Numeric

the maximum amount of time to sleep for.

Implementation

def sleep(duration = nil)
	@group.sleep(duration)
end

def wait

Wait until all spawned tasks are completed.

Implementation

def wait
	@group.wait
end

def status?(flag)

Returns true if all children instances have the specified status flag set. e.g. :ready. This state is updated by the process readiness protocol mechanism. See class Async::Container::Notify::Client for more details.

Signature

returns Boolean

Implementation

def status?(flag)
	# This also returns true if all processes have exited/failed:
	@state.all?{|_, state| state[flag]}
end

def wait_until_ready

Wait until all the children instances have indicated that they are ready.

Signature

returns Boolean

The children all became ready.

Implementation

def wait_until_ready
	while true
		Console.logger.debug(self) do |buffer|
			buffer.puts "Waiting for ready:"
			@state.each do |child, state|
				buffer.puts "\t#{child.class}: #{state.inspect}"
			end
		end
		
		self.sleep
		
		if self.status?(:ready)
			return true
		end
	end
end

def stop(timeout = true)

Stop the children instances.

Signature

parameter timeout Boolean | Numeric

Whether to stop gracefully, or a specific timeout.

Implementation

def stop(timeout = true)
	@running = false
	@group.stop(timeout)
	
	if @group.running?
		Console.logger.warn(self) {"Group is still running after stopping it!"}
	end
ensure
	@running = true
end

def spawn(name: nil, restart: false, key: nil, &block)

Spawn a child instance into the container.

Signature

parameter name String

The name of the child instance.

parameter restart Boolean

Whether to restart the child instance if it fails.

parameter key Symbol

A key used for reloading child instances.

Implementation

def spawn(name: nil, restart: false, key: nil, &block)
	name ||= UNNAMED
	
	if mark?(key)
		Console.logger.debug(self) {"Reusing existing child for #{key}: #{name}"}
		return false
	end
	
	@statistics.spawn!
	
	fiber do
		while @running
			child = self.start(name, &block)
			
			state = insert(key, child)
			
			begin
				status = @group.wait_for(child) do |message|
					state.update(message)
				end
			ensure
				delete(key, child)
			end
			
			if status.success?
				Console.logger.debug(self) {"#{child} exited with #{status}"}
			else
				@statistics.failure!
				Console.logger.error(self) {status}
			end
			
			if restart
				@statistics.restart!
			else
				break
			end
		end
	# ensure
	# 	Console.logger.error(self) {$!} if $!
	end.resume
	
	return true
end

def async(**options, &block)

  • deprecated

Signature

deprecated

Please use Async::Container::Generic#spawn or Async::Container::Generic.run instead.

Implementation

def async(**options, &block)
	spawn(**options) do |instance|
		Async::Reactor.run(instance, &block)
	end
end

def reload

Reload the container's keyed instances.

Implementation

def reload
	@keyed.each_value(&:clear!)
	
	yield
	
	dirty = false
	
	@keyed.delete_if do |key, value|
		value.stop? && (dirty = true)
	end
	
	return dirty
end

def mark?(key)

Mark the container's keyed instance which ensures that it won't be discarded.

Implementation

def mark?(key)
	if key
		if value = @keyed[key]
			value.mark!
			
			return true
		end
	end
	
	return false
end

def key?(key)

Whether a child instance exists for the given key.

Implementation

def key?(key)
	if key
		@keyed.key?(key)
	end
end

def insert(key, child)

Register the child (value) as running.

Implementation

def insert(key, child)
	if key
		@keyed[key] = Keyed.new(key, child)
	end
	
	state = {}
	
	@state[child] = state
	
	return state
end

def delete(key, child)

Clear the child (value) as running.

Implementation

def delete(key, child)
	if key
		@keyed.delete(key)
	end
	
	@state.delete(child)
end