Async::ContainerSourceAsyncContainerNotifyPipe

class Pipe

Implements a process readiness protocol using an inherited pipe file descriptor.

Definitions

NOTIFY_PIPE = 'NOTIFY_PIPE'

The environment variable key which contains the pipe file descriptor.

def self.open!(environment = ENV)

Open a notification client attached to the current NOTIFY_PIPE = 'NOTIFY_PIPE' if possible.

Implementation

def self.open!(environment = ENV)
	if descriptor = environment.delete(NOTIFY_PIPE)
		self.new(::IO.for_fd(descriptor.to_i))
	end
rescue Errno::EBADF => error
	Console.logger.error(self) {error}
	
	return nil
end

def initialize(io)

Initialize the notification client.

Signature

parameter io IO

An IO instance used for sending messages.

Implementation

def initialize(io)
	@io = io
end

def before_spawn(arguments, options)

Inserts or duplicates the environment given an argument array. Sets or clears it in a way that is suitable for ::Process.spawn.

Implementation

def before_spawn(arguments, options)
	environment = environment_for(arguments)
	
	# Use `notify_pipe` option if specified:
	if notify_pipe = options.delete(:notify_pipe)
		options[notify_pipe] = @io
		environment[NOTIFY_PIPE] = notify_pipe.to_s
	
	# Use stdout if it's not redirected:
	# This can cause issues if the user expects stdout to be connected to a terminal.
	# elsif !options.key?(:out)
	# 	options[:out] = @io
	# 	environment[NOTIFY_PIPE] = "1"
	
	# Use fileno 3 if it's available:
	elsif !options.key?(3)
		options[3] = @io
		environment[NOTIFY_PIPE] = "3"
	
	# Otherwise, give up!
	else
		raise ArgumentError, "Please specify valid file descriptor for notify_pipe!"
	end
end

def send(**message)

Formats the message using JSON and sends it to the parent controller. This is suitable for use with class Async::Container::Channel.

Implementation

def send(**message)
	data = ::JSON.dump(message)
	
	@io.puts(data)
	@io.flush
end