class Pipe
Definitions
def initialize(input, output = Writable.new, task: Task.current)
If the input stream is closed first, it's likely the output stream will also be closed.
Implementation
def initialize(input, output = Writable.new, task: Task.current)
@input = input
@output = output
head, tail = ::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)
@head = ::IO::Stream(head)
@tail = tail
@reader = nil
@writer = nil
task.async(transient: true, &self.method(:reader))
task.async(transient: true, &self.method(:writer))
end
def reader(task)
Read from the @input stream and write to the head of the pipe.
Implementation
def reader(task)
@reader = task
task.annotate "#{self.class} reader."
while chunk = @input.read
@head.write(chunk)
@head.flush
end
@head.close_write
rescue => error
raise
ensure
@input.close(error)
close_head if @writer&.finished?
end
def writer(task)
Read from the head of the pipe and write to the @output stream.
If the @tail is closed, this will cause chunk to be nil, which in turn will call @output.close
and @head.close
Implementation
def writer(task)
@writer = task
task.annotate "#{self.class} writer."
while chunk = @head.read_partial
@output.write(chunk)
end
rescue => error
raise
ensure
@output.close_write(error)
close_head if @reader&.finished?
end