Async::HTTP · Source · Async::HTTP::Body::Pipe

class Pipe

Definitions

def initialize(input, output = Writable.new, task: Task.current)

If the input stream is closed first, it's likely the output stream will also be closed.

Implementation

# Build a pipe between two body streams.
#
# Stores `input` and `output`, creates a connected pair of UNIX-domain stream
# sockets, and spawns the `reader` and `writer` methods as child tasks of `task`
# (both with `transient: true`).
#
# input  - the stream the reader task pulls chunks from.
# output - the stream the writer task pushes chunks to (defaults to `Writable.new`).
# task   - the parent task used to spawn the two workers (defaults to `Task.current`).
def initialize(input, output = Writable.new, task: Task.current)
	@input = input
	@output = output
	
	# Filled in by #reader / #writer themselves once each task starts running.
	@reader = nil
	@writer = nil
	
	# One end is wrapped with ::IO::Stream and used by both workers; the other
	# end is kept as-is in @tail.
	near_end, far_end = ::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)
	@head = ::IO::Stream(near_end)
	@tail = far_end
	
	# Start the reader first, then the writer.
	%i[reader writer].each do |worker|
		task.async(transient: true, &method(worker))
	end
end

def reader(task)

Read from the @input stream and write to the head of the pipe.

Implementation

# Copy every chunk from @input into the head of the pipe.
#
# Records `task` in @reader, then repeatedly reads from @input, writing and
# flushing each chunk to @head until `read` returns a falsy value, at which
# point the write side of @head is closed.
#
# task - the task this method is running in.
#
# Any StandardError raised along the way is re-raised after cleanup.
def reader(task)
	@reader = task
	task.annotate("#{self.class} reader.")
	
	while (data = @input.read)
		@head.write(data)
		@head.flush
	end
	
	@head.close_write
rescue => failure
	raise
ensure
	# `failure` is nil unless the rescue clause above captured an error.
	@input.close(failure)
	
	# Only close the head once the writer task has also finished.
	writer_done = @writer&.finished?
	close_head if writer_done
end

def writer(task)

Read from the head of the pipe and write to the `@output` stream. If the `@tail` is closed, this will cause `chunk` to be `nil`, which ends the loop; `@output.close_write` is then called, and the head is closed via `close_head` once the reader task has finished.

Implementation

# Copy every chunk from the head of the pipe into @output.
#
# Records `task` in @writer, then repeatedly calls `@head.read_partial`,
# writing each chunk to @output until `read_partial` returns a falsy value.
#
# task - the task this method is running in.
#
# Any StandardError raised along the way is re-raised after cleanup.
def writer(task)
	@writer = task
	task.annotate("#{self.class} writer.")
	
	while (data = @head.read_partial)
		@output.write(data)
	end
rescue => failure
	raise
ensure
	# `failure` is nil unless the rescue clause above captured an error.
	@output.close_write(failure)
	
	# Only close the head once the reader task has also finished.
	reader_done = @reader&.finished?
	close_head if reader_done
end