class Group
Manages a group of running processes.
Definitions
def initialize
Initialize an empty group.
Implementation
# Initialize an empty group: no queue yet, and nothing running.
def initialize
  # Waiting on processes goes through this queue, so that waiting for completion never
  # spawns new processes as a side effect. It starts out unset.
  @queue = nil

  # Table of running entries; starts empty.
  @running = {}
end
# @attribute [Hash(IO, Fiber)] the running tasks, indexed by IO.
attr :running
Signature
-
attribute
Hash(IO, Fiber)
the running tasks, indexed by IO.
def running?
Whether the group contains any running processes.
Signature
-
returns
Boolean
Implementation
# Whether the group contains any running processes.
# @returns [Boolean] true when at least one entry is registered in @running.
def running?
  !@running.empty?
end
def any?
Whether the group contains any running processes.
Signature
-
returns
Boolean
Implementation
# Whether the group contains any running processes.
# @returns [Boolean] true when @running holds at least one entry.
def any?
  @running.size.positive?
end
def empty?
Whether the group is empty.
Signature
-
returns
Boolean
Implementation
# Whether the group is empty.
# @returns [Boolean] true when @running holds no entries.
def empty?
  @running.size.zero?
end
def sleep(duration)
Sleep for at most the specified duration until some state change occurs.
Implementation
# Sleep for at most the specified duration until some state change occurs.
#
# Calls #resume, then #suspend, and finally waits on the children for up to `duration`.
# @parameter duration [Numeric] Passed straight through to #wait_for_children.
# @returns Whatever #wait_for_children returns.
def sleep(duration)
  resume
  suspend

  wait_for_children(duration)
end
def wait
Begin any outstanding queued processes and wait for them indefinitely.
Implementation
# Begin any outstanding queued processes and wait for them indefinitely.
#
# Calls #resume once, then keeps calling #wait_for_children (without a duration)
# for as long as #running? reports true.
def wait
  resume

  wait_for_children while running?
end
def interrupt
Interrupt all running processes.
This resumes each controlling fiber, passing it `Interrupt` (an alias of `::Interrupt`).
Implementation
# Interrupt all running processes.
#
# Logs a debug message, then resumes every fiber stored in @running, passing it
# the `Interrupt` constant.
def interrupt
  count = @running.size
  Console.logger.debug(self, "Sending interrupt to #{count} running processes...")

  @running.each_value { |controller| controller.resume(Interrupt) }
end
def terminate
Terminate all running processes.
This resumes each controlling fiber, passing it the `Async::Container::Terminate` class.
Implementation
# Terminate all running processes.
#
# Logs a debug message, then resumes every fiber stored in @running, passing it
# the `Terminate` constant.
def terminate
  count = @running.size
  Console.logger.debug(self, "Sending terminate to #{count} running processes...")

  @running.each_value { |controller| controller.resume(Terminate) }
end
def stop(timeout = 1)
Stop all child processes using `#terminate`.
Signature
-
parameter
timeout
Boolean | Numeric | Nil
If specified, invoke a graceful shutdown using `#interrupt` first.
Implementation
# Stop all child processes using #terminate.
#
# When a timeout is given, #interrupt is sent first and the children get up to
# `timeout` seconds to exit before #terminate is sent.
#
# @parameter timeout [Boolean | Numeric | Nil] If specified, invoke a graceful shutdown using #interrupt first. `true` selects the default of 1 second; `false`/`nil` skips the graceful phase.
def stop(timeout = 1)
  # Use a default timeout if not specified:
  timeout = 1 if timeout == true

  if timeout
    start_time = Async::Clock.now

    self.interrupt

    while self.any?
      elapsed = Async::Clock.now - start_time
      remaining = timeout - elapsed

      if remaining >= 0
        # Wait only for what is left of the grace period. Waiting for the elapsed
        # time instead would return immediately on the first pass and could
        # overshoot the timeout on later ones.
        self.wait_for_children(remaining)
      else
        # Grace period is over: one final zero-duration wait, then escalate.
        self.wait_for_children(0)
        break
      end
    end
  end

  # Terminate all children:
  self.terminate

  # Wait for all children to exit:
  self.wait
end
def wait_for(channel)
Wait for a message in the specified `Async::Container::Channel`.
Implementation
# Wait for a message in the specified channel.
#
# Registers the current fiber in @running under the channel's input IO, then
# repeatedly yields the fiber. Each time it is resumed, the resumed value decides
# what happens next:
# - `Interrupt`: calls `channel.interrupt!` and keeps waiting.
# - `Terminate`: calls `channel.terminate!` and keeps waiting.
# - anything else: reads from the channel; a message is yielded to the block,
#   while a falsy read ends the wait and returns `channel.wait`.
#
# @parameter channel The channel to wait on; must respond to `in`, `receive`, `wait`, `interrupt!` and `terminate!`.
# @yields {|message| ...} Each message received from the channel.
# @returns The result of `channel.wait`, or nil if the entry was removed from @running while waiting.
def wait_for(channel)
  io = channel.in
  @running[io] = Fiber.current

  while @running.key?(io)
    signal = Fiber.yield

    if signal == Interrupt
      channel.interrupt!
      next
    end

    if signal == Terminate
      channel.terminate!
      next
    end

    # Only read from the channel when the resume carried no control signal.
    message = channel.receive
    return channel.wait unless message

    yield message
  end
ensure
  # Always unregister, whether the loop finished, returned early or raised.
  @running.delete(io)
end