class Generic
A base class for implementing containers.
Definitions
def run(count: Container.processor_count, **options, &block)
Run multiple instances of the same block in the container.
Signature
-
parameter
count
Integer
The number of instances to start.
Implementation
def run(count: Container.processor_count, **options, &block)
count.times do
spawn(**options, &block)
end
return self
end
def to_s
A human readable representation of the container.
Signature
-
returns
String
Implementation
def to_s
"#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures."
end
def [] key
Look up a child process by key. A key could be a symbol, a file path, or something else which the child instance represents.
Implementation
def [] key
@keyed[key]&.value
end
attr :statistics
Statistics relating to the behavior of children instances.
Signature
-
attribute
Statistics
def failed?
Whether any failures have occurred within the container.
Signature
-
returns
Boolean
Implementation
def failed?
@statistics.failed?
end
def running?
Whether the container has running children instances.
Implementation
def running?
@group.running?
end
def sleep(duration = nil)
Sleep until some state change occurs.
Signature
-
parameter
duration
Numeric
the maximum amount of time to sleep for.
Implementation
def sleep(duration = nil)
@group.sleep(duration)
end
def wait
Wait until all spawned tasks are completed.
Implementation
def wait
@group.wait
end
def status?(flag)
Returns true if all children instances have the specified status flag set.
e.g. :ready
.
This state is updated by the process readiness protocol mechanism. See class Async::Container::Notify::Client
for more details.
Signature
-
returns
Boolean
Implementation
def status?(flag)
# This also returns true if all processes have exited/failed:
@state.all?{|_, state| state[flag]}
end
def wait_until_ready
Wait until all the children instances have indicated that they are ready.
Signature
-
returns
Boolean
The children all became ready.
Implementation
def wait_until_ready
while true
Console.debug(self) do |buffer|
buffer.puts "Waiting for ready:"
@state.each do |child, state|
buffer.puts "\t#{child.inspect}: #{state}"
end
end
self.sleep
if self.status?(:ready)
Console.logger.debug(self) do |buffer|
buffer.puts "All ready:"
@state.each do |child, state|
buffer.puts "\t#{child.inspect}: #{state}"
end
end
return true
end
end
end
def stop(timeout = true)
Stop the children instances.
Signature
-
parameter
timeout
Boolean | Numeric
Whether to stop gracefully, or a specific timeout.
Implementation
def stop(timeout = true)
@running = false
@group.stop(timeout)
if @group.running?
Console.warn(self) {"Group is still running after stopping it!"}
end
ensure
@running = true
end
def spawn(name: nil, restart: false, key: nil, &block)
Spawn a child instance into the container.
Signature
-
parameter
name
String
The name of the child instance.
-
parameter
restart
Boolean
Whether to restart the child instance if it fails.
-
parameter
key
Symbol
A key used for reloading child instances.
Implementation
def spawn(name: nil, restart: false, key: nil, &block)
name ||= UNNAMED
if mark?(key)
Console.debug(self) {"Reusing existing child for #{key}: #{name}"}
return false
end
@statistics.spawn!
fiber do
while @running
child = self.start(name, &block)
state = insert(key, child)
begin
status = @group.wait_for(child) do |message|
state.update(message)
end
ensure
delete(key, child)
end
if status.success?
Console.debug(self) {"#{child} exited with #{status}"}
else
@statistics.failure!
Console.error(self, status: status)
end
if restart
@statistics.restart!
else
break
end
end
end.resume
return true
end
def async(**options, &block)
- deprecated
Signature
- deprecated
Please use
Async::Container::Generic#spawn
orAsync::Container::Generic.run
instead.
Implementation
def async(**options, &block)
spawn(**options) do |instance|
Async::Reactor.run(instance, &block)
end
end
def reload
Reload the container's keyed instances.
Implementation
def reload
@keyed.each_value(&:clear!)
yield
dirty = false
@keyed.delete_if do |key, value|
value.stop? && (dirty = true)
end
return dirty
end
def mark?(key)
Mark the container's keyed instance which ensures that it won't be discarded.
Implementation
def mark?(key)
if key
if value = @keyed[key]
value.mark!
return true
end
end
return false
end
def key?(key)
Whether a child instance exists for the given key.
Implementation
def key?(key)
if key
@keyed.key?(key)
end
end
def insert(key, child)
Register the child (value) as running.
Implementation
def insert(key, child)
if key
@keyed[key] = Keyed.new(key, child)
end
state = {}
@state[child] = state
return state
end
def delete(key, child)
Clear the child (value) as running.
Implementation
def delete(key, child)
if key
@keyed.delete(key)
end
@state.delete(child)
end