class Group
Manages a group of running processes.
Definitions
def initialize(health_check_interval: 1.0)
Initialize an empty group.
Signature
-
parameter
health_check_intervalNumeric | Nil The (biggest) interval at which health checks are performed.
Implementation
def initialize(health_check_interval: 1.0)
@health_check_interval = health_check_interval
# The running fibers, indexed by IO:
@running = {}
end
def inspect
Signature
-
returns
String A human-readable representation of the group.
Implementation
def inspect
"#<#{self.class} running=#{@running.size}>"
end
attr :running
Signature
-
attribute
Hash(IO, Fiber) the running tasks, indexed by IO.
def size
Signature
-
returns
Integer The number of running processes.
Implementation
def size
@running.size
end
def running?
Whether the group contains any running processes.
Signature
-
returns
Boolean
Implementation
def running?
@running.any?
end
def any?
Whether the group contains any running processes.
Signature
-
returns
Boolean
Implementation
def any?
@running.any?
end
def empty?
Whether the group is empty.
Signature
-
returns
Boolean
Implementation
def empty?
@running.empty?
end
def sleep(duration)
Sleep for at most the specified duration until some state change occurs.
Implementation
def sleep(duration)
self.wait_for_children(duration)
end
def wait
Begin any outstanding queued processes and wait for them indefinitely.
Implementation
def wait
with_health_checks do |duration|
self.wait_for_children(duration)
end
end
def health_check!
Perform a health check on all running processes.
Implementation
def health_check!
each_running do |fiber|
fiber.resume(:health_check!)
end
end
def interrupt
Interrupt all running processes.
This resumes the controlling fiber with an instance of Interrupt = ::Interrupt.
Implementation
def interrupt
Console.info(self, "Sending interrupt to #{@running.size} running processes...")
each_running do |fiber|
fiber.resume(Interrupt)
end
end
def terminate
Terminate all running processes.
This resumes the controlling fiber with an instance of class Async::Container::Terminate.
Implementation
def terminate
Console.info(self, "Sending terminate to #{@running.size} running processes...")
each_running do |fiber|
fiber.resume(Terminate)
end
end
def kill
Kill all running processes.
This resumes the controlling fiber with an instance of class Async::Container::Kill.
Implementation
def kill
Console.info(self, "Sending kill to #{@running.size} running processes...")
each_running do |fiber|
fiber.resume(Kill)
end
end
def stop(graceful = GRACEFUL_TIMEOUT)
Stop all child processes with a multi-phase shutdown sequence.
A graceful shutdown performs the following sequence:
- Send SIGINT and wait up to
gracefulseconds if specified. - Send SIGKILL and wait indefinitely for process cleanup.
If graceful is true, default to DEFAULT_GRACEFUL_TIMEOUT (10 seconds).
If graceful is false, skip the SIGINT phase and go directly to SIGKILL.
Signature
-
parameter
gracefulBoolean | Numeric Whether to send SIGINT first or skip directly to SIGKILL.
Implementation
def stop(graceful = GRACEFUL_TIMEOUT)
Console.debug(self, "Stopping all processes...", graceful: graceful)
# If a timeout is specified, interrupt the children first:
if graceful
# Send SIGINT to the children:
self.interrupt
if graceful == true
graceful = DEFAULT_GRACEFUL_TIMEOUT
end
clock = Clock.start
# Wait for the children to exit:
self.wait_for_exit(clock, graceful)
end
ensure
# Do our best to clean up the children:
if any?
if graceful
Console.warn(self, "Killing processes after graceful shutdown failed...", size: self.size, clock: clock)
end
self.kill
self.wait
end
end
def wait_for(channel)
Wait for a message in the specified class Async::Container::Channel.
Implementation
def wait_for(channel)
io = channel.in
@running[io] = Fiber.current
while @running.key?(io)
# Wait for some event on the channel:
result = Fiber.yield
if result == Interrupt
channel.interrupt!
elsif result == Terminate
channel.terminate!
elsif result == Kill
channel.kill!
elsif result
yield result
elsif message = channel.receive
yield message
else
# Wait for the channel to exit:
return channel.wait
end
end
ensure
@running.delete(io)
end
def select(duration)
Wait for a child process to exit OR a signal to be received.
Implementation
def select(duration)
::Thread.handle_interrupt(SignalException => :immediate) do
readable, _, _ = ::IO.select(@running.keys, nil, nil, duration)
return readable
end
end