class Stream
A single HTTP/2 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.
This class encapsulates all of the state, transition, flow-control, and error management as defined by the HTTP/2 specification. All you have to do is subscribe to the appropriate events (marked with a ":" prefix in the diagram below) and provide your application logic to handle request and response processing.
                         ┌────────┐
                 send PP │        │ recv PP
              ┌──────────┤  idle  ├──────────┐
              │          │        │          │
              ▼          └───┬────┘          ▼
       ┌──────────┐          │           ┌──────────┐
       │          │          │ send H /  │          │
┌──────┼ reserved │          │ recv H    │ reserved ├──────┐
│      │ (local)  │          │           │ (remote) │      │
│      └───┬──────┘          ▼           └──────┬───┘      │
│          │             ┌────────┐             │          │
│          │     recv ES │        │ send ES     │          │
│   send H │   ┌─────────┤  open  ├─────────┐   │ recv H   │
│          │   │         │        │         │   │          │
│          ▼   ▼         └───┬────┘         ▼   ▼          │
│      ┌──────────┐          │           ┌──────────┐      │
│      │   half   │          │           │   half   │      │
│      │  closed  │          │ send R /  │  closed  │      │
│      │ (remote) │          │ recv R    │ (local)  │      │
│      └────┬─────┘          │           └─────┬────┘      │
│           │                │                 │           │
│           │ send ES /      │       recv ES / │           │
│           │ send R /       ▼        send R / │           │
│           │ recv R     ┌────────┐   recv R   │           │
│ send R /  └───────────►│        │◄───────────┘  send R / │
│ recv R                 │ closed │               recv R   │
└───────────────────────►│        │◄───────────────────────┘
                         └────────┘
- send: endpoint sends this frame
- recv: endpoint receives this frame
- H: HEADERS frame (with implied CONTINUATIONs)
- PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
- ES: END_STREAM flag
- R: RST_STREAM frame
State transition methods use a trailing "!".
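The diagram can also be read as a lookup table from (state, event) to next state. The following is a minimal sketch of that reading, not how the class is implemented: `TRANSITIONS` and `next_state` are hypothetical names introduced here, and the event symbols simply mirror the legend above.

```ruby
# Illustrative only: TRANSITIONS and next_state are hypothetical names,
# not part of the Stream class. Event names follow the diagram legend.
TRANSITIONS = {
  idle: {send_pp: :reserved_local, recv_pp: :reserved_remote, send_h: :open, recv_h: :open},
  reserved_local: {send_h: :half_closed_remote, send_r: :closed, recv_r: :closed},
  reserved_remote: {recv_h: :half_closed_local, send_r: :closed, recv_r: :closed},
  open: {send_es: :half_closed_local, recv_es: :half_closed_remote, send_r: :closed, recv_r: :closed},
  half_closed_local: {recv_es: :closed, send_r: :closed, recv_r: :closed},
  half_closed_remote: {send_es: :closed, send_r: :closed, recv_r: :closed},
  closed: {},
}.freeze

# Look up the next state, raising if the diagram has no such edge.
def next_state(state, event)
  TRANSITIONS.fetch(state).fetch(event) do
    raise ArgumentError, "No transition from #{state} on #{event}"
  end
end

# A simple request/response exchange as seen from the client:
state = :idle
state = next_state(state, :send_h)  # request headers: idle -> open
state = next_state(state, :send_es) # request finished: open -> half_closed_local
state = next_state(state, :recv_es) # response finished: half_closed_local -> closed
```

A table like this is convenient for testing or visualising the state machine; the class itself encodes the same edges as explicit branches in each send and receive method.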
Definitions
attr :connection
The connection this stream belongs to.
attr :id
Stream ID (odd for client initiated streams, even otherwise).
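As a small illustration of the parity rule, the helper below classifies a stream ID. It is a sketch only: `stream_initiator` is a hypothetical name, and the special case for ID 0 reflects the HTTP/2 convention that stream 0 addresses the connection as a whole rather than any single stream.

```ruby
# Hypothetical helper, not part of the Stream API.
def stream_initiator(stream_id)
  if stream_id.zero?
    :connection # Stream 0 is reserved for connection-level frames.
  elsif stream_id.odd?
    :client
  else
    :server
  end
end

stream_initiator(1) # client-initiated request stream
stream_initiator(2) # server-initiated (pushed) stream
```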
attr_accessor :state
Stream state, e.g. idle, closed.
attr_accessor :priority
Signature
- attribute: Protocol::HTTP::Header::Priority | Nil, the priority of the stream.
def close(error = nil)
Transition directly to closed state. Do not pass go, do not collect $200.
This method should only be used by Connection#close.
Implementation
def close(error = nil)
  unless closed?
    @state = :closed
    self.closed(error)
  end
end
def send_headers?
HEADERS frames can be sent on a stream in the "idle", "reserved (local)", "open", or "half-closed (remote)" state. Despite its name, it can also be used for trailers.
Implementation
def send_headers?
  @state == :idle or @state == :reserved_local or @state == :open or @state == :half_closed_remote
end
def send_headers(*arguments)
The HEADERS frame is used to open a stream, and additionally carries a header block fragment. HEADERS frames can be sent on a stream in the "idle", "reserved (local)", "open", or "half-closed (remote)" state.
Implementation
def send_headers(*arguments)
  if @state == :idle
    frame = write_headers(*arguments)
    
    if frame.end_stream?
      @state = :half_closed_local
    else
      open!
    end
  elsif @state == :reserved_local
    frame = write_headers(*arguments)
    
    @state = :half_closed_remote
  elsif @state == :open
    frame = write_headers(*arguments)
    
    if frame.end_stream?
      @state = :half_closed_local
    end
  elsif @state == :half_closed_remote
    frame = write_headers(*arguments)
    
    if frame.end_stream?
      close!
    end
  else
    raise ProtocolError, "Cannot send headers in state: #{@state}"
  end
end
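To see how the state moves as headers and trailers are sent, here is a self-contained sketch that mirrors the branches above without any framing. `HeadersSketchFrame` and `HeadersSketchStream` are stand-ins invented for this example, and the `end_stream:` keyword is an assumption used to simulate the END_STREAM flag; the real method takes whatever arguments `write_headers` accepts.

```ruby
# Stand-ins for illustration only; the real Stream writes frames to its connection.
HeadersSketchFrame = Struct.new(:end_stream) do
  def end_stream?
    end_stream
  end
end

class HeadersSketchStream
  attr_reader :state
  
  def initialize(state = :idle)
    @state = state
  end
  
  # Mirrors the state changes of Stream#send_headers, minus the framing.
  def send_headers(end_stream: false)
    frame = HeadersSketchFrame.new(end_stream)
    
    case @state
    when :idle
      @state = frame.end_stream? ? :half_closed_local : :open
    when :reserved_local
      @state = :half_closed_remote
    when :open
      @state = :half_closed_local if frame.end_stream?
    when :half_closed_remote
      @state = :closed if frame.end_stream?
    else
      raise "Cannot send headers in state: #{@state}"
    end
    
    frame
  end
end

request = HeadersSketchStream.new
request.send_headers                   # request headers, body to follow: state is :open
request.send_headers(end_stream: true) # trailers end the request: state is :half_closed_local
```

Once the local side is half-closed, a further `send_headers` call falls through to the error branch, which matches the rule that HEADERS cannot be sent in the "half-closed (local)" state.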
def closed(error = nil)
The stream has been closed. If closed due to a stream reset, the error will be set.
Implementation
def closed(error = nil)
end
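Because `closed` is an empty hook, a subclass can override it to observe the end of a stream. The sketch below assumes a stand-in base class (`CloseSketchBase`, hypothetical) that reproduces only the `close` path shown earlier; in real use you would subclass Stream itself.

```ruby
# CloseSketchBase is a stand-in for Stream, reduced to the close path.
class CloseSketchBase
  attr_reader :state
  
  def initialize
    @state = :open
  end
  
  def closed?
    @state == :closed
  end
  
  # Empty hook, as in Stream#closed.
  def closed(error = nil)
  end
  
  def close(error = nil)
    unless closed?
      @state = :closed
      self.closed(error)
    end
  end
end

# Hypothetical subclass that records why the stream ended.
class RecordingStream < CloseSketchBase
  attr_reader :events
  
  def initialize
    super
    @events = []
  end
  
  def closed(error = nil)
    @events << (error ? "closed with error: #{error.message}" : "closed cleanly")
  end
end

stream = RecordingStream.new
stream.close
stream.close # Already closed, so the hook is not invoked again.
```

The `unless closed?` guard is what makes the hook fire at most once per stream on this path.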
def close!(error_code = nil)
Transition the stream into the closed state.
Signature
- parameter error_code: Integer, the error code if the stream was closed due to a stream reset.
Implementation
def close!(error_code = nil)
  @state = :closed
  @connection.delete(@id)
  
  if error_code
    error = StreamError.new("Stream closed!", error_code)
  end
  
  self.closed(error)
  
  return self
end
def process_data(frame)
Implementation
def process_data(frame)
  frame.unpack
end
def receive_data(frame)
DATA frames are subject to flow control and can only be sent when a stream is in the "open" or "half-closed (remote)" state. The entire DATA frame payload is included in flow control, including the Pad Length and Padding fields if present. If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state, the recipient MUST respond with a stream error of type STREAM_CLOSED.
Implementation
def receive_data(frame)
  if @state == :open
    update_local_window(frame)
    
    if frame.end_stream?
      @state = :half_closed_remote
    end
    
    process_data(frame)
  elsif @state == :half_closed_local
    update_local_window(frame)
    
    process_data(frame)
    
    if frame.end_stream?
      close!
    end
  elsif self.closed?
    ignore_data(frame)
  else
    # If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state, the recipient MUST respond with a stream error (Section 5.4.2) of type STREAM_CLOSED.
    self.send_reset_stream(Error::STREAM_CLOSED)
  end
end
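The flow-control rule in the description means a padded DATA frame consumes more of the window than its application data alone. The arithmetic can be sketched as follows; `flow_controlled_length` is a hypothetical helper, not a method of this class, and the starting window of 65,535 bytes is the HTTP/2 default initial window size.

```ruby
# Hypothetical helper: bytes a DATA frame counts against the flow-control window.
# When padding is present, the one-byte Pad Length field is counted as well.
def flow_controlled_length(data_bytes, padding_bytes = nil)
  return data_bytes if padding_bytes.nil?
  
  1 + data_bytes + padding_bytes
end

window = 65_535                             # default initial window size
window -= flow_controlled_length(1000, 23)  # 1000 data bytes + 23 padding + 1 Pad Length byte
```

Here the frame carries 1000 bytes of data but reduces the window by 1024, which is why padding cannot be used to bypass flow control.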
def create_push_promise_stream(headers)
Override this function to implement your own push promise logic.
Implementation
def create_push_promise_stream(headers)
  @connection.create_push_promise_stream
end
def send_push_promise(headers)
Server push is semantically equivalent to a server responding to a request; however, in this case, that request is also sent by the server, as a PUSH_PROMISE frame.
Signature
- parameter headers: Hash, contains a complete set of request header fields that the server attributes to the request.
Implementation
def send_push_promise(headers)
  if @state == :open or @state == :half_closed_remote
    promised_stream = self.create_push_promise_stream(headers)
    promised_stream.reserved_local!
    
    # The headers are the same as if the client had sent a request:
    write_push_promise(promised_stream.id, headers)
    
    # The server should call send_headers on the promised stream to begin sending the response:
    return promised_stream
  else
    raise ProtocolError, "Cannot send push promise in state: #{@state}"
  end
end
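The server-side push sequence can be sketched end to end with a stand-in class. Everything below is illustrative: `PushSketchStream` is hypothetical, the promised stream ID is passed in by hand (the real class obtains the stream from its connection), and no frames are written.

```ruby
# Stand-in for Stream, modelling only the states involved in server push.
class PushSketchStream
  attr_reader :id, :state
  
  def initialize(id, state = :idle)
    @id = id
    @state = state
  end
  
  # Mirrors the state check of Stream#send_push_promise.
  def send_push_promise(promised_id)
    unless @state == :open || @state == :half_closed_remote
      raise "Cannot send push promise in state: #{@state}"
    end
    
    # The promised stream starts out reserved on the sending (local) side.
    PushSketchStream.new(promised_id, :reserved_local)
  end
  
  # Sending response headers on a reserved stream moves it to half-closed (remote).
  def send_headers
    raise "Cannot send headers in state: #{@state}" unless @state == :reserved_local
    
    @state = :half_closed_remote
  end
end

request_stream = PushSketchStream.new(1, :open)      # client-initiated, odd ID
promised = request_stream.send_push_promise(2)       # server-initiated, even ID
promised.send_headers                                # begin the pushed response
```

The promise is sent on the stream carrying the original request, while the pushed response travels on the new, even-numbered stream.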
def accept_push_promise_stream(stream_id, headers)
Override this function to implement your own push promise logic.
Implementation
def accept_push_promise_stream(stream_id, headers)
  @connection.accept_push_promise_stream(stream_id)
end