Async::Container › Source › Async › Container › Group

class Group

Manages a group of running processes.

Definitions

def initialize(health_check_interval: 1.0)

Initialize an empty group.

Signature

parameter health_check_interval Numeric | Nil

The maximum interval at which health checks are performed.

Implementation

# Set up a group that is not tracking any processes yet.
#
# @parameter health_check_interval [Numeric | Nil] The maximum interval at which health checks are performed.
def initialize(health_check_interval: 1.0)
	@health_check_interval = health_check_interval
	
	# Nothing is running initially; entries map an IO to the fiber waiting on it:
	@running = {}
end

def inspect

Signature

returns String

A human-readable representation of the group.

Implementation

# Build a short, human-readable summary of the group.
#
# @returns [String] The class name followed by the number of running entries.
def inspect
	count = @running.size
	
	"#<#{self.class} running=#{count}>"
end

attr :running

Signature

attribute Hash(IO, Fiber)

The running fibers, indexed by IO.

def size

Signature

returns Integer

The number of running processes.

Implementation

# Count the entries currently tracked by the group.
#
# @returns [Integer] The number of running processes.
def size
	@running.length
end

def running?

Whether the group contains any running processes.

Signature

returns Boolean

Implementation

# Check whether at least one process is still tracked by the group.
#
# @returns [Boolean] True when the running set is not empty.
def running?
	!@running.empty?
end

def any?

Whether the group contains any running processes.

Signature

returns Boolean

Implementation

# Check whether the group is tracking any running processes.
#
# @returns [Boolean] True when there is at least one running entry.
def any?
	@running.size.positive?
end

def empty?

Whether the group is empty.

Signature

returns Boolean

Implementation

# Check whether the group is tracking nothing at all.
#
# @returns [Boolean] True when there are no running entries.
def empty?
	@running.size.zero?
end

def sleep(duration)

Sleep for at most the specified duration until some state change occurs.

Implementation

# Pause for at most the given duration, or until some state change occurs.
#
# Delegates directly to `wait_for_children`.
#
# @parameter duration [Numeric] The upper bound handed to `wait_for_children`.
def sleep(duration)
	wait_for_children(duration)
end

def wait

Begin any outstanding queued processes and wait for them indefinitely.

Implementation

# Begin any outstanding queued processes and wait for them indefinitely.
#
# The waiting is driven by `with_health_checks`, which supplies the duration for each call to `wait_for_children`.
def wait
	with_health_checks { |duration| wait_for_children(duration) }
end

def health_check!

Perform a health check on all running processes.

Implementation

# Ask every running fiber to perform a health check.
#
# Each fiber yielded by `each_running` is resumed with the symbol `:health_check!`.
def health_check!
	each_running { |fiber| fiber.resume(:health_check!) }
end

def interrupt

Interrupt all running processes. This resumes each running fiber with the Interrupt class itself (Interrupt = ::Interrupt), not an instance of it.

Implementation

# Interrupt all running processes.
#
# Logs the number of tracked entries, then resumes every fiber yielded by `each_running` with the `Interrupt` class itself.
def interrupt
	count = @running.size
	Console.info(self, "Sending interrupt to #{count} running processes...")
	
	each_running { |fiber| fiber.resume(Interrupt) }
end

def terminate

Terminate all running processes. This resumes each running fiber with the Async::Container::Terminate class itself, not an instance of it.

Implementation

# Terminate all running processes.
#
# Logs the number of tracked entries, then resumes every fiber yielded by `each_running` with the `Terminate` class itself.
def terminate
	count = @running.size
	Console.info(self, "Sending terminate to #{count} running processes...")
	
	each_running { |fiber| fiber.resume(Terminate) }
end

def kill

Kill all running processes. This resumes each running fiber with the Async::Container::Kill class itself, not an instance of it.

Implementation

# Kill all running processes.
#
# Logs the number of tracked entries, then resumes every fiber yielded by `each_running` with the `Kill` class itself.
def kill
	count = @running.size
	Console.info(self, "Sending kill to #{count} running processes...")
	
	each_running { |fiber| fiber.resume(Kill) }
end

def stop(graceful = GRACEFUL_TIMEOUT)

Stop all child processes with a multi-phase shutdown sequence.

A graceful shutdown performs the following sequence:

  1. Send SIGINT and wait up to graceful seconds if specified.
  2. Send SIGKILL and wait indefinitely for process cleanup.

If graceful is true, default to DEFAULT_GRACEFUL_TIMEOUT (10 seconds). If graceful is false, skip the SIGINT phase and go directly to SIGKILL.

Signature

parameter graceful Boolean | Numeric

Whether to send SIGINT first or skip directly to SIGKILL.

Implementation

# Stop all child processes using a multi-phase shutdown sequence.
#
# When `graceful` is truthy the children are interrupted first and given time to exit via `wait_for_exit`. Whatever is still running afterwards (or immediately, when `graceful` is falsy) is killed and waited for.
#
# @parameter graceful [Boolean | Numeric] Falsy skips the interrupt phase; `true` selects `DEFAULT_GRACEFUL_TIMEOUT`; any other truthy value is passed to `wait_for_exit` unchanged.
def stop(graceful = GRACEFUL_TIMEOUT)
	Console.debug(self, "Stopping all processes...", graceful: graceful)
	
	# Graceful phase: only entered when a timeout (or `true`) was given.
	if graceful
		# Send SIGINT to the children:
		interrupt
		
		# A literal `true` means "use the default timeout"; numbers are kept as they are:
		graceful = DEFAULT_GRACEFUL_TIMEOUT if graceful == true
		
		clock = Clock.start
		
		# Give the children until the timeout to exit on their own:
		wait_for_exit(clock, graceful)
	end
ensure
	# Runs on the error path too, so that children are cleaned up on a best-effort basis.
	if any?
		# Only warn when a graceful attempt was actually made (`clock` may still be nil if it failed early):
		Console.warn(self, "Killing processes after graceful shutdown failed...", size: self.size, clock: clock) if graceful
		
		kill
		wait
	end
end

def wait_for(channel)

Wait for a message on the specified Async::Container::Channel.

Implementation

# Wait for messages on the given channel, suspending the current fiber between events.
#
# The current fiber is registered in `@running` under the channel's input IO and yields until it is resumed. The value it is resumed with decides what happens next:
# - `Interrupt`, `Terminate` or `Kill` are forwarded to the channel as `interrupt!`, `terminate!` or `kill!`.
# - Any other truthy value is yielded to the caller's block as is.
# - A falsy value means "read the channel": a received message is yielded to the block, and when there is none the method returns the result of `channel.wait`.
#
# The registration is always removed again on the way out.
#
# @parameter channel [Async::Container::Channel] The channel to wait on.
# @yields {|message| ...} Called with each value or message received.
def wait_for(channel)
	input = channel.in
	
	@running[input] = Fiber.current
	
	while @running.key?(input)
		# Suspend until someone resumes this fiber with an event (or nothing):
		event = Fiber.yield
		
		# Explicit `==` checks: the events are the class constants themselves, so `case/when` would not match them.
		if event == Interrupt
			channel.interrupt!
		elsif event == Terminate
			channel.terminate!
		elsif event == Kill
			channel.kill!
		elsif event
			yield event
		else
			message = channel.receive
			
			# Nothing to receive, so wait for the channel to exit:
			return channel.wait unless message
			
			yield message
		end
	end
ensure
	@running.delete(input)
end

def select(duration)

Wait for a child process to exit OR a signal to be received.

Implementation

# Wait for a child process to exit OR a signal to be received.
#
# Calls `IO.select` on the IOs that key `@running`, with `SignalException` handled immediately for the duration of the call.
#
# @parameter duration [Numeric | Nil] The timeout passed to `IO.select`.
# @returns [Array(IO) | Nil] The readable IOs, or nil when `IO.select` timed out.
def select(duration)
	::Thread.handle_interrupt(SignalException => :immediate) do
		ready = ::IO.select(@running.keys, nil, nil, duration)
		
		# `IO.select` gives nil on timeout, otherwise [readable, writable, errored]:
		ready&.first
	end
end