Async::GRPC::XDSSourceAsyncGRPCXDSContext

class Context

Manages xDS subscriptions and maintains discovered resource state

Nested

Definitions

def initialize(bootstrap, node: nil)

Initialize xDS context

Signature

parameter bootstrap Hash

Bootstrap configuration

parameter node Hash

Node information (id, cluster, metadata, locality)

Implementation

def initialize(bootstrap, node: nil)
	@bootstrap = bootstrap
	xds_server = bootstrap[:xds_servers]&.first
	raise ConfigurationError, "No xds_servers in bootstrap" unless xds_server
	
	@discovery_client = DiscoveryClient.new(xds_server, node: node)
	@cache = ResourceCache.new
	@subscriptions = {}  # Track active subscriptions
	@load_balancer = nil  # Will be set by Client
	@mutex = Mutex.new
	@cluster_promises = {}  # service_name -> Async::Promise (level-triggered: resolved value persists)
	@endpoint_promises = {}  # cluster_name -> Async::Promise
end

def load_balancer=(load_balancer)

Set load balancer reference (called by Client)

Signature

parameter load_balancer LoadBalancer

Load balancer instance

Implementation

def load_balancer=(load_balancer)
	@load_balancer = load_balancer
end

def discover_cluster(service_name)

Discover cluster for service (like ClusterClient.reload_cluster!)

Signature

parameter service_name String

Service to discover

returns Resources::Cluster

Cluster configuration

Implementation

def discover_cluster(service_name)
	@mutex.synchronize do
		# Check cache first
		if cluster = @cache.get_cluster(service_name)
			return cluster
		end
		
		# Subscribe to CDS if not already subscribed
		unless @subscriptions[:cds]
			@subscriptions[:cds] = subscribe_cds(service_name)
		end
		
		# Subscribe to EDS for same name up front (EDS clusters use service name as cluster name)
		# This avoids 10s delay between CDS and EDS - both requests go out together
		subscription_key = :"eds_#{service_name}"
		unless @subscriptions[subscription_key]
			@subscriptions[subscription_key] = subscribe_eds(service_name)
		end
	end
	return @cache.get_cluster(service_name) if @cache.get_cluster(service_name)
	
	# Wait for cluster (CDS response)
	cluster = wait_for_cluster(service_name, timeout: 10)
	raise ReloadError, "Failed to discover cluster: #{service_name}" unless cluster
	cluster
end

def discover_endpoints(cluster)

Discover endpoints for cluster (like ClusterClient discovers nodes)

Signature

parameter cluster Resources::Cluster

Cluster configuration

returns Array<Async::HTTP::Endpoint>

Discovered endpoints

Implementation

def discover_endpoints(cluster)
	cluster_name = cluster.name
	@mutex.synchronize do
		# Check cache first
		if endpoints = @cache.get_endpoints(cluster_name)
			return endpoints
		end
		
		# Subscribe to EDS if not already subscribed
		subscription_key = :"eds_#{cluster_name}"
		unless @subscriptions[subscription_key]
			@subscriptions[subscription_key] = subscribe_eds(cluster_name)
		end
	end
	return @cache.get_endpoints(cluster_name) if @cache.get_endpoints(cluster_name)
	
	# Wait outside mutex so EDS callback can run and update cache
	endpoints = wait_for_endpoints(cluster_name, timeout: 10)
	raise ReloadError, "Failed to discover endpoints for cluster: #{cluster_name}" unless endpoints
	endpoints
end

def subscribe_cds(service_name)

Subscribe to CDS (Cluster Discovery Service)

Signature

parameter service_name String

Service name

returns Async::Task

Subscription task

Implementation

def subscribe_cds(service_name)
	@discovery_client.subscribe(
		DiscoveryClient::CLUSTER_TYPE,
		[service_name]
	) do |resources|
		resources.each do |resource|
			cluster = resource.is_a?(Resources::Cluster) ? resource : Resources::Cluster.from_proto(resource)
			@cache.update_cluster(cluster)
			resolve_cluster_promise(cluster.name, cluster)
		end
	end
end

def subscribe_eds(cluster_name)

Subscribe to EDS (Endpoint Discovery Service)

Signature

parameter cluster_name String

Cluster name

returns Async::Task

Subscription task

Implementation

def subscribe_eds(cluster_name)
	@discovery_client.subscribe(
		DiscoveryClient::ENDPOINT_TYPE,
		[cluster_name]
	) do |resources|
		resources.each do |resource|
			assignment = resource.is_a?(Resources::ClusterLoadAssignment) ? resource : Resources::ClusterLoadAssignment.from_proto(resource)
			endpoints = assignment.endpoints.select(&:healthy?).map do |endpoint|
				Async::HTTP::Endpoint.parse(endpoint.uri, protocol: Async::HTTP::Protocol::HTTP2)
			end
			@cache.update_endpoints(cluster_name, endpoints)
			resolve_endpoint_promise(cluster_name, endpoints) unless endpoints.empty?
			@load_balancer&.update_endpoints(endpoints)
		end
	end
end

def close

Close all subscriptions

Implementation

def close
	@mutex.synchronize do
		@subscriptions.each_value do |task|
			task.stop if task.respond_to?(:stop)
		end
		@subscriptions.clear
		@cluster_promises.clear
		@endpoint_promises.clear
	end
	@discovery_client.close
end