Async::HTTPSourceAsyncHTTPProtocolHTTP2Connection

module Connection

Provides shared connection behaviour for HTTP/2 client and server connections.

Definitions

def initialize(...)

Initialize the connection state.

Implementation

def initialize(...)
	super
	
	@reader = nil
	
	# Writing multiple frames at the same time can cause odd problems if frames are only partially written. So we use a semaphore to ensure frames are written in their entirety.
	@write_frame_guard = Async::Semaphore.new(1)
end

def synchronize(&block)

Synchronize write access to the connection.

Signature

yields {|...| ...}

The block to execute while holding the write lock.

Implementation

def synchronize(&block)
	@write_frame_guard.acquire(&block)
end

def to_s

Signature

returns String

A string representation of this connection.

Implementation

def to_s
	"\#<#{self.class} #{@streams.count} active streams>"
end

def as_json(...)

Signature

returns String

A JSON-compatible representation.

Implementation

def as_json(...)
	to_s
end

def to_json(...)

Signature

returns String

A JSON string representation.

Implementation

def to_json(...)
	as_json.to_json(...)
end

def http1?

Signature

returns Boolean

Whether this is an HTTP/1 connection.

Implementation

def http1?
	false
end

def http2?

Signature

returns Boolean

Whether this is an HTTP/2 connection.

Implementation

def http2?
	true
end

def start_connection

Start the background reader task if it is not already running.

Implementation

def start_connection
	@reader || read_in_background
end

def close(error = nil)

Close the connection and stop the background reader.

Implementation

def close(error = nil)
	# Ensure the reader task is stopped.
	if @reader
		reader = @reader
		@reader = nil
		reader.stop
	end
	
	super
end

def read_in_background(parent: Task.current)

Start a transient background task that reads frames from the connection.

Implementation

def read_in_background(parent: Task.current)
	raise RuntimeError, "Connection is closed!" if closed?
	
	parent.async(transient: true) do |task|
		@reader = task
		
		task.annotate("#{version} reading data for #{self.class}.")
		
		# We don't need to defer stop here as this is already a transient task (ignores stop):
		begin
			while !self.closed?
				self.consume_window
				self.read_frame
			end
		rescue => error
			# Close with error.
		ensure
			# Don't call #close twice.
			if @reader
				@reader = nil
				
				self.close(error)
			end
		end
	end
end

def peer

Signature

returns Protocol::HTTP::Peer

The peer information for this connection.

Implementation

def peer
	@peer ||= ::Protocol::HTTP::Peer.for(@stream.io)
end

def concurrency

Signature

returns Integer

The maximum number of concurrent streams allowed.

Implementation

def concurrency
	self.maximum_concurrent_streams
end

def viable?

Can we use this connection to make requests?

Implementation

def viable?
	@stream&.readable?
end

def reusable?

Signature

returns Boolean

Whether the connection can be reused.

Implementation

def reusable?
	!self.closed?
end

def version

Signature

returns String

The HTTP version string.

Implementation

def version
	VERSION
end