class Stream
An HTTP/2 stream that manages headers, input data, and output data for a single request/response exchange.
Definitions
def initialize(*)
Initialize the stream state.
Implementation
# Set up the per-stream state. Every slot starts out as nil.
#
# All arguments are forwarded unchanged to the superclass initializer.
def initialize(*)
  super

  @headers = @pool = nil

  # Receive side: request body or response body (receive_data).
  @length = @input = nil

  # Send side: request body or response body (window_updated).
  @output = nil
end
def add_header(key, value, trailer: false)
Add a header to the stream, validating against HTTP/2 constraints.
Signature
-
parameter
key String The header name.
-
parameter
value String The header value.
Implementation
# Validate a single header and append it to the collected headers.
#
# The checks run in this order: the connection header is rejected, then any
# name starting with ":" (pseudo-header), then any name containing an
# upper-case ASCII letter.
#
# @parameter key [String] The header name.
# @parameter value [String] The header value.
# @parameter trailer [Boolean] Passed through to `@headers.add`.
# @raises [Protocol::HTTP2::HeaderError] If the header name is not allowed.
def add_header(key, value, trailer: false)
  raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!" if key == CONNECTION
  raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!" if key.start_with?(":")
  raise ::Protocol::HTTP2::HeaderError, "Invalid upper-case characters in header #{key}!" if key.match?(/[A-Z]/)

  @headers.add(key, value, trailer: trailer)
end
def receive_trailing_headers(headers, end_stream)
Process trailing headers received after the body.
Signature
-
parameter
headers Array The trailing header key-value pairs.
-
parameter
end_stream Boolean Whether the stream ends after these headers.
Implementation
# Feed each trailing header through `add_header`, flagged as a trailer.
#
# @parameter headers [Array] The trailing header key-value pairs.
# @parameter end_stream [Boolean] Whether the stream ends after these headers (not used here).
def receive_trailing_headers(headers, end_stream)
  headers.each { |name, content| add_header(name, content, trailer: true) }
end
def process_headers(frame)
Process an incoming HEADERS frame, dispatching to initial or trailing header handling.
Signature
-
parameter
frame Protocol::HTTP2::HeadersFrame The headers frame to process.
Implementation
# Handle an incoming HEADERS frame.
#
# The value returned by the superclass implementation is passed on as the
# header list. It is handled as trailers when headers have already been
# recorded and the frame ends the stream, and as initial headers otherwise.
#
# Header errors are logged and answered with a stream reset rather than raised.
#
# @parameter frame [Protocol::HTTP2::HeadersFrame] The headers frame to process.
def process_headers(frame)
  if @headers && frame.end_stream?
    receive_trailing_headers(super, frame.end_stream?)
  else
    receive_initial_headers(super, frame.end_stream?)
  end

  # The frame ended the stream, so close the write side of the input body:
  @input.close_write if @input && frame.end_stream?
rescue ::Protocol::HTTP::InvalidTrailerError => invalid_trailer
  Console.warn(self, invalid_trailer)
  send_reset_stream(::Protocol::HTTP2::Error::PROTOCOL_ERROR)
rescue ::Protocol::HTTP2::HeaderError => header_error
  Console.debug(self, "Error while processing headers!", header_error)
  send_reset_stream(header_error.code)
end
def wait_for_input
Signature
-
returns
Input | Nil The input body for this stream, if available.
Implementation
# Return the current input body without blocking.
#
# @returns [Input | Nil] The input body for this stream, if available.
def wait_for_input
  @input
end
def prepare_input(length)
Prepare the input stream which will be used for incoming data frames.
Implementation
# Create the input body that incoming data frames will be written to.
#
# @parameter length [Object] Passed to `Input.new` as its second argument.
# @returns [Input] The newly created input body.
# @raises [ArgumentError] If an input body has already been prepared.
def prepare_input(length)
  raise ArgumentError, "Input body already prepared!" unless @input.nil?

  @input = Input.new(self, length)
end
def update_local_window(frame)
Update the local flow control window after receiving data.
Signature
-
parameter
frame Protocol::HTTP2::DataFrame The received data frame.
Implementation
# Account for a received data frame against the local flow control window.
#
# @parameter frame [Protocol::HTTP2::DataFrame] The received data frame.
def update_local_window(frame)
  # `request_window_update` is deliberately not called here; that happens
  # on demand in `Input#read`.
  consume_local_window(frame)
end
def process_data(frame)
Process an incoming DATA frame and write it to the input body.
Signature
-
parameter
frame Protocol::HTTP2::DataFrame The data frame to process.
-
returns
String The unpacked data.
Implementation
# Unpack an incoming DATA frame and hand its payload to the input body.
#
# Protocol errors propagate to the caller. Any other StandardError resets
# the stream with INTERNAL_ERROR, and the result of that reset is returned
# instead of the data.
#
# @parameter frame [Protocol::HTTP2::DataFrame] The data frame to process.
# @returns [String] The unpacked data.
def process_data(frame)
  payload = frame.unpack

  # Without an input body the payload is only returned:
  return payload unless @input

  @input.write(payload) unless payload.empty?
  @input.close_write if frame.end_stream?

  payload
rescue ::Protocol::HTTP2::ProtocolError
  raise
rescue # Anything else...
  send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
end
def send_body(body, trailer = nil)
Set the body and begin sending it.
Implementation
# Wrap the body in an output, remember it on the stream, and start it.
#
# @parameter body [Object] The body handed to `Output.new`.
# @parameter trailer [Object | Nil] Optional trailer handed to `Output.new`.
# @returns [Object] Whatever `Output#start` returns.
def send_body(body, trailer = nil)
  writer = Output.new(self, body, trailer)
  @output = writer
  writer.start
end
def finish_output(error = nil)
Called when the output terminates, either normally or with an error.
Implementation
# Finish the outgoing side of the stream.
#
# Does nothing if the stream is already closed. Otherwise the output is
# detached and the stream is ended in one of three ways: a reset with
# INTERNAL_ERROR when an error is given, a trailing HEADERS frame when the
# output had a non-empty trailer, or an empty DATA frame. The last two
# carry END_STREAM.
#
# @parameter error [Object | Nil] Truthy when the output failed.
def finish_output(error = nil)
  return if closed?

  trailer = @output&.trailer
  @output = nil

  if error
    send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
  elsif trailer&.any?
    send_headers(trailer, ::Protocol::HTTP2::END_STREAM)
  else
    send_data(nil, ::Protocol::HTTP2::END_STREAM)
  end
end
def window_updated(size)
Called when the flow control window is updated.
Signature
-
parameter
size Integer The new window size.
-
returns
Boolean Always returns true.
Implementation
# Called when the flow control window is updated.
#
# Runs the superclass handler first, then passes the size on to the active
# output, if there is one.
#
# @parameter size [Integer] The new window size.
# @returns [Boolean] Always returns true.
def window_updated(size)
  super

  output = @output
  output.window_updated(size) unless output.nil?

  true
end
def closed(error)
When the stream transitions to the closed state, this method is called. There are roughly two ways this can happen:
- A frame is received which causes this stream to enter the closed state. This method will be invoked from the background reader task.
- A frame is sent which causes this stream to enter the closed state. This method will be invoked from that task. While the input stream is relatively straightforward, the output stream can trigger the second case above.
Implementation
# Release stream resources after the stream has entered the closed state.
#
# Runs the superclass handler first. The input and output are each detached
# from the stream before being closed or stopped with the given error. If
# both a pool and a connection are set, the connection is released to the pool.
#
# @parameter error [Object | Nil] Passed to `close_write` and `stop`.
# @returns [Stream] This stream.
def closed(error)
  super

  input = @input
  if input
    @input = nil
    input.close_write(error)
  end

  output = @output
  if output
    @output = nil
    output.stop(error)
  end

  pool = @pool
  pool.release(@connection) if pool && @connection

  self
end