class Output
Writes body data to an HTTP/2 stream, respecting flow control windows.
Definitions
def initialize(stream, body, trailer = nil)
Initialize the output handler.
Signature
-
parameter
stream (Stream): The HTTP/2 stream to write to.
-
parameter
body (Protocol::HTTP::Body::Readable): The body to read from.
-
parameter
trailer (Protocol::HTTP::Headers | Nil): Optional trailing headers.
Implementation
# Initialize the output handler.
#
# @parameter stream [Stream] The HTTP/2 stream to write to.
# @parameter body [Protocol::HTTP::Body::Readable] The body to read from.
# @parameter trailer [Protocol::HTTP::Headers | Nil] Optional trailing headers.
def initialize(stream, body, trailer = nil)
  @stream, @body, @trailer = stream, body, trailer

  # No writer task exists yet.
  @task = nil

  # The mutex and condition variable are created as a pair.
  @guard = ::Mutex.new
  @window_updated = ::ConditionVariable.new
end
def start(parent: Task.current)
Start an asynchronous task to write the body to the stream.
Implementation
# Start an asynchronous task that writes the body to the stream.
#
# The task runs `stream` when the body reports `stream?`, otherwise `passthrough`.
#
# @parameter parent [Task] The task under which the writer is spawned; defaults to the current task.
# @raises [RuntimeError] If a task has already been started.
def start(parent: Task.current)
  raise "Task already started!" if @task

  writer = @body.stream? ? :stream : :passthrough
  @task = parent.async(&method(writer))
end
def window_updated(size)
Signal that the flow control window has been updated.
Signature
-
parameter
size (Integer): The new window size.
-
returns
Boolean: Always returns true.
Implementation
# Signal that the flow control window has been updated.
#
# @parameter size [Integer] The new window size (not used by this implementation).
# @returns [Boolean] Always returns `true`.
def window_updated(size)
  # Signal while holding the guard.
  @guard.synchronize { @window_updated.signal }

  true
end
def write(chunk)
Write a chunk of data to the HTTP/2 stream, respecting flow control.
Signature
-
parameter
chunk (String): The data to write.
Implementation
# Write a chunk of data to the HTTP/2 stream, respecting flow control.
#
# The chunk is sent in pieces no larger than the stream's available frame size.
# While that size is zero or negative, this method waits on the window
# condition variable and re-checks after each wake-up.
#
# @parameter chunk [String] The data to write.
def write(chunk)
  until chunk.empty?
    window = @stream.available_frame_size

    # Fast path: only take the lock when there is currently no room to send.
    if window <= 0
      @guard.synchronize do
        # Re-check under the lock, then sleep until the window opens up.
        while (window = @stream.available_frame_size) <= 0
          @window_updated.wait(@guard)
        end
      end
    end

    # send_data returns the unsent remainder, or nil when nothing is left.
    chunk = send_data(chunk, window)
    break unless chunk
  end
end
def close_write(error = nil)
Finish writing to the stream.
Signature
-
parameter
error (Exception | Nil): An optional error that caused the close.
Implementation
# Finish writing to the stream.
#
# The stream reference is cleared before finishing, so later calls do nothing.
#
# @parameter error [Exception | Nil] An optional error that caused the close.
def close_write(error = nil)
  stream = @stream
  return unless stream

  @stream = nil
  stream.finish_output(error)
end
def close(error = nil)
This method should only be called from within the context of the output task.
Implementation
# Finish the stream output and then stop the writer task.
#
# This method should only be called from within the context of the output task.
#
# @parameter error [Exception | Nil] An optional error, forwarded to both steps.
def close(error = nil)
  # Order matters: the output is finished before the task is stopped.
  close_write(error)
  stop(error)
end
def stop(error)
This method should only be called from within the context of the HTTP/2 stream.
Implementation
# Stop the writer task, if one is running.
#
# This method should only be called from within the context of the HTTP/2 stream.
#
# The task reference is cleared before stopping, so later calls do nothing.
#
# NOTE(review): `error` is forwarded as the first positional argument of the
# task's `stop` method — confirm this matches the task's signature.
#
# @parameter error [Exception | Nil] The error associated with stopping.
def stop(error)
  task = @task
  return unless task

  @task = nil
  task.stop(error)
end
def passthrough(task)
Reads chunks from the given body and writes them to the stream as fast as possible.
Implementation
# Read chunks from the body and write them to the stream until the body returns no more.
#
# Whether the loop finishes normally or raises, the body is closed and the
# stream output is finished; a rescued error is passed to both.
#
# @parameter task [Task] The task running this method; used for annotation.
def passthrough(task)
  task.annotate("Writing #{@body} to #{@stream}.")

  while (chunk = @body&.read)
    write(chunk)

    # TODO: would clearing the chunk here reduce memory usage?
    # chunk.clear unless chunk.frozen?
    # GC.start
  end
rescue => error
  # Captured only so the cleanup below can see the failure; re-raised unchanged.
  raise
ensure
  # Make sure the body we were reading from is fully closed:
  body = @body
  if body
    @body = nil
    body.close(error)
  end

  # Make sure the output side of the stream is closed as well:
  close_write(error)
end
def send_data(chunk, maximum_size)
Send at most maximum_size bytes of the chunk using the specified stream, returning any unsent remainder (or nil if the whole chunk was sent). If the buffer has no more chunks, END_STREAM will be sent on the final chunk.
Implementation
# Send at most `maximum_size` bytes of `chunk` on the stream.
#
# @parameter chunk [String] The data to send.
# @parameter maximum_size [Integer] The largest number of bytes to send in this call.
# @returns [String | Nil] The unsent remainder of the chunk, or `nil` if all of it was sent.
def send_data(chunk, maximum_size)
  if chunk.bytesize > maximum_size
    # The window is too small for the whole chunk: send the leading bytes now...
    head = chunk.byteslice(0, maximum_size)
    @stream.send_data(head, maximum_size: maximum_size)

    # ...and hand the rest back to the caller for the next round.
    remaining = chunk.bytesize - maximum_size
    chunk.byteslice(maximum_size, remaining)
  else
    @stream.send_data(chunk, maximum_size: maximum_size)
    nil
  end
end