Async::GRPC::XDS · Source · Async::GRPC::XDS::DiscoveryClient

class DiscoveryClient

Client for xDS APIs (ADS or individual APIs). Implements the Aggregated Discovery Service (ADS) protocol. Acts as a delegate for ADSStream, receiving discovery_response events.

Definitions

# Type URL string naming the Envoy v3 Listener resource type.
LISTENER_TYPE = "type.googleapis.com/envoy.config.listener.v3.Listener"

xDS API type URLs (v3 API)

def initialize(server_config, node: nil)

Initialize xDS discovery client

Signature

parameter server_config Hash

xDS server configuration from bootstrap

parameter node Hash

Node information (id, cluster, metadata, locality)

Implementation

# Set up the discovery client's configuration and empty connection state.
# No stream is opened here.
# @parameter server_config [Hash] xDS server entry; reads :server_uri, :channel_creds and :server_features.
# @parameter node [Hash | Nil] Node information; built via build_node_info when omitted.
def initialize(server_config, node: nil)
	# Connection settings taken from the server configuration.
	@server_uri, @channel_creds = server_config.values_at(:server_uri, :channel_creds)
	@server_features = server_config[:server_features] || []
	
	# Fall back to a generated node description when the caller gave none.
	node ||= build_node_info
	@node_info = node
	@node = build_node_proto(node)
	
	# No gRPC client exists until a stream is needed.
	@grpc_client = nil
	
	# Per-type_url protocol bookkeeping.
	@versions = {} # version_info keyed by type_url
	@nonces = {} # nonce keyed by type_url
	@mutex = Mutex.new
	@subscriptions = {} # subscription entry keyed by type_url
	
	# Stream lifecycle state, populated once the ADS stream is started.
	@stream_task = nil
	@ads_stream = nil # ADSStream instance when connected (owns stream state)
	@stream_ready_promise = nil # resolved when stream_opened runs
end

def subscribe(type_url, resource_names, &block)

Subscribe to a resource type using ADS (Aggregated Discovery Service: a single stream shared by all resource types).

Signature

parameter type_url String

Resource type URL

parameter resource_names Array<String>

Resources to subscribe to

returns Async::Task

The ADS stream task (already running; the same task is returned for every subscription)

Implementation

# Register a subscription for a resource type and request it over the ADS stream.
# @parameter type_url [String] Resource type URL.
# @parameter resource_names [Array(String)] Resources to subscribe to.
# @returns The current value of @stream_task.
def subscribe(type_url, resource_names, &block)
	# Record the subscription (names + callback) under the shared lock.
	entry = {resource_names: resource_names, callback: block}
	@mutex.synchronize {@subscriptions[type_url] = entry}
	
	# Start the ADS stream if it is not already running.
	ensure_stream_running
	
	# Block until the stream reports ready, giving up after 5 seconds.
	ready = @stream_ready_promise
	if ready && !ready.completed?
		begin
			ready.wait(timeout: 5)
		rescue Async::TimeoutError
			# Stream did not open in time; the request below is skipped
			# when @ads_stream is still nil.
		end
	end
	
	# Only send the request once a stream actually exists.
	if @ads_stream
		send_discovery_request(type_url, resource_names)
	end
	
	# Hand back the stream task.
	@stream_task
end

def close

Close xDS discovery client

Implementation

# Close the xDS discovery client: stop the stream task, close the gRPC client
# and drop all subscriptions and stream state.
#
# Teardown is exception-safe: if stopping the task or closing the client
# raises, the remaining steps still run and the error propagates to the
# caller afterwards. Calling close on an already-closed client is a no-op.
def close
	@mutex.synchronize do
		@stream_task&.stop
	ensure
		begin
			# Must run even when stopping the task failed, otherwise the
			# underlying client would be leaked.
			@grpc_client&.close
		ensure
			# Always return to the pristine, disconnected state.
			@grpc_client = nil
			@subscriptions.clear
			@stream_task = nil
			@ads_stream = nil
			@stream_ready_promise = nil
		end
	end
end
end