class Stream
Represents the HTTP/2 stream associated with an outgoing client-side response.
Definitions
def initialize(*)
Initialize the response stream.
Implementation
# Initialize the response stream.
#
# All arguments are forwarded unchanged to the superclass initializer.
def initialize(*)
  super

  # No failure has been recorded yet; `closed` stores the closing error here and `wait` re-raises it.
  @exception = nil

  # The response object that this stream populates as headers arrive.
  @response = Response.new(self)

  # Signalled (and then discarded) by `notify!` so that `wait` can block until headers arrive or the stream closes.
  @notification = Async::Notification.new
end
def wait_for_input
Wait for the response headers and return the response body.
Signature
-
returns
Protocol::HTTP::Body::Readable | Nil The response body.
Implementation
# Wait on the response, then hand back its body.
#
# @returns [Protocol::HTTP::Body::Readable | Nil] The response body.
def wait_for_input
  # The body is not available until the response headers have been received, so wait first:
  @response.wait

  # `@response` is intentionally re-read here rather than cached: there is a possible race where the stream has already been closed and the response cleared by the time we get here.
  @response.body
end
def accept_push_promise_stream(promised_stream_id, headers)
Handle a push promise stream from the server.
Signature
-
parameter
promised_stream_id: Integer — The stream ID for the promised resource.
-
parameter
headers: Array — The promise headers.
Implementation
# Handle a push promise stream from the server.
#
# This implementation never accepts a promised stream: it always raises.
#
# @parameter promised_stream_id [Integer] The stream ID for the promised resource (unused).
# @parameter headers [Array] The promise headers (unused).
# @raises [ProtocolError] Always.
def accept_push_promise_stream(promised_stream_id, headers)
  reason = "Cannot accept push promise stream!"

  raise ProtocolError, reason
end
def receive_initial_headers(headers, end_stream)
This should be invoked from the background reader, and notifies the task waiting for the headers that we are done.
Implementation
# Process the initial response header block, populate the response, and notify the task waiting for the headers that we are done. This should be invoked from the background reader.
#
# On success the leading `:status` pair is removed from `headers`. If validation fails, `headers` is left untouched so the error message shows the complete offending block.
#
# @parameter headers [Array] The header pairs; the first pair must be the `:status` pseudo-header.
# @parameter end_stream [Boolean] Whether the stream ended with these headers, i.e. no body data is coming.
# @returns [Array] The remaining headers, or the result of `receive_interim_headers` for a 1xx status.
# @raises [ProtocolError] If the header block is empty or does not start with `:status`.
def receive_initial_headers(headers, end_stream)
  # While in theory the response pseudo-headers may be extended in the future, currently the only response pseudo-header is :status, so we can assume it is always the first header.
  status_header = headers.first

  # An empty header block gives nil here; report it as a protocol error rather than failing with NoMethodError:
  unless status_header&.first == ":status"
    raise ProtocolError, "Invalid response headers: #{headers.inspect}"
  end

  headers.shift
  status = Integer(status_header.last)

  # 1xx responses are interim and are handled separately, without touching the response:
  if status >= 100 && status < 200
    return receive_interim_headers(status, headers)
  end

  @response.status = status
  @headers = ::Protocol::HTTP::Headers.new

  # If the protocol request was successful, ensure the response protocol matches:
  if status == 200
    protocol = @response.request.protocol
    @response.protocol = Array(protocol).first if protocol
  end

  headers.each do |key, value|
    # The content length is kept aside for constructing the body instead of being added to the headers:
    if key == CONTENT_LENGTH
      @length = Integer(value)
    else
      add_header(key, value)
    end
  end

  @response.headers = @headers

  if @response.valid?
    if !end_stream
      # We only construct the input/body if data is coming.
      @response.body = prepare_input(@length)
    elsif @response.head?
      # No data follows, but a HEAD response still gets a body object built from the recorded length.
      @response.body = ::Protocol::HTTP::Body::Head.new(@length)
    end
  else
    send_reset_stream(::Protocol::HTTP2::Error::PROTOCOL_ERROR)
  end

  # Wake up anyone blocked in `wait`, whether or not the response was valid:
  self.notify!

  return headers
end
def receive_interim_headers(status, headers)
Process interim (1xx) response headers.
Signature
-
parameter
status: Integer — The interim status code.
-
parameter
headers: Array — The interim response headers.
Implementation
# Process interim (1xx) response headers by passing them on to the request.
#
# @parameter status [Integer] The interim status code.
# @parameter headers [Array] The interim response headers.
# @returns The result of the request's `send_interim_response`.
def receive_interim_headers(status, headers)
  # Only wrap the pairs in a headers object when there is something to wrap; otherwise pass nil.
  interim_headers = headers.any? ? ::Protocol::HTTP::Headers[headers] : nil

  @response.request.send_interim_response(status, interim_headers)
end
def notify!
Notify anyone waiting on the response headers to be received (or failure).
Implementation
# Notify anyone waiting on the response headers to be received (or failure).
#
# The notification is cleared before it is signalled, so later calls do nothing.
#
# @returns The result of signalling the notification, or nil if it was already cleared.
def notify!
  pending = @notification
  return unless pending

  # Clear the notification first so that subsequent `wait` calls return immediately.
  @notification = nil
  pending.signal
end
def wait
Wait for the headers to be received or for stream reset.
Implementation
# Wait for the headers to be received or for stream reset.
#
# @returns [Nil] When no exception has been stored.
# @raises [Exception] The error stored by `closed`, if any.
def wait
  # `notify!` clears the notification, so calling this after the headers were already received returns immediately:
  @notification&.wait

  raise @exception if @exception
end
def closed(error)
Called when the stream is closed.
Signature
-
parameter
error: Exception | Nil — The error that caused the close, if any.
Implementation
# Called when the stream is closed.
#
# Drops the response, records the error (if any), and wakes up anyone blocked in `wait`.
#
# @parameter error [Exception | Nil] The error that caused the close, if any.
def closed(error)
  super

  # Drop the response so it is no longer reachable through this stream.
  @response = nil if @response

  # Store the error before notifying, so that `wait` re-raises it once woken.
  @exception = error
  notify!
end