class ADSStream
Encapsulates a single ADS (Aggregated Discovery Service) bidirectional stream. Owns the stream lifecycle and delegates events to a delegate object.
Nested
Definitions
def send(request)
Send a DiscoveryRequest on the stream. Call from within discovery_response to send ACKs.
Signature
-
parameter
request: Envoy::Service::Discovery::V3::DiscoveryRequest — The request to send.
Implementation
# Send a DiscoveryRequest on the stream, e.g. to ACK a response from within
# the delegate's discovery_response callback.
#
# Does nothing and returns nil when @body is nil (no stream body is currently
# attached); otherwise returns whatever the body's #write returns.
#
# @parameter request [Envoy::Service::Discovery::V3::DiscoveryRequest] The request to send.
def send(request)
  stream_body = @body
  return if stream_body.nil?

  stream_body.write(request)
end
def run(initial: nil)
Run the ADS stream. Blocks until the stream completes or errors.
Signature
-
parameter
initial: Object | Array | Nil — Initial message(s) to send (defaults to a node-only request if nil or empty).
Implementation
# Run the ADS stream. Blocks until the stream completes or errors.
#
# Delegate callbacks (each optional except discovery_response):
# - stream_opened(stream) once the stream body is available,
# - discovery_response(response, stream) for every response read,
# - stream_closed(stream) after reading ends, but only if stream_opened did not raise,
# - stream_error(stream, error) when a StandardError escapes, before it is logged and re-raised.
#
# @body is always reset to nil when the stream block exits, even if a delegate
# callback raises, so #send never writes to a stale body afterwards.
#
# @parameter initial [Object | Array | Nil] Initial message(s) to send (defaults to a node-only request if nil/empty).
def run(initial: nil)
  service = Envoy::Service::Discovery::V3::AggregatedDiscoveryService.new(
    "envoy.service.discovery.v3.AggregatedDiscoveryService"
  )

  # Fall back to a node-only request when nothing usable was supplied.
  initial = Array(initial).any? ? initial : [Envoy::Service::Discovery::V3::DiscoveryRequest.new(node: @node)]

  @client.invoke(service, :StreamAggregatedResources, nil, initial: initial) do |body, readable_body|
    @body = body
    begin
      @delegate.stream_opened(self) if @delegate.respond_to?(:stream_opened)

      begin
        readable_body.each do |response|
          @delegate.discovery_response(response, self)
        end
      ensure
        # @body is still set here so the delegate sees the same state as before.
        @delegate.stream_closed(self) if @delegate.respond_to?(:stream_closed)
      end
    ensure
      # Outer ensure: runs even when stream_opened or stream_closed raises.
      @body = nil
    end
  end
rescue => error
  @delegate.stream_error(self, error) if @delegate.respond_to?(:stream_error)
  Console.error(self, "Failed while streaming updates!", exception: error)
  raise
end