Async::HTTPSourceAsyncHTTPProtocolHTTP2Stream

class Stream

An HTTP/2 stream that manages headers, input data, and output data for a single request/response exchange.

Definitions

def initialize(*)

Initialize the stream state.

Implementation

def initialize(*)
	super
	
	@headers = nil
	
	@pool = nil
	
	# Input buffer, reading request body, or response body (receive_data):
	@length = nil
	@input = nil
	
	# Output buffer, writing request body or response body (window_updated):
	@output = nil
end

def add_header(key, value, trailer: false)

Add a header to the stream, validating against HTTP/2 constraints.

Signature

parameter key String

The header name.

parameter value String

The header value.

Implementation

def add_header(key, value, trailer: false)
	if key == CONNECTION
		raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!"
	elsif key.start_with? ":"
		raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!"
	elsif key =~ /[A-Z]/
		raise ::Protocol::HTTP2::HeaderError, "Invalid upper-case characters in header #{key}!"
	else
		@headers.add(key, value, trailer: trailer)
	end
end

def receive_trailing_headers(headers, end_stream)

Process trailing headers received after the body.

Signature

parameter headers Array

The trailing header key-value pairs.

parameter end_stream Boolean

Whether the stream ends after these headers.

Implementation

def receive_trailing_headers(headers, end_stream)
	headers.each do |key, value|
		add_header(key, value, trailer: true)
	end
end

def process_headers(frame)

Process an incoming HEADERS frame, dispatching to initial or trailing header handling.

Signature

parameter frame Protocol::HTTP2::HeadersFrame

The headers frame to process.

Implementation

def process_headers(frame)
	if @headers and frame.end_stream?
		self.receive_trailing_headers(super, frame.end_stream?)
	else
		self.receive_initial_headers(super, frame.end_stream?)
	end
	
	if @input and frame.end_stream?
		@input.close_write
	end
rescue ::Protocol::HTTP::InvalidTrailerError => error
	Console.warn(self, error)
	
	send_reset_stream(::Protocol::HTTP2::Error::PROTOCOL_ERROR)
rescue ::Protocol::HTTP2::HeaderError => error
	Console.debug(self, "Error while processing headers!", error)
	
	send_reset_stream(error.code)
end

def wait_for_input

Signature

returns Input | Nil

The input body for this stream, if available.

Implementation

def wait_for_input
	return @input
end

def prepare_input(length)

Prepare the input stream which will be used for incoming data frames.

Implementation

def prepare_input(length)
	if @input.nil?
		@input = Input.new(self, length)
	else
		raise ArgumentError, "Input body already prepared!"
	end
end

def update_local_window(frame)

Update the local flow control window after receiving data.

Signature

parameter frame Protocol::HTTP2::DataFrame

The received data frame.

Implementation

def update_local_window(frame)
	consume_local_window(frame)
	
	# This is done on demand in `Input#read`:
	# request_window_update
end

def process_data(frame)

Process an incoming DATA frame and write it to the input body.

Signature

parameter frame Protocol::HTTP2::DataFrame

The data frame to process.

returns String

The unpacked data.

Implementation

def process_data(frame)
	data = frame.unpack
	
	if @input
		unless data.empty?
			@input.write(data)
		end
		
		if frame.end_stream?
			@input.close_write
		end
	end
	
	return data
rescue ::Protocol::HTTP2::ProtocolError
	raise
rescue # Anything else...
	send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
end

def send_body(body, trailer = nil)

Set the body and begin sending it.

Implementation

def send_body(body, trailer = nil)
	@output = Output.new(self, body, trailer)
	
	@output.start
end

def finish_output(error = nil)

Called when the output terminates normally.

Implementation

def finish_output(error = nil)
	return if self.closed?
	
	trailer = @output&.trailer
	
	@output = nil
	
	if error
		send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
	else
		# Write trailer?
		if trailer&.any?
			send_headers(trailer, ::Protocol::HTTP2::END_STREAM)
		else
			send_data(nil, ::Protocol::HTTP2::END_STREAM)
		end
	end
end

def window_updated(size)

Called when the flow control window is updated.

Signature

parameter size Integer

The new window size.

returns Boolean

Always returns true.

Implementation

def window_updated(size)
	super
	
	@output&.window_updated(size)
	
	return true
end

def closed(error)

When the stream transitions to the closed state, this method is called. There are roughly two ways this can happen:

  • A frame is received which causes this stream to enter the closed state. This method will be invoked from the background reader task.
  • A frame is sent which causes this stream to enter the closed state. This method will be invoked from that task. While the input stream is relatively straight forward, the output stream can trigger the second case above

Implementation

def closed(error)
	super
	
	if input = @input
		@input = nil
		input.close_write(error)
	end
	
	if output = @output
		@output = nil
		output.stop(error)
	end
	
	if pool = @pool and @connection
		pool.release(@connection)
	end
	
	return self
end