class Group
Manages a group of running processes.
Definitions
def initialize(health_check_interval: 1.0)
Initialize an empty group.
Signature
-
parameter
health_check_interval
Numeric | Nil
The (biggest) interval at which health checks are performed.
Implementation
def initialize(health_check_interval: 1.0)
@health_check_interval = health_check_interval
# The running fibers, indexed by IO:
@running = {}
# This queue allows us to wait for processes to complete, without spawning new processes as a result.
@queue = nil
end
def inspect
Signature
-
returns
String
A human-readable representation of the group.
Implementation
def inspect
"#<#{self.class} running=#{@running.size}>"
end
attr :running
Signature
-
attribute
Hash(IO, Fiber)
the running tasks, indexed by IO.
def size
Signature
-
returns
Integer
The number of running processes.
Implementation
def size
@running.size
end
def running?
Whether the group contains any running processes.
Signature
-
returns
Boolean
Implementation
def running?
@running.any?
end
def any?
Whether the group contains any running processes.
Signature
-
returns
Boolean
Implementation
def any?
@running.any?
end
def empty?
Whether the group is empty.
Signature
-
returns
Boolean
Implementation
def empty?
@running.empty?
end
def sleep(duration)
Sleep for at most the specified duration until some state change occurs.
Implementation
def sleep(duration)
self.resume
self.suspend
self.wait_for_children(duration)
end
def wait
Begin any outstanding queued processes and wait for them indefinitely.
Implementation
def wait
self.resume
with_health_checks do |duration|
self.wait_for_children(duration)
end
end
def health_check!
Perform a health check on all running processes.
Implementation
def health_check!
@running.each_value do |fiber|
fiber.resume(:health_check!)
end
end
def interrupt
Interrupt all running processes.
This resumes the controlling fiber with an instance of Interrupt = ::Interrupt
.
Implementation
def interrupt
Console.info(self, "Sending interrupt to #{@running.size} running processes...")
@running.each_value do |fiber|
fiber.resume(Interrupt)
end
end
def terminate
Terminate all running processes.
This resumes the controlling fiber with an instance of class Async::Container::Terminate
.
Implementation
def terminate
Console.info(self, "Sending terminate to #{@running.size} running processes...")
@running.each_value do |fiber|
fiber.resume(Terminate)
end
end
def stop(timeout = 1)
Stop all child processes using #terminate
.
Signature
-
parameter
timeout
Boolean | Numeric | Nil
If specified, invoke a graceful shutdown using
#interrupt
first.
Implementation
def stop(timeout = 1)
Console.debug(self, "Stopping all processes...", timeout: timeout)
# Use a default timeout if not specified:
timeout = 1 if timeout == true
if timeout
start_time = Async::Clock.now
self.interrupt
while self.any?
duration = Async::Clock.now - start_time
remaining = timeout - duration
if remaining >= 0
self.wait_for_children(duration)
else
self.wait_for_children(0)
break
end
end
end
# Terminate all children:
self.terminate if any?
# Wait for all children to exit:
self.wait
end
def wait_for(channel)
Wait for a message in the specified class Async::Container::Channel
.
Implementation
def wait_for(channel)
io = channel.in
@running[io] = Fiber.current
while @running.key?(io)
# Wait for some event on the channel:
result = Fiber.yield
if result == Interrupt
channel.interrupt!
elsif result == Terminate
channel.terminate!
elsif result
yield result
elsif message = channel.receive
yield message
else
# Wait for the channel to exit:
return channel.wait
end
end
ensure
@running.delete(io)
end
def select(duration)
Wait for a child process to exit OR a signal to be received.
Implementation
def select(duration)
::Thread.handle_interrupt(SignalException => :immediate) do
readable, _, _ = ::IO.select(@running.keys, nil, nil, duration)
return readable
end
end