Async::HTTP › Source › Async › HTTP › Protocol › HTTP2 › Output

class Output

Writes body data to an HTTP/2 stream, respecting flow control windows.

Definitions

def initialize(stream, body, trailer = nil)

Initialize the output handler.

Signature

parameter stream Stream

The HTTP/2 stream to write to.

parameter body Protocol::HTTP::Body::Readable

The body to read from.

parameter trailer Protocol::HTTP::Headers | Nil

Optional trailing headers.

Implementation

# Set up the state needed to write a body to an HTTP/2 stream.
#
# No writer task is created here; `@task` stays `nil` until one is assigned.
#
# @parameter stream [Stream] The HTTP/2 stream to write to.
# @parameter body [Protocol::HTTP::Body::Readable] The body to read from.
# @parameter trailer [Protocol::HTTP::Headers | Nil] Optional trailing headers.
def initialize(stream, body, trailer = nil)
	@stream, @body, @trailer = stream, body, trailer
	
	# Placeholder for the writer task:
	@task = nil
	
	# A condition variable and the mutex that guards it:
	@window_updated = ::ConditionVariable.new
	@guard = ::Mutex.new
end

def start(parent: Task.current)

Start an asynchronous task to write the body to the stream.

Implementation

# Spawn the task that writes the body to the stream.
#
# The task runs `stream` when the body reports `stream?`, otherwise `passthrough`.
#
# @parameter parent [Task] The task under which the writer is started.
# @raises [RuntimeError] If a task has already been started.
def start(parent: Task.current)
	raise "Task already started!" if @task
	
	# Pick the writer implementation, then launch it exactly once:
	handler = @body.stream? ? :stream : :passthrough
	
	@task = parent.async(&method(handler))
end

def window_updated(size)

Signal that the flow control window has been updated.

Signature

parameter size Integer

The new window size.

returns Boolean

Always returns true.

Implementation

# Notify a waiter that the flow control window has changed.
#
# Signals the condition variable while holding the guard mutex.
#
# @parameter size [Integer] The new window size (not used by this method).
# @returns [Boolean] Always returns true.
def window_updated(size)
	@guard.synchronize { @window_updated.signal }
	
	true
end

def write(chunk)

Write a chunk of data to the HTTP/2 stream, respecting flow control.

Signature

parameter chunk String

The data to write.

Implementation

# Write a chunk to the HTTP/2 stream in pieces no larger than the stream's available frame size.
#
# While the stream reports an available frame size of zero or less, this waits on the
# window-updated condition variable before trying again.
#
# @parameter chunk [String] The data to write.
def write(chunk)
	remaining = chunk
	
	until remaining.empty?
		available = @stream.available_frame_size
		
		# Fast path: only take the lock when there is currently no room to send.
		unless available > 0
			@guard.synchronize do
				# Re-check under the lock, and again after every wake-up:
				until (available = @stream.available_frame_size) > 0
					@window_updated.wait(@guard)
				end
			end
		end
		
		# `send_data` hands back whatever did not fit, or nil once everything was sent:
		remaining = send_data(remaining, available)
		break unless remaining
	end
end

def close_write(error = nil)

Finish writing to the stream.

Signature

parameter error Exception | Nil

An optional error that caused the close.

Implementation

# Finish writing to the stream.
#
# The stream reference is cleared before `finish_output` is invoked, so a second call does nothing.
#
# @parameter error [Exception | Nil] An optional error that caused the close.
def close_write(error = nil)
	stream = @stream
	return unless stream
	
	@stream = nil
	stream.finish_output(error)
end

def close(error = nil)

This method should only be called from within the context of the output task.

Implementation

# Close the output: finish writing to the stream, then stop the task.
#
# This method should only be called from within the context of the output task.
#
# @parameter error [Exception | Nil] An optional error, forwarded to both steps.
def close(error = nil)
	# Order matters: the stream is finished before the task is stopped.
	self.close_write(error)
	self.stop(error)
end

def stop(error)

This method should only be called from within the context of the HTTP/2 stream.

Implementation

# Stop the task, if there is one.
#
# This method should only be called from within the context of the HTTP/2 stream.
# The task reference is cleared before `stop` is invoked on it, so a second call does nothing.
#
# @parameter error [Exception | Nil] Passed through to the task's `stop`.
def stop(error)
	task = @task
	return unless task
	
	@task = nil
	task.stop(error)
end

def passthrough(task)

Reads chunks from the given body and writes them to the stream as fast as possible.

Implementation

# Read chunks from the body and write each one to the stream until the body is exhausted.
#
# Whether or not an error occurs, the body is closed and then the output is closed. A
# `StandardError` raised while reading or writing is passed to both and re-raised.
#
# @parameter task [Task] The task running this method; it is annotated with a description.
def passthrough(task)
	task.annotate("Writing #{@body} to #{@stream}.")
	
	while (piece = @body&.read)
		self.write(piece)
	end
rescue StandardError => failure
	# Rescued only so the ensure block can see the failure; it is propagated unchanged.
	raise
ensure
	# Release the source body, handing it the failure (if any):
	if (source = @body)
		@body = nil
		source.close(failure)
	end
	
	# Then finish the output side with the same failure (if any):
	self.close_write(failure)
end

def send_data(chunk, maximum_size)

Send up to maximum_size bytes of the chunk using the specified stream, returning any remaining data that did not fit (or nil if the whole chunk was sent). If the buffer has no more chunks, END_STREAM will be sent on the final chunk.

Implementation

# Send at most `maximum_size` bytes of `chunk` on the stream.
#
# @parameter chunk [String] The data to send.
# @parameter maximum_size [Integer] The largest number of bytes to send in this call.
# @returns [String | Nil] The bytes that did not fit, or nil if the whole chunk was sent.
def send_data(chunk, maximum_size)
	total = chunk.bytesize
	
	if total > maximum_size
		# Only the leading bytes fit; send those and hand the tail back to the caller:
		head = chunk.byteslice(0, maximum_size)
		@stream.send_data(head, maximum_size: maximum_size)
		
		return chunk.byteslice(maximum_size, total - maximum_size)
	end
	
	@stream.send_data(chunk, maximum_size: maximum_size)
	
	nil
end