Streaming
This guide gives an overview of how to implement streaming requests and responses.
Independent Uni-directional Streaming
The request and response body work independently of each other can stream data in both directions. class Protocol::HTTP::Body::Stream
provides an interface to merge these independent streams into an IO-like interface.
#!/usr/bin/env ruby
require 'async'
require 'async/http/client'
require 'async/http/server'
require 'async/http/endpoint'
require 'protocol/http/body/stream'
require 'protocol/http/body/writable'
endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000')
Async do
server = Async::HTTP::Server.for(endpoint) do |request|
output = Protocol::HTTP::Body::Writable.new
stream = Protocol::HTTP::Body::Stream.new(request.body, output)
Async do
# Simple echo server:
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end
Protocol::HTTP::Response[200, {}, output]
end
server_task = Async{server.run}
client = Async::HTTP::Client.new(endpoint)
input = Protocol::HTTP::Body::Writable.new
response = client.get("/", body: input)
begin
stream = Protocol::HTTP::Body::Stream.new(response.body, input)
stream.write("Hello, ")
stream.write("World!")
stream.close_write
while chunk = stream.readpartial(1024)
puts chunk
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end
ensure
server_task.stop
end
This approach works quite well, especially when the input and output bodies are independently compressed, decompressed, or chunked. However, some protocols, notably, WebSockets operate on the raw connection and don't require this level of abstraction.
Bi-directional Streaming
While WebSockets can work on the above streaming interface, it's a bit more convenient to use the streaming interface directly, which gives raw access to the underlying stream where possible.
#!/usr/bin/env ruby
require 'async'
require 'async/http/client'
require 'async/http/server'
require 'async/http/endpoint'
require 'protocol/http/body/stream'
require 'protocol/http/body/writable'
endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000')
Async do
server = Async::HTTP::Server.for(endpoint) do |request|
streamable = Protocol::HTTP::Body::Streamable.
output = Protocol::HTTP::Body::Writable.new
stream = Protocol::HTTP::Body::Stream.new(request.body, output)
Async do
# Simple echo server:
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end
Protocol::HTTP::Response[200, {}, output]
end
server_task = Async{server.run}
client = Async::HTTP::Client.new(endpoint)
input = Protocol::HTTP::Body::Writable.new
response = client.get("/", body: input)
begin
stream = Protocol::HTTP::Body::Stream.new(response.body, input)
stream.write("Hello, ")
stream.write("World!")
stream.close_write
while chunk = stream.readpartial(1024)
puts chunk
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end
ensure
server_task.stop
end