class Connection
This is the core connection class that handles HTTP/2 protocol semantics including stream management, settings negotiation, and frame processing.
Definitions
def initialize(framer, local_stream_id)
Initialize a new HTTP/2 connection.
Signature
-
parameter
framer
Framer
The frame handler for reading/writing HTTP/2 frames.
-
parameter
local_stream_id
Integer
The starting stream ID for locally-initiated streams.
Implementation
def initialize(framer, local_stream_id)
super()
@state = :new
# Hash(Integer, Stream)
@streams = {}
@framer = framer
# The next stream id to use:
@local_stream_id = local_stream_id
# The biggest remote stream id seen thus far:
@remote_stream_id = 0
@local_settings = PendingSettings.new
@remote_settings = Settings.new
@decoder = HPACK::Context.new
@encoder = HPACK::Context.new
@local_window = LocalWindow.new
@remote_window = Window.new
end
def id
The connection stream ID (always 0 for connection-level operations).
Signature
-
returns
Integer
Always returns 0 for the connection itself.
Implementation
def id
0
end
def [](id)
Access streams by ID, with 0 returning the connection itself.
Signature
-
parameter
id
Integer
The stream ID to look up.
-
returns
Connection | Stream | Nil
The connection (if id=0), stream, or nil.
Implementation
def [] id
if id.zero?
self
else
@streams[id]
end
end
def maximum_frame_size
The size of a frame payload is limited by the maximum size that a receiver advertises in the SETTINGS_MAX_FRAME_SIZE setting.
Implementation
def maximum_frame_size
@remote_settings.maximum_frame_size
end
def maximum_concurrent_streams
The maximum number of concurrent streams that this connection can initiate. This is a setting that can be changed by the remote peer.
It is not the same as the number of streams that can be accepted by the connection. The number of streams that can be accepted is determined by the local settings, and the number of streams that can be initiated is determined by the remote settings.
Implementation
def maximum_concurrent_streams
@remote_settings.maximum_concurrent_streams
end
attr_accessor :state
Connection state (:new, :open, :closed).
attr_accessor :local_settings
Current settings value for local and peer
attr :local_window
Our window for receiving data. When we receive data, it reduces this window. If the window gets too small, we must send a window update.
attr :remote_window
Our window for sending data. When we send data, it reduces this window.
attr :remote_stream_id
The highest stream_id that has been successfully accepted by this connection.
def closed?
Whether the connection is effectively or actually closed.
Implementation
def closed?
@state == :closed || @framer.nil?
end
def delete(id)
Remove a stream from the active streams collection.
Signature
-
parameter
id
Integer
The stream ID to remove.
-
returns
Stream | Nil
The removed stream, or nil if not found.
Implementation
def delete(id)
@streams.delete(id)
end
def close(error = nil)
Close the underlying framer and all streams.
Implementation
def close(error = nil)
# The underlying socket may already be closed by this point.
@streams.each_value{|stream| stream.close(error)}
@streams.clear
ensure
if @framer
@framer.close
@framer = nil
end
end
def encode_headers(headers, buffer = String.new.b)
Encode headers using HPACK compression.
Signature
-
parameter
headers
Array
The headers to encode.
-
parameter
buffer
String
Optional buffer for encoding output.
-
returns
String
The encoded header block.
Implementation
def encode_headers(headers, buffer = String.new.b)
HPACK::Compressor.new(buffer, @encoder, table_size_limit: @remote_settings.header_table_size).encode(headers)
end
def decode_headers(data)
Decode headers using HPACK decompression.
Signature
-
parameter
data
String
The encoded header block data.
-
returns
Array
The decoded headers.
Implementation
def decode_headers(data)
HPACK::Decompressor.new(data, @decoder, table_size_limit: @local_settings.header_table_size).decode
end
def next_stream_id
Streams are identified with an unsigned 31-bit integer. Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers. A stream identifier of zero (0x0) is used for connection control messages; the stream identifier of zero cannot be used to establish a new stream.
Implementation
def next_stream_id
id = @local_stream_id
@local_stream_id += 2
return id
end
def ignore_frame?(frame)
6.8. GOAWAY There is an inherent race condition between an endpoint starting new streams and the remote sending a GOAWAY frame. To deal with this case, the GOAWAY contains the stream identifier of the last peer-initiated stream that was or might be processed on the sending endpoint in this connection. For instance, if the server sends a GOAWAY frame, the identified stream is the highest-numbered stream initiated by the client. Once sent, the sender will ignore frames sent on streams initiated by the receiver if the stream has an identifier higher than the included last stream identifier. Receivers of a GOAWAY frame MUST NOT open additional streams on the connection, although a new connection can be established for new streams.
Implementation
def ignore_frame?(frame)
if self.closed?
# puts "ignore_frame? #{frame.stream_id} -> #{valid_remote_stream_id?(frame.stream_id)} > #{@remote_stream_id}"
if valid_remote_stream_id?(frame.stream_id)
return frame.stream_id > @remote_stream_id
end
end
end
def synchronize
Execute a block within a synchronized context. This method provides a synchronization primitive for thread safety.
Implementation
def synchronize
yield
end
def read_frame
Reads one frame from the network and processes. Processing the frame updates the state of the connection and related streams. If the frame triggers an error, e.g. a protocol error, the connection will typically emit a goaway frame and re-raise the exception. You should continue processing frames until the underlying connection is closed.
Implementation
def read_frame
frame = @framer.read_frame(@local_settings.maximum_frame_size)
# puts "#{self.class} #{@state} read_frame: class=#{frame.class} stream_id=#{frame.stream_id} flags=#{frame.flags} length=#{frame.length} (remote_stream_id=#{@remote_stream_id})"
# puts "Windows: local_window=#{@local_window.inspect}; remote_window=#{@remote_window.inspect}"
return if ignore_frame?(frame)
yield frame if block_given?
frame.apply(self)
return frame
rescue GoawayError => error
# Go directly to jail. Do not pass go, do not collect $200.
raise
rescue ProtocolError => error
send_goaway(error.code || PROTOCOL_ERROR, error.message)
raise
rescue HPACK::Error => error
send_goaway(COMPRESSION_ERROR, error.message)
raise
end
def send_settings(changes)
Send updated settings to the remote peer.
Signature
-
parameter
changes
Hash
The settings changes to send.
Implementation
def send_settings(changes)
@local_settings.append(changes)
frame = SettingsFrame.new
frame.pack(changes)
write_frame(frame)
end
def close!
Transition the connection into the closed state.
Implementation
def close!
@state = :closed
return self
end
def send_goaway(error_code = 0, message = "")
Tell the remote end that the connection is being shut down. If the error_code
is 0, this is a graceful shutdown. The other end of the connection should not make any new streams, but existing streams may be completed.
Implementation
def send_goaway(error_code = 0, message = "")
frame = GoawayFrame.new
frame.pack @remote_stream_id, error_code, message
write_frame(frame)
ensure
self.close!
end
def receive_goaway(frame)
Process a GOAWAY frame from the remote peer.
Signature
-
parameter
frame
GoawayFrame
The GOAWAY frame to process.
-
raises
GoawayError
If the frame indicates a connection error.
Implementation
def receive_goaway(frame)
# We capture the last stream that was processed.
@remote_stream_id, error_code, message = frame.unpack
self.close!
if error_code != 0
# Shut down immediately.
raise GoawayError.new(message, error_code)
end
end
def write_frame(frame)
Write a single frame to the connection.
Signature
-
parameter
frame
Frame
The frame to write.
Implementation
def write_frame(frame)
synchronize do
@framer.write_frame(frame)
end
@framer.flush
end
def write_frames
Write multiple frames within a synchronized block.
Signature
-
yields
{|framer| ...}
The framer for writing multiple frames.
-
parameter
framer
Framer
The framer instance.
-
parameter
-
raises
EOFError
If the connection is closed.
Implementation
def write_frames
if @framer
synchronize do
yield @framer
end
@framer.flush
else
raise EOFError, "Connection closed!"
end
end
def update_local_settings(changes)
Update local settings and adjust stream window capacities.
Signature
-
parameter
changes
Hash
The settings changes to apply locally.
Implementation
def update_local_settings(changes)
capacity = @local_settings.initial_window_size
@streams.each_value do |stream|
stream.local_window.capacity = capacity
end
@local_window.desired = capacity
end
def update_remote_settings(changes)
Update remote settings and adjust stream window capacities.
Signature
-
parameter
changes
Hash
The settings changes to apply to remote peer.
Implementation
def update_remote_settings(changes)
capacity = @remote_settings.initial_window_size
@streams.each_value do |stream|
stream.remote_window.capacity = capacity
end
end
def process_settings(frame)
In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the "open" or "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream flow-control windows that it maintains by the difference between the new value and the old value.
Implementation
def process_settings(frame)
if frame.acknowledgement?
# The remote end has confirmed the settings have been received:
changes = @local_settings.acknowledge
update_local_settings(changes)
return true
else
# The remote end is updating the settings, we reply with acknowledgement:
reply = frame.acknowledge
write_frame(reply)
changes = frame.unpack
@remote_settings.update(changes)
update_remote_settings(changes)
return false
end
end
def open!
Transition the connection to the open state.
Signature
-
returns
Connection
Self for method chaining.
Implementation
def open!
@state = :open
return self
end
def receive_settings(frame)
Receive and process a SETTINGS frame from the remote peer.
Signature
-
parameter
frame
SettingsFrame
The settings frame to process.
-
raises
ProtocolError
If the connection is in an invalid state.
Implementation
def receive_settings(frame)
if @state == :new
# We transition to :open when we receive acknowledgement of first settings frame:
open! if process_settings(frame)
elsif @state != :closed
process_settings(frame)
else
raise ProtocolError, "Cannot receive settings in state #{@state}"
end
end
def send_ping(data)
Send a PING frame to the remote peer.
Signature
-
parameter
data
String
The 8-byte ping payload data.
Implementation
def send_ping(data)
if @state != :closed
frame = PingFrame.new
frame.pack data
write_frame(frame)
else
raise ProtocolError, "Cannot send ping in state #{@state}"
end
end
def receive_ping(frame)
Process a PING frame from the remote peer.
Signature
-
parameter
frame
PingFrame
The ping frame to process.
-
raises
ProtocolError
If ping is received in invalid state.
Implementation
def receive_ping(frame)
if @state != :closed
# This is handled in `read_payload`:
# if frame.stream_id != 0
# raise ProtocolError, "Ping received for non-zero stream!"
# end
unless frame.acknowledgement?
reply = frame.acknowledge
write_frame(reply)
end
else
raise ProtocolError, "Cannot receive ping in state #{@state}"
end
end
def receive_data(frame)
Process a DATA frame from the remote peer.
Signature
-
parameter
frame
DataFrame
The data frame to process.
-
raises
ProtocolError
If data is received for invalid stream.
Implementation
def receive_data(frame)
update_local_window(frame)
if stream = @streams[frame.stream_id]
stream.receive_data(frame)
elsif closed_stream_id?(frame.stream_id)
# This can occur if one end sent a stream reset, while the other end was sending a data frame. It's mostly harmless.
else
raise ProtocolError, "Cannot receive data for stream id #{frame.stream_id}"
end
end
def valid_remote_stream_id?(stream_id)
Check if the given stream ID is valid for remote initiation. This method should be overridden by client/server implementations.
Signature
-
parameter
stream_id
Integer
The stream ID to validate.
-
returns
Boolean
True if the stream ID is valid for remote initiation.
Implementation
def valid_remote_stream_id?(stream_id)
false
end
def accept_stream(stream_id, &block)
Accept an incoming stream from the other side of the connnection. On the server side, we accept requests.
Implementation
def accept_stream(stream_id, &block)
unless valid_remote_stream_id?(stream_id)
raise ProtocolError, "Invalid stream id: #{stream_id}"
end
create_stream(stream_id, &block)
end
def accept_push_promise_stream(stream_id, &block)
Accept an incoming push promise from the other side of the connection. On the client side, we accept push promise streams. On the server side, existing streams create push promise streams.
Implementation
def accept_push_promise_stream(stream_id, &block)
accept_stream(stream_id, &block)
end
def create_stream(id = next_stream_id, &block)
Create a stream, defaults to an outgoing stream. On the client side, we create requests.
Implementation
def create_stream(id = next_stream_id, &block)
if @streams.key?(id)
raise ProtocolError, "Cannot create stream with id #{id}, already exists!"
end
if block_given?
return yield(self, id)
else
return Stream.create(self, id)
end
end
def create_push_promise_stream(&block)
Create a push promise stream. This method should be overridden by client/server implementations.
Signature
-
yields
{|stream| ...}
Optional block to configure the created stream.
-
returns
Stream
The created push promise stream.
Implementation
def create_push_promise_stream(&block)
create_stream(&block)
end
def receive_headers(frame)
On the server side, starts a new request.
Implementation
def receive_headers(frame)
stream_id = frame.stream_id
if stream_id.zero?
raise ProtocolError, "Cannot receive headers for stream 0!"
end
if stream = @streams[stream_id]
stream.receive_headers(frame)
else
if stream_id <= @remote_stream_id
raise ProtocolError, "Invalid stream id: #{stream_id} <= #{@remote_stream_id}!"
end
# We need to validate that we have less streams than the specified maximum:
if @streams.size < @local_settings.maximum_concurrent_streams
stream = accept_stream(stream_id)
@remote_stream_id = stream_id
stream.receive_headers(frame)
else
raise ProtocolError, "Exceeded maximum concurrent streams"
end
end
end
def receive_push_promise(frame)
Receive and process a PUSH_PROMISE frame.
Signature
-
parameter
frame
PushPromiseFrame
The push promise frame.
-
raises
ProtocolError
Always raises as push promises are not supported.
Implementation
def receive_push_promise(frame)
raise ProtocolError, "Unable to receive push promise!"
end
def receive_priority_update(frame)
Receive and process a PRIORITY_UPDATE frame.
Signature
-
parameter
frame
PriorityUpdateFrame
The priority update frame.
-
raises
ProtocolError
If the stream ID is invalid.
Implementation
def receive_priority_update(frame)
if frame.stream_id != 0
raise ProtocolError, "Invalid stream id: #{frame.stream_id}"
end
stream_id, value = frame.unpack
# Apparently you can set the priority of idle streams, but I'm not sure why that makes sense, so for now let's ignore it.
if stream = @streams[stream_id]
stream.priority = Protocol::HTTP::Header::Priority.new(value)
end
end
def client_stream_id?(id)
Check if the given stream ID represents a client-initiated stream. Client streams always have odd numbered IDs.
Signature
-
parameter
id
Integer
The stream ID to check.
-
returns
Boolean
True if the stream ID is client-initiated.
Implementation
def client_stream_id?(id)
id.odd?
end
def server_stream_id?(id)
Check if the given stream ID represents a server-initiated stream. Server streams always have even numbered IDs.
Signature
-
parameter
id
Integer
The stream ID to check.
-
returns
Boolean
True if the stream ID is server-initiated.
Implementation
def server_stream_id?(id)
id.even?
end
def idle_stream_id?(id)
Check if the given stream ID represents an idle stream.
Signature
-
parameter
id
Integer
The stream ID to check.
-
returns
Boolean
True if the stream ID is idle (not yet used).
Implementation
def idle_stream_id?(id)
if id.even?
# Server-initiated streams are even.
if @local_stream_id.even?
id >= @local_stream_id
else
id > @remote_stream_id
end
elsif id.odd?
# Client-initiated streams are odd.
if @local_stream_id.odd?
id >= @local_stream_id
else
id > @remote_stream_id
end
end
end
def closed_stream_id?(id)
This is only valid if the stream doesn't exist in @streams
.
Implementation
def closed_stream_id?(id)
if id.zero?
# The connection "stream id" can never be closed:
false
else
!idle_stream_id?(id)
end
end
def receive_reset_stream(frame)
Receive and process a RST_STREAM frame.
Signature
-
parameter
frame
ResetStreamFrame
The reset stream frame.
-
raises
ProtocolError
If the frame is invalid for connection context.
Implementation
def receive_reset_stream(frame)
if frame.connection?
raise ProtocolError, "Cannot reset connection!"
elsif stream = @streams[frame.stream_id]
stream.receive_reset_stream(frame)
elsif closed_stream_id?(frame.stream_id)
# Ignore.
else
raise StreamClosed, "Cannot reset stream #{frame.stream_id}"
end
end
def consume_window(size = self.available_size)
Traverse active streams and allow them to consume the available flow-control window.
Signature
-
parameter
amount
Integer
the amount of data to write. Defaults to the current window capacity.
Implementation
def consume_window(size = self.available_size)
# Return if there is no window to consume:
return unless size > 0
@streams.each_value do |stream|
if stream.active?
stream.window_updated(size)
end
end
end
def receive_window_update(frame)
Receive and process a WINDOW_UPDATE frame.
Signature
-
parameter
frame
WindowUpdateFrame
The window update frame.
Implementation
def receive_window_update(frame)
if frame.connection?
super
self.consume_window
elsif stream = @streams[frame.stream_id]
begin
stream.receive_window_update(frame)
rescue ProtocolError => error
stream.send_reset_stream(error.code)
end
elsif closed_stream_id?(frame.stream_id)
# Ignore.
else
# Receiving any frame other than HEADERS or PRIORITY on a stream in this state (idle) MUST be treated as a connection error of type PROTOCOL_ERROR.
raise ProtocolError, "Cannot update window of idle stream #{frame.stream_id}"
end
end
def receive_continuation(frame)
Receive and process a CONTINUATION frame.
Signature
-
parameter
frame
ContinuationFrame
The continuation frame.
-
raises
ProtocolError
Always raises as unexpected continuation frames are not supported.
Implementation
def receive_continuation(frame)
raise ProtocolError, "Received unexpected continuation: #{frame.class}"
end
def receive_frame(frame)
Receive and process a generic frame (default handler).
Signature
-
parameter
frame
Frame
The frame to receive.
Implementation
def receive_frame(frame)
# ignore.
end