module Connection
Provides shared connection behaviour for HTTP/2 client and server connections.
Definitions
def initialize(...)
Initialize the connection state.
Implementation
# Initialize the connection state.
#
# Every argument (and any block) is passed through unchanged to the superclass initializer.
def initialize(...)
  super(...)

  # No background reader task exists until one is started.
  @reader = nil

  # Writing multiple frames at the same time can cause odd problems if frames are only partially written, so a single-permit semaphore ensures each frame is written in its entirety.
  @write_frame_guard = Async::Semaphore.new(1)
end
def synchronize(&block)
Synchronize write access to the connection.
Signature
- yields `{|...| ...}`: The block to execute while holding the write lock.
Implementation
# Synchronize write access to the connection.
#
# @yields {|...| ...} The block to execute while holding the write lock.
def synchronize(&block)
  # The block is handed straight to the write guard, which decides when it runs.
  guard = @write_frame_guard
  guard.acquire(&block)
end
def to_s
Signature
- returns `String`: A string representation of this connection.
Implementation
# Describe the connection by its class and the number of entries in `@streams`.
#
# @returns [String] A string representation of this connection.
def to_s
  owner = self.class
  active = @streams.count

  "\#<#{owner} #{active} active streams>"
end
def as_json(...)
Signature
- returns `String`: A JSON-compatible representation.
Implementation
# Provide a JSON-compatible representation, which is simply the result of `#to_s`.
#
# Any arguments are accepted and ignored.
#
# @returns [String] A JSON-compatible representation.
def as_json(...)
  self.to_s
end
def to_json(...)
Signature
- returns `String`: A JSON string representation.
Implementation
# Serialise the connection to JSON by encoding the value returned from `#as_json`.
#
# All arguments are forwarded to that value's own `to_json`.
#
# @returns [String] A JSON string representation.
def to_json(...)
  representation = as_json
  representation.to_json(...)
end
def http1?
Signature
- returns `Boolean`: Whether this is an HTTP/1 connection.
Implementation
# Report whether this is an HTTP/1 connection; always `false` here.
#
# @returns [Boolean] Whether this is an HTTP/1 connection.
def http1?
  false
end
def http2?
Signature
- returns `Boolean`: Whether this is an HTTP/2 connection.
Implementation
# Report whether this is an HTTP/2 connection; always `true` here.
#
# @returns [Boolean] Whether this is an HTTP/2 connection.
def http2?
  true
end
def start_connection
Start the background reader task if it is not already running.
Implementation
# Start the background reader task if it is not already running.
#
# Returns the existing `@reader` when one is set; otherwise returns whatever `#read_in_background` returns.
def start_connection
  return @reader if @reader

  read_in_background
end
def close(error = nil)
Close the connection and stop the background reader.
Implementation
# Close the connection and stop the background reader.
#
# @parameter error [Exception | Nil] Passed through to the superclass `#close`.
def close(error = nil)
  # Ensure the reader task is stopped. `@reader` is cleared *before* calling `stop`, so it is already `nil` by the time the task is asked to stop.
  if (reader = @reader)
    @reader = nil
    reader.stop
  end

  super
end
def read_in_background(parent: Task.current)
Start a transient background task that reads frames from the connection.
Implementation
# Start a transient background task that reads frames from the connection.
#
# The task is recorded in `@reader`, then alternates `#consume_window` and `#read_frame` until the connection reports closed. When the loop ends, normally or through an error, the connection is closed (with the rescued error, if any) unless `@reader` has already been cleared.
#
# @parameter parent [Task] The task under which the reader is spawned; defaults to `Task.current`.
# @raises [RuntimeError] If the connection is already closed.
def read_in_background(parent: Task.current)
  raise RuntimeError, "Connection is closed!" if closed?

  parent.async(transient: true) do |reader_task|
    @reader = reader_task

    reader_task.annotate("#{version} reading data for #{self.class}.")

    # We don't need to defer stop here as this is already a transient task (ignores stop):
    begin
      until closed?
        consume_window
        read_frame
      end
    rescue => failure
      # Nothing to do here: the captured error is handed to #close below.
    ensure
      # Don't call #close twice: a cleared @reader means #close has already taken over.
      if @reader
        @reader = nil

        close(failure)
      end
    end
  end
end
def peer
Signature
- returns `Protocol::HTTP::Peer`: The peer information for this connection.
Implementation
# Look up the peer for this connection, memoising the result in `@peer`.
#
# @returns [Protocol::HTTP::Peer] The peer information for this connection.
def peer
  return @peer if @peer

  # First call: derive the peer from the underlying IO of the stream and cache it.
  @peer = ::Protocol::HTTP::Peer.for(@stream.io)
end
def concurrency
Signature
- returns `Integer`: The maximum number of concurrent streams allowed.
Implementation
# Report the connection's concurrency, which is its `#maximum_concurrent_streams` value.
#
# @returns [Integer] The maximum number of concurrent streams allowed.
def concurrency
  maximum_concurrent_streams
end
def viable?
Can we use this connection to make requests?
Implementation
# Can we use this connection to make requests?
#
# Answers `nil` when there is no `@stream`; otherwise defers to the stream's own `readable?`.
def viable?
  stream = @stream
  return nil if stream.nil?

  stream.readable?
end
def reusable?
Signature
- returns `Boolean`: Whether the connection can be reused.
Implementation
# A connection can be reused for as long as it has not been closed.
#
# @returns [Boolean] Whether the connection can be reused.
def reusable?
  !closed?
end
def version
Signature
- returns `String`: The HTTP version string.
Implementation
# Report the protocol version, taken from the `VERSION` constant in scope.
#
# @returns [String] The HTTP version string.
def version
  VERSION
end