Async::GRPC::XDSSourceAsyncGRPCXDSLoadBalancer

class LoadBalancer

Client-side load balancing with health checking. RING_HASH and MAGLEV fall back to round-robin (require request context to hash).

Definitions

ROUND_ROBIN = :round_robin

Load balancing policies.

def initialize(cluster, endpoints)

Initialize load balancer

Signature

parameter cluster Resources::Cluster

Cluster configuration

parameter endpoints Array<Async::HTTP::Endpoint>

Initial endpoints

Implementation

def initialize(cluster, endpoints)
	@cluster = cluster
	@endpoints = endpoints
	@policy = parse_policy(cluster.load_balancer_policy)
	@health_status = {}  # Track health per endpoint
	@health_checker = HealthChecker.new(cluster.health_checks)
	@current_index = 0
	@in_flight_requests = {}  # Track in-flight requests per endpoint
	@health_check_task = nil  # Transient task for health check loop
	
	# Initialize health status
	@endpoints.each do |endpoint|
		@health_status[endpoint] = :unknown
	end
	
	# Start health checking if configured
	start_health_checks if cluster.health_checks.any?
end

def healthy_endpoints

Get healthy endpoints

Signature

returns Array<Async::HTTP::Endpoint>

Healthy endpoints

Implementation

def healthy_endpoints
	@endpoints.select{|endpoint| healthy?(endpoint)}
end

def pick

Pick next endpoint using load balancing policy

Signature

returns Async::HTTP::Endpoint, nil

Selected endpoint

Implementation

def pick
	healthy = healthy_endpoints
	return nil if healthy.empty?
	
	case @policy
	when ROUND_ROBIN
		pick_round_robin(healthy)
	when LEAST_REQUEST
		pick_least_request(healthy)
	when RANDOM
		pick_random(healthy)
	when RING_HASH
		pick_ring_hash(healthy)
	when MAGLEV
		pick_maglev(healthy)
	else
		healthy.first
	end
end

def update_endpoints(endpoints)

Update endpoints from EDS

Signature

parameter endpoints Array<Async::HTTP::Endpoint>

New endpoints

Implementation

def update_endpoints(endpoints)
	old_endpoints = @endpoints
	@endpoints = endpoints
	
	# Update health checker
	@health_checker.update_endpoints(endpoints)
	
	# Initialize health status for new endpoints
	endpoints.each do |endpoint|
		@health_status[endpoint] ||= :unknown
	end
	
	# Remove state for old endpoints
	(old_endpoints - endpoints).each do |endpoint|
		@health_status.delete(endpoint)
		@in_flight_requests.delete(endpoint)
	end
end

def record_request_start(endpoint)

Record that a request has started for the given endpoint. Used by LEAST_REQUEST policy. Call from Client when a call begins.

Signature

parameter endpoint Async::HTTP::Endpoint

The endpoint handling the request

Implementation

def record_request_start(endpoint)
	@in_flight_requests[endpoint] ||= 0
	@in_flight_requests[endpoint] += 1
end

def record_request_end(endpoint)

Record that a request has finished for the given endpoint. Must be called in ensure to decrement even on error/retry.

Signature

parameter endpoint Async::HTTP::Endpoint

The endpoint that handled the request

Implementation

def record_request_end(endpoint)
	return unless endpoint
	current = @in_flight_requests[endpoint]
	return unless current && current > 0
	@in_flight_requests[endpoint] = current - 1
	@in_flight_requests.delete(endpoint) if @in_flight_requests[endpoint] == 0
end

def mark_unhealthy(endpoint)

Mark endpoint as unhealthy (e.g. after connection failure). Health checker may restore it on next successful check.

Signature

parameter endpoint Async::HTTP::Endpoint

The endpoint to mark unhealthy

Implementation

def mark_unhealthy(endpoint)
	@health_status[endpoint] = :unhealthy
end

def close

Close load balancer

Implementation

def close
	if health_check_task = @health_check_task
		@health_check_task = nil
		health_check_task.stop
	end
	
	@health_checker.close
end