class Client
Wrapper client for xDS-enabled gRPC connections Follows the same pattern as Async::Redis::SentinelClient and ClusterClient
Nested
Definitions
ConfigurationError = Context::ConfigurationError
Raised when xDS configuration cannot be loaded
ReloadError = Context::ReloadError
Raised when cluster configuration cannot be reloaded
def initialize(service_name, bootstrap: nil, headers: Protocol::HTTP::Headers.new, node: nil, **options)
Create a new xDS client
Signature
-
parameter
service_nameString Target service name (e.g., "myservice")
-
parameter
bootstrapHash, String, nil Bootstrap config (hash, file path, or nil for default)
-
parameter
headersProtocol::HTTP::Headers Default headers
-
parameter
optionsHash Additional options passed to underlying clients
Implementation
def initialize(service_name, bootstrap: nil, headers: Protocol::HTTP::Headers.new, node: nil, **options)
@service_name = service_name
@bootstrap = load_bootstrap(bootstrap)
@headers = headers
@options = options
@context = Context.new(@bootstrap, node: node || @bootstrap[:node])
@load_balancer = nil
@clients = {} # Cache clients per endpoint (like ClusterClient caches node.client)
@mutex = Mutex.new
end
def resolve_endpoints
Resolve endpoints lazily (like SentinelClient.resolve_address)
Signature
-
returns
Array<Async::HTTP::Endpoint> Available endpoints
Implementation
def resolve_endpoints
@mutex.synchronize do
unless @load_balancer
# Discover cluster via CDS
cluster = @context.discover_cluster(@service_name)
# Discover endpoints via EDS
endpoints = @context.discover_endpoints(cluster)
raise NoEndpointsError, "No endpoints discovered for #{@service_name}" if endpoints.empty?
# Create load balancer
@load_balancer = LoadBalancer.new(cluster, endpoints)
# Set load balancer reference in context for endpoint updates
@context.load_balancer = @load_balancer
end
@load_balancer.healthy_endpoints
end
rescue Context::ReloadError => error
raise NoEndpointsError, "No endpoints discovered for #{@service_name}", cause: error
end
def client_for_call
Get a client for making calls (like ClusterClient.client_for) Resolves endpoints lazily and picks one via load balancer
Signature
-
returns
Array(Async::GRPC::Client, Async::HTTP::Endpoint) Client and endpoint for request tracking
Implementation
def client_for_call
endpoints = resolve_endpoints
raise NoEndpointsError, "No endpoints available for #{@service_name}" if endpoints.empty?
# Pick endpoint via load balancer
endpoint = @load_balancer.pick
raise NoEndpointsError, "No healthy endpoints available" unless endpoint
# Cache client per endpoint (like ClusterClient caches node.client)
client = @clients[endpoint] ||= begin
http_client = Async::HTTP::Client.new(endpoint, **@options)
Async::GRPC::Client.new(http_client, headers: @headers)
end
[client, endpoint]
end
def call(request, attempts: 3)
Implement Protocol::HTTP::Middleware interface This allows XDS::Client to be used anywhere Async::GRPC::Client is used
Signature
-
parameter
requestProtocol::HTTP::Request The HTTP request
-
returns
Protocol::HTTP::Response The HTTP response
Implementation
def call(request, attempts: 3)
client, endpoint = client_for_call
@load_balancer.record_request_start(endpoint)
begin
client.call(request)
rescue Protocol::GRPC::Error => error
# Handle endpoint changes (like ClusterClient handles MOVED/ASK)
if error.status_code == Protocol::GRPC::Status::UNAVAILABLE
Console.warn(self, error)
# Invalidate cache, reload configuration
invalidate_cache!
attempts -= 1
retry if attempts > 0
end
raise
rescue => error
# Network errors might indicate endpoint failure
Console.warn(self, error)
# Invalidate this specific endpoint
invalidate_endpoint(client)
attempts -= 1
retry if attempts > 0
raise
end
ensure
@load_balancer&.record_request_end(endpoint)
end
def stub(interface_class, service_name)
Create a stub for the given interface. Same API as Async::GRPC::Client - load balancing happens per RPC call.
Signature
-
parameter
interface_classClass Interface class (subclass of Protocol::GRPC::Interface)
-
parameter
service_nameString Service name (e.g., "hello.Greeter")
-
returns
Async::GRPC::Stub Stub object with methods for each RPC
Implementation
def stub(interface_class, service_name)
interface = interface_class.new(service_name)
Stub.new(self, interface)
end
def invoke(service, method, request = nil, metadata: {}, timeout: nil, encoding: nil, initial: nil, attempts: 3, &block)
Invoke an RPC (called by Stub). Load balances per call.
Signature
-
parameter
serviceProtocol::GRPC::Interface Interface instance
-
parameter
methodSymbol, String Method name
-
parameter
requestObject | Nil Request message
-
parameter
metadataHash Custom metadata headers
-
parameter
timeoutNumeric | Nil Optional timeout in seconds
-
parameter
encodingString | Nil Optional compression encoding
-
parameter
initialObject | Array Optional initial message(s) for bidirectional streaming
-
yields
{|input, output| ...} Block for streaming calls
-
returns
Object | Protocol::GRPC::Body::ReadableBody Response message or readable body
Implementation
def invoke(service, method, request = nil, metadata: {}, timeout: nil, encoding: nil, initial: nil, attempts: 3, &block)
client, endpoint = client_for_call
@load_balancer.record_request_start(endpoint)
begin
client.invoke(service, method, request, metadata: metadata, timeout: timeout, encoding: encoding, initial: initial, &block)
rescue Protocol::GRPC::Error => error
if error.status_code == Protocol::GRPC::Status::UNAVAILABLE
Console.warn(self, error)
invalidate_cache!
attempts -= 1
retry if attempts > 0
end
raise
rescue => error
Console.warn(self, error)
invalidate_endpoint(client)
attempts -= 1
retry if attempts > 0
raise
end
ensure
@load_balancer&.record_request_end(endpoint)
end
def close
Close xDS client and all connections
Implementation
def close
@clients.each_value(&:close)
@clients.clear
@context.close
@load_balancer&.close
end