class DiscoveryClient
Client for xDS APIs (ADS or individual APIs). Implements the Aggregated Discovery Service (ADS) protocol. Acts as the delegate for ADSStream, receiving discovery_response events.
Definitions
# Type URL string identifying the Envoy v3 Listener resource.
LISTENER_TYPE = "type.googleapis.com/envoy.config.listener.v3.Listener"
xDS API type URLs (v3 API)
def initialize(server_config, node: nil)
Initialize xDS discovery client
Signature
-
parameter
server_config (Hash): xDS server configuration from bootstrap
-
parameter
node (Hash): Node information (id, cluster, metadata, locality)
Implementation
# Set up an xDS discovery client in its disconnected state.
#
# Reads `:server_uri`, `:channel_creds` and `:server_features` from
# `server_config`; a missing `:server_features` entry becomes an empty array.
# No connection is opened here: the gRPC client, stream task, ADS stream and
# readiness promise all start out as nil.
#
# @parameter server_config [Hash] xDS server configuration from bootstrap
# @parameter node [Hash] Node information (id, cluster, metadata, locality);
#   when nil, the result of `build_node_info` is used instead
def initialize(server_config, node: nil)
  # Server settings taken from the bootstrap configuration.
  @server_uri = server_config[:server_uri]
  @channel_creds = server_config[:channel_creds]
  features = server_config[:server_features]
  @server_features = features || []

  # Node identity: caller-supplied, otherwise built by the helper.
  @node_info = node
  @node_info ||= build_node_info
  @node = build_node_proto(@node_info)

  # Per-type_url bookkeeping, guarded by @mutex where it is mutated.
  @mutex = Mutex.new
  @versions = {}      # version_info per type_url
  @nonces = {}        # nonces per type_url
  @subscriptions = {} # subscriptions by type_url

  # Connection state, populated once a stream is started:
  #   @ads_stream           - ADSStream instance when connected (owns stream state)
  #   @stream_ready_promise - resolved when stream_opened runs
  @grpc_client = @stream_task = @ads_stream = @stream_ready_promise = nil
end
def subscribe(type_url, resource_names, &block)
Subscribe to resource type using ADS (Aggregated Discovery Service - single stream for all types)
Signature
-
parameter
type_url (String): Resource type URL
-
parameter
resource_names (Array<String>): Resources to subscribe to
-
returns
Async::Task: Subscription task (the shared ADS stream task)
Implementation
# Register a subscription for one resource type and request it over the
# shared ADS stream (a single stream carries all resource types).
#
# The resource names and the given block are stored under `type_url`,
# replacing whatever was stored for that type before.
#
# @parameter type_url [String] Resource type URL
# @parameter resource_names [Array<String>] Resources to subscribe to
# @returns [Async::Task] The value of `@stream_task` after the stream was ensured
def subscribe(type_url, resource_names, &block)
  # Record the subscription and its callback under the lock.
  entry = { resource_names: resource_names, callback: block }
  @mutex.synchronize { @subscriptions[type_url] = entry }

  # Delegate to the helper that makes sure the ADS stream is running.
  ensure_stream_running

  # Event-driven wait (no polling): only wait when a readiness promise
  # exists and has not completed yet.
  ready = @stream_ready_promise
  still_opening = ready && !ready.completed?
  if still_opening
    begin
      ready.wait(timeout: 5)
    rescue Async::TimeoutError
      # The stream did not open in time; the guard below skips the request
      # when @ads_stream is still nil.
    end
  end

  # Only send the request once a stream object is available.
  if @ads_stream
    send_discovery_request(type_url, resource_names)
  end

  # Hand back the stream task (already running).
  @stream_task
end
def close
Close xDS discovery client
Implementation
# Shut down the discovery client and reset its connection state.
#
# Under the mutex: stops the stream task and closes the gRPC client (each
# skipped when unset), empties the subscription table, and resets the
# stream task, ADS stream and readiness promise to nil.
#
# @returns [nil]
def close
  @mutex.synchronize do
    # Stop the stream task first, then close the gRPC client.
    @stream_task.stop unless @stream_task.nil?

    unless @grpc_client.nil?
      @grpc_client.close
    end
    @grpc_client = nil

    @subscriptions.clear

    # Drop the remaining references to the old stream.
    @stream_task = @ads_stream = @stream_ready_promise = nil
  end
end