class Context
Manages xDS subscriptions and maintains discovered resource state
Nested
Definitions
def initialize(bootstrap, node: nil)
Initialize xDS context
Signature
-
parameter
bootstrapHash Bootstrap configuration
-
parameter
nodeHash Node information (id, cluster, metadata, locality)
Implementation
def initialize(bootstrap, node: nil)
@bootstrap = bootstrap
xds_server = bootstrap[:xds_servers]&.first
raise ConfigurationError, "No xds_servers in bootstrap" unless xds_server
@discovery_client = DiscoveryClient.new(xds_server, node: node)
@cache = ResourceCache.new
@subscriptions = {} # Track active subscriptions
@load_balancer = nil # Will be set by Client
@mutex = Mutex.new
@cluster_promises = {} # service_name -> Async::Promise (level-triggered: resolved value persists)
@endpoint_promises = {} # cluster_name -> Async::Promise
end
def load_balancer=(load_balancer)
Set load balancer reference (called by Client)
Signature
-
parameter
load_balancerLoadBalancer Load balancer instance
Implementation
def load_balancer=(load_balancer)
@load_balancer = load_balancer
end
def discover_cluster(service_name)
Discover cluster for service (like ClusterClient.reload_cluster!)
Signature
-
parameter
service_nameString Service to discover
-
returns
Resources::Cluster Cluster configuration
Implementation
def discover_cluster(service_name)
@mutex.synchronize do
# Check cache first
if cluster = @cache.get_cluster(service_name)
return cluster
end
# Subscribe to CDS if not already subscribed
unless @subscriptions[:cds]
@subscriptions[:cds] = subscribe_cds(service_name)
end
# Subscribe to EDS for same name up front (EDS clusters use service name as cluster name)
# This avoids 10s delay between CDS and EDS - both requests go out together
subscription_key = :"eds_#{service_name}"
unless @subscriptions[subscription_key]
@subscriptions[subscription_key] = subscribe_eds(service_name)
end
end
return @cache.get_cluster(service_name) if @cache.get_cluster(service_name)
# Wait for cluster (CDS response)
cluster = wait_for_cluster(service_name, timeout: 10)
raise ReloadError, "Failed to discover cluster: #{service_name}" unless cluster
cluster
end
def discover_endpoints(cluster)
Discover endpoints for cluster (like ClusterClient discovers nodes)
Signature
-
parameter
clusterResources::Cluster Cluster configuration
-
returns
Array<Async::HTTP::Endpoint> Discovered endpoints
Implementation
def discover_endpoints(cluster)
cluster_name = cluster.name
@mutex.synchronize do
# Check cache first
if endpoints = @cache.get_endpoints(cluster_name)
return endpoints
end
# Subscribe to EDS if not already subscribed
subscription_key = :"eds_#{cluster_name}"
unless @subscriptions[subscription_key]
@subscriptions[subscription_key] = subscribe_eds(cluster_name)
end
end
return @cache.get_endpoints(cluster_name) if @cache.get_endpoints(cluster_name)
# Wait outside mutex so EDS callback can run and update cache
endpoints = wait_for_endpoints(cluster_name, timeout: 10)
raise ReloadError, "Failed to discover endpoints for cluster: #{cluster_name}" unless endpoints
endpoints
end
def subscribe_cds(service_name)
Subscribe to CDS (Cluster Discovery Service)
Signature
-
parameter
service_nameString Service name
-
returns
Async::Task Subscription task
Implementation
def subscribe_cds(service_name)
@discovery_client.subscribe(
DiscoveryClient::CLUSTER_TYPE,
[service_name]
) do |resources|
resources.each do |resource|
cluster = resource.is_a?(Resources::Cluster) ? resource : Resources::Cluster.from_proto(resource)
@cache.update_cluster(cluster)
resolve_cluster_promise(cluster.name, cluster)
end
end
end
def subscribe_eds(cluster_name)
Subscribe to EDS (Endpoint Discovery Service)
Signature
-
parameter
cluster_nameString Cluster name
-
returns
Async::Task Subscription task
Implementation
def subscribe_eds(cluster_name)
@discovery_client.subscribe(
DiscoveryClient::ENDPOINT_TYPE,
[cluster_name]
) do |resources|
resources.each do |resource|
assignment = resource.is_a?(Resources::ClusterLoadAssignment) ? resource : Resources::ClusterLoadAssignment.from_proto(resource)
endpoints = assignment.endpoints.select(&:healthy?).map do |endpoint|
Async::HTTP::Endpoint.parse(endpoint.uri, protocol: Async::HTTP::Protocol::HTTP2)
end
@cache.update_endpoints(cluster_name, endpoints)
resolve_endpoint_promise(cluster_name, endpoints) unless endpoints.empty?
@load_balancer&.update_endpoints(endpoints)
end
end
end
def close
Close all subscriptions
Implementation
def close
@mutex.synchronize do
@subscriptions.each_value do |task|
task.stop if task.respond_to?(:stop)
end
@subscriptions.clear
@cluster_promises.clear
@endpoint_promises.clear
end
@discovery_client.close
end