class Pipe
Implements a process readiness protocol using an inherited pipe file descriptor.
Definitions
NOTIFY_PIPE = 'NOTIFY_PIPE'
The environment variable key which contains the pipe file descriptor.
def self.open!(environment = ENV)
Open a notification client attached to the current NOTIFY_PIPE, if possible.
Returns nil if the variable is not set or the descriptor is invalid.
Implementation
def self.open!(environment = ENV)
  if descriptor = environment.delete(NOTIFY_PIPE)
    self.new(::IO.for_fd(descriptor.to_i))
  end
rescue Errno::EBADF => error
  Console.logger.error(self) {error}
  return nil
end
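The lookup above can be tried without the library. The following is a minimal sketch using only the standard library; `open_notify_io` is a hypothetical stand-in for `Pipe.open!`, and a plain Hash is assumed in place of `ENV` so the real environment is left untouched.

```ruby
# Hypothetical stand-in for Pipe.open!; returns a raw IO rather than a Pipe.
def open_notify_io(environment, key = "NOTIFY_PIPE")
  # As in the original, the key is removed from the environment once read.
  if descriptor = environment.delete(key)
    ::IO.for_fd(descriptor.to_i)
  end
rescue Errno::EBADF
  nil
end

reader, writer = ::IO.pipe

# A Hash stands in for ENV; the value is the descriptor number as a string.
environment = {"NOTIFY_PIPE" => writer.fileno.to_s}

io = open_notify_io(environment)
io.autoclose = false # `writer` already owns this descriptor.

io.puts("ready")
io.flush

line = reader.gets

# With no key present, nothing is opened.
missing = open_notify_io({})
```

Here `line` holds the text written through the reopened descriptor, and `missing` is `nil` because the key was absent.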
def initialize(io)
Initialize the notification client.
Signature
parameter io (IO): An IO instance used for sending messages.
Implementation
def initialize(io)
  @io = io
end
def before_spawn(arguments, options)
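Inserts or duplicates the environment given an argument array, then sets NOTIFY_PIPE in it and maps the pipe into the spawn options in a way that is suitable for ::Process.spawn.
The descriptor number is taken from the :notify_pipe option if given; otherwise descriptor 3 is used if it is not already mapped; otherwise an ArgumentError is raised.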
Implementation
def before_spawn(arguments, options)
  environment = environment_for(arguments)

  # Use `notify_pipe` option if specified:
  if notify_pipe = options.delete(:notify_pipe)
    options[notify_pipe] = @io
    environment[NOTIFY_PIPE] = notify_pipe.to_s

  # Use stdout if it's not redirected:
  # This can cause issues if the user expects stdout to be connected to a terminal.
  # elsif !options.key?(:out)
  #   options[:out] = @io
  #   environment[NOTIFY_PIPE] = "1"

  # Use fileno 3 if it's available:
  elsif !options.key?(3)
    options[3] = @io
    environment[NOTIFY_PIPE] = "3"

  # Otherwise, give up!
  else
    raise ArgumentError, "Please specify valid file descriptor for notify_pipe!"
  end
end
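To illustrate what the resulting environment and options mean to `::Process.spawn`, here is a minimal sketch of the default branch (descriptor 3). It bypasses the library and builds the two hashes by hand; the child program is a hypothetical one-liner, and a POSIX-like platform is assumed.

```ruby
require "rbconfig"

reader, writer = ::IO.pipe

# Mirrors the default branch: the pipe becomes descriptor 3 in the child,
# and the child learns the number through the environment.
environment = {"NOTIFY_PIPE" => "3"}
options = {3 => writer}

# Hypothetical child program: write one line to the inherited descriptor.
child = 'io = IO.for_fd(ENV.fetch("NOTIFY_PIPE").to_i); io.puts("ready"); io.flush'

pid = ::Process.spawn(environment, RbConfig.ruby, "-e", child, options)

# Close the parent's copy so the reader sees EOF once the child exits.
writer.close

message = reader.gets
::Process.wait(pid)
status = $?
```

An integer key in the spawn options redirects that descriptor number in the child, which is why `options[3] = @io` is enough to hand the pipe over.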
def send(**message)
Formats the message using JSON and sends it to the parent controller.
This is suitable for use with Async::Container::Channel.
Implementation
def send(**message)
  data = ::JSON.dump(message)

  @io.puts(data)
  @io.flush
end
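The framing used by `send` is one JSON document per line. A minimal sketch of both ends of that exchange follows; `send_message` is a hypothetical free-standing copy of the method, and the message keys are illustrative only. The reading side is an assumption about how a receiver might parse the stream, not the library's own receiver.

```ruby
require "json"

reader, writer = ::IO.pipe

# Same framing as `send`: dump to JSON, write as a single line, flush.
def send_message(io, **message)
  io.puts(::JSON.dump(message))
  io.flush
end

send_message(writer, status: "Initializing")
send_message(writer, ready: true)
writer.close

# JSON.dump escapes newlines inside strings, so each line is one full message.
messages = reader.each_line.map do |line|
  ::JSON.parse(line, symbolize_names: true)
end
```

Flushing after every message matters because a pipe is buffered on the writing side; without it the parent might not see a readiness message until much later.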