class Stream
A single HTTP/2 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.
This class encapsulates all of the state, transition, flow-control, and error management as defined by the HTTP 2.0 specification. All you have to do is subscribe to appropriate events (marked with ":" prefix in diagram below) and provide your application logic to handle request and response processing.
┌────────┐
send PP │ │ recv PP
┌──────────┤ idle ├──────────┐
│ │ │ │
▼ └───┬────┘ ▼
┌──────────┐ │ ┌──────────┐
│ │ │ send H / │ │
┌──────┼ reserved │ │ recv H │ reserved ├──────┐
│ │ (local) │ │ │ (remote) │ │
│ └───┬──────┘ ▼ └──────┬───┘ │
│ │ ┌────────┐ │ │
│ │ recv ES │ │ send ES │ │
│ send H │ ┌─────────┤ open ├─────────┐ │ recv H │
│ │ │ │ │ │ │ │
│ ▼ ▼ └───┬────┘ ▼ ▼ │
│ ┌──────────┐ │ ┌──────────┐ │
│ │ half │ │ │ half │ │
│ │ closed │ │ send R / │ closed │ │
│ │ (remote) │ │ recv R │ (local) │ │
│ └────┬─────┘ │ └─────┬────┘ │
│ │ │ │ │
│ │ send ES / │ recv ES / │ │
│ │ send R / ▼ send R / │ │
│ │ recv R ┌────────┐ recv R │ │
│ send R / └───────────►│ │◄───────────┘ send R / │
│ recv R │ closed │ recv R │
└───────────────────────►│ │◄───────────────────────┘
└────────┘
-
send
: endpoint sends this frame -
recv
: endpoint receives this frame -
H: HEADERS frame (with implied CONTINUATIONs)
-
PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
-
ES: END_STREAM flag
-
R: RST_STREAM frame
State transition methods use a trailing "!".
Definitions
def self.create(connection, id)
Create a new stream and add it to the connection.
Signature
-
parameter
connection
Connection
The connection this stream belongs to.
-
parameter
id
Integer
The stream identifier.
-
returns
Stream
The newly created stream.
Implementation
def self.create(connection, id)
stream = self.new(connection, id)
connection.streams[id] = stream
return stream
end
def initialize(connection, id, state = :idle)
Initialize a new stream.
Signature
-
parameter
connection
Connection
The connection this stream belongs to.
-
parameter
id
Integer
The stream identifier.
-
parameter
state
Symbol
The initial stream state.
Implementation
def initialize(connection, id, state = :idle)
@connection = connection
@id = id
@state = state
@local_window = Window.new(@connection.local_settings.initial_window_size)
@remote_window = Window.new(@connection.remote_settings.initial_window_size)
@priority = nil
end
attr :connection
The connection this stream belongs to.
attr :id
Stream ID (odd for client initiated streams, even otherwise).
attr_accessor :state
Stream state, e.g. idle
, closed
.
attr_accessor :priority
Signature
-
attribute
Protocol::HTTP::Header::Priority | Nil
the priority of the stream.
def maximum_frame_size
Get the maximum frame size for this stream.
Signature
-
returns
Integer
The maximum frame size from connection settings.
Implementation
def maximum_frame_size
@connection.available_frame_size
end
def write_frame(frame)
Write a frame to the connection for this stream.
Signature
-
parameter
frame
Frame
The frame to write.
Implementation
def write_frame(frame)
@connection.write_frame(frame)
end
def active?
Check if the stream is active (not idle or closed).
Signature
-
returns
Boolean
True if the stream is active.
Implementation
def active?
@state != :closed && @state != :idle
end
def closed?
Check if the stream is closed.
Signature
-
returns
Boolean
True if the stream is in closed state.
Implementation
def closed?
@state == :closed
end
def close(error = nil)
Transition directly to closed state. Do not pass go, do not collect $200.
This method should only be used by Connection#close
.
Implementation
def close(error = nil)
unless closed?
@state = :closed
self.closed(error)
end
end
def send_headers?
HEADERS frames can be sent on a stream in the "idle", "reserved (local)", "open", or "half-closed (remote)" state. Despite it's name, it can also be used for trailers.
Implementation
def send_headers?
@state == :idle or @state == :reserved_local or @state == :open or @state == :half_closed_remote
end
def send_headers(*arguments)
The HEADERS frame is used to open a stream, and additionally carries a header block fragment. HEADERS frames can be sent on a stream in the "idle", "reserved (local)", "open", or "half-closed (remote)" state.
Implementation
def send_headers(*arguments)
if @state == :idle
frame = write_headers(*arguments)
if frame.end_stream?
@state = :half_closed_local
else
open!
end
elsif @state == :reserved_local
frame = write_headers(*arguments)
@state = :half_closed_remote
elsif @state == :open
frame = write_headers(*arguments)
if frame.end_stream?
@state = :half_closed_local
end
elsif @state == :half_closed_remote
frame = write_headers(*arguments)
if frame.end_stream?
close!
end
else
raise ProtocolError, "Cannot send headers in state: #{@state}"
end
end
def consume_remote_window(frame)
Consume from the remote window for both stream and connection.
Signature
-
parameter
frame
Frame
The frame that consumes window space.
Implementation
def consume_remote_window(frame)
super
@connection.consume_remote_window(frame)
end
def send_data(*arguments, **options)
Send data over this stream.
Signature
-
parameter
arguments
Array
Arguments passed to write_data.
-
parameter
options
Hash
Options passed to write_data.
Implementation
def send_data(*arguments, **options)
if @state == :open
frame = write_data(*arguments, **options)
if frame.end_stream?
@state = :half_closed_local
end
elsif @state == :half_closed_remote
frame = write_data(*arguments, **options)
if frame.end_stream?
close!
end
else
raise ProtocolError, "Cannot send data in state: #{@state}"
end
end
def open!
Open the stream by transitioning from idle to open state.
Signature
-
returns
Stream
Returns self for chaining.
-
raises
ProtocolError
If the stream cannot be opened from current state.
Implementation
def open!
if @state == :idle
@state = :open
else
raise ProtocolError, "Cannot open stream in state: #{@state}"
end
return self
end
def closed(error = nil)
The stream has been closed. If closed due to a stream reset, the error will be set.
Implementation
def closed(error = nil)
end
def close!(error_code = nil)
Transition the stream into the closed state.
Signature
-
parameter
error_code
Integer
the error code if the stream was closed due to a stream reset.
Implementation
def close!(error_code = nil)
@state = :closed
@connection.delete(@id)
if error_code
error = StreamError.new("Stream closed!", error_code)
end
self.closed(error)
return self
end
def send_reset_stream(error_code = 0)
Send a RST_STREAM frame to reset this stream.
Signature
-
parameter
error_code
Integer
The error code to send.
Implementation
def send_reset_stream(error_code = 0)
if @state != :idle and @state != :closed
frame = ResetStreamFrame.new(@id)
frame.pack(error_code)
write_frame(frame)
close!
else
raise ProtocolError, "Cannot send reset stream (#{error_code}) in state: #{@state}"
end
end
def process_headers(frame)
Process headers frame and decode the header block.
Signature
-
parameter
frame
HeadersFrame
The headers frame to process.
-
returns
Array
The decoded headers.
Implementation
def process_headers(frame)
# Receiving request headers:
data = frame.unpack
@connection.decode_headers(data)
end
def receive_headers(frame)
Receive and process a headers frame on this stream.
Signature
-
parameter
frame
HeadersFrame
The headers frame to receive.
Implementation
def receive_headers(frame)
if @state == :idle
if frame.end_stream?
@state = :half_closed_remote
else
open!
end
process_headers(frame)
elsif @state == :reserved_remote
process_headers(frame)
@state = :half_closed_local
elsif @state == :open
process_headers(frame)
if frame.end_stream?
@state = :half_closed_remote
end
elsif @state == :half_closed_local
process_headers(frame)
if frame.end_stream?
close!
end
elsif self.closed?
ignore_headers(frame)
else
self.send_reset_stream(Error::STREAM_CLOSED)
end
end
def process_data(frame)
Implementation
def process_data(frame)
frame.unpack
end
def ignore_data(frame)
Ignore data frame when in an invalid state.
Signature
-
parameter
frame
DataFrame
The data frame to ignore.
Implementation
def ignore_data(frame)
# Console.warn(self) {"Received headers in state: #{@state}!"}
end
def receive_data(frame)
DATA frames are subject to flow control and can only be sent when a stream is in the "open" or "half-closed (remote)" state. The entire DATA frame payload is included in flow control, including the Pad Length and Padding fields if present. If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state, the recipient MUST respond with a stream error of type STREAM_CLOSED.
Implementation
def receive_data(frame)
if @state == :open
update_local_window(frame)
if frame.end_stream?
@state = :half_closed_remote
end
process_data(frame)
elsif @state == :half_closed_local
update_local_window(frame)
process_data(frame)
if frame.end_stream?
close!
end
elsif self.closed?
ignore_data(frame)
else
# If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state, the recipient MUST respond with a stream error (Section 5.4.2) of type STREAM_CLOSED.
self.send_reset_stream(Error::STREAM_CLOSED)
end
end
def receive_reset_stream(frame)
Receive and process a RST_STREAM frame on this stream.
Signature
-
parameter
frame
ResetStreamFrame
The reset stream frame to receive.
-
returns
Integer
The error code from the reset frame.
-
raises
ProtocolError
If reset is received on an idle stream.
Implementation
def receive_reset_stream(frame)
if @state == :idle
# If a RST_STREAM frame identifying an idle stream is received, the recipient MUST treat this as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
raise ProtocolError, "Cannot receive reset stream in state: #{@state}!"
else
error_code = frame.unpack
close!(error_code)
return error_code
end
end
def reserved_local!
Transition stream to reserved local state.
Signature
-
returns
Stream
Returns self for chaining.
-
raises
ProtocolError
If the stream cannot be reserved from current state.
Implementation
def reserved_local!
if @state == :idle
@state = :reserved_local
else
raise ProtocolError, "Cannot reserve stream in state: #{@state}"
end
return self
end
def reserved_remote!
Transition stream to reserved remote state.
Signature
-
returns
Stream
Returns self for chaining.
-
raises
ProtocolError
If the stream cannot be reserved from current state.
Implementation
def reserved_remote!
if @state == :idle
@state = :reserved_remote
else
raise ProtocolError, "Cannot reserve stream in state: #{@state}"
end
return self
end
def create_push_promise_stream(headers)
Override this function to implement your own push promise logic.
Implementation
def create_push_promise_stream(headers)
@connection.create_push_promise_stream
end
def send_push_promise(headers)
Server push is semantically equivalent to a server responding to a request; however, in this case, that request is also sent by the server, as a PUSH_PROMISE frame.
Signature
-
parameter
headers
Hash
contains a complete set of request header fields that the server attributes to the request.
Implementation
def send_push_promise(headers)
if @state == :open or @state == :half_closed_remote
promised_stream = self.create_push_promise_stream(headers)
promised_stream.reserved_local!
# The headers are the same as if the client had sent a request:
write_push_promise(promised_stream.id, headers)
# The server should call send_headers on the promised stream to begin sending the response:
return promised_stream
else
raise ProtocolError, "Cannot send push promise in state: #{@state}"
end
end
def accept_push_promise_stream(stream_id, headers)
Override this function to implement your own push promise logic.
Implementation
def accept_push_promise_stream(stream_id, headers)
@connection.accept_push_promise_stream(stream_id)
end
def receive_push_promise(frame)
Receive and process a PUSH_PROMISE frame on this stream.
Signature
-
parameter
frame
PushPromiseFrame
The push promise frame to receive.
Implementation
def receive_push_promise(frame)
promised_stream_id, data = frame.unpack
headers = @connection.decode_headers(data)
stream = self.accept_push_promise_stream(promised_stream_id, headers)
stream.reserved_remote!
return stream, headers
end
def inspect
Get a string representation of the stream.
Signature
-
returns
String
Human-readable stream information.
Implementation
def inspect
"\#<#{self.class} id=#{@id} state=#{@state}>"
end