Async::GRPC::XDS — Source / Async / GRPC / XDS / ADSStream

class ADSStream

Encapsulates a single ADS (Aggregated Discovery Service) bidirectional stream. Owns the stream lifecycle and delegates events to a delegate object.

Nested

Definitions

def send(request)

Send a DiscoveryRequest on the stream. Call from within discovery_response to send ACKs.

Signature

parameter request Envoy::Service::Discovery::V3::DiscoveryRequest

The request to send

Implementation

# Write a request to the currently open stream body.
# `@body` is only assigned while `run` is inside its streaming block; when it
# is nil this method does nothing and returns nil.
# @parameter request [Envoy::Service::Discovery::V3::DiscoveryRequest] The request to send.
def send(request)
	stream_body = @body
	return if stream_body.nil?
	
	stream_body.write(request)
end

def run(initial: nil)

Run the ADS stream. Blocks until the stream completes or errors.

Signature

parameter initial Object | Array | Nil

Initial message(s) to send (defaults to node-only request if nil/empty)

Implementation

# Run the ADS stream. Blocks until the stream completes or errors.
#
# The delegate receives `discovery_response(response, stream)` for every message read,
# and, when it responds to them, `stream_opened(stream)`, `stream_closed(stream)` and
# `stream_error(stream, error)`. Any StandardError is reported to the delegate, logged,
# and then re-raised to the caller.
#
# @parameter initial [Object | Array | Nil] Initial message(s) to send (defaults to node-only request if nil/empty).
def run(initial: nil)
	service = Envoy::Service::Discovery::V3::AggregatedDiscoveryService.new(
		"envoy.service.discovery.v3.AggregatedDiscoveryService"
	)
	
	# Fall back to a single node-only request when nothing usable was supplied.
	# A non-empty value is forwarded to the client unchanged.
	initial = Array(initial).any? ? initial : [Envoy::Service::Discovery::V3::DiscoveryRequest.new(node: @node)]
	
	@client.invoke(service, :StreamAggregatedResources, nil, initial: initial) do |body, readable_body|
		@body = body
		
		begin
			@delegate.stream_opened(self) if @delegate.respond_to?(:stream_opened)
			
			begin
				readable_body.each do |response|
					@delegate.discovery_response(response, self)
				end
			ensure
				# Only reached once stream_opened has succeeded, so the delegate never
				# sees a close without a matching open.
				@delegate.stream_closed(self) if @delegate.respond_to?(:stream_closed)
			end
		ensure
			# Always drop the writable body, even if a delegate callback raised, so
			# that `send` cannot write to a stream that is no longer active.
			@body = nil
		end
	end
rescue => error
	@delegate.stream_error(self, error) if @delegate.respond_to?(:stream_error)
	Console.error(self, "Failed while streaming updates!", exception: error)
	raise
end