class LoadBalancer
Client-side load balancing with health checking. RING_HASH and MAGLEV fall back to round-robin (require request context to hash).
Definitions
ROUND_ROBIN = :round_robin
Load balancing policies.
def initialize(cluster, endpoints)
Initialize load balancer
Signature
-
parameter
clusterResources::Cluster Cluster configuration
-
parameter
endpointsArray<Async::HTTP::Endpoint> Initial endpoints
Implementation
def initialize(cluster, endpoints)
@cluster = cluster
@endpoints = endpoints
@policy = parse_policy(cluster.load_balancer_policy)
@health_status = {} # Track health per endpoint
@health_checker = HealthChecker.new(cluster.health_checks)
@current_index = 0
@in_flight_requests = {} # Track in-flight requests per endpoint
@health_check_task = nil # Transient task for health check loop
# Initialize health status
@endpoints.each do |endpoint|
@health_status[endpoint] = :unknown
end
# Start health checking if configured
start_health_checks if cluster.health_checks.any?
end
def healthy_endpoints
Get healthy endpoints
Signature
-
returns
Array<Async::HTTP::Endpoint> Healthy endpoints
Implementation
def healthy_endpoints
@endpoints.select{|endpoint| healthy?(endpoint)}
end
def pick
Pick next endpoint using load balancing policy
Signature
-
returns
Async::HTTP::Endpoint, nil Selected endpoint
Implementation
def pick
healthy = healthy_endpoints
return nil if healthy.empty?
case @policy
when ROUND_ROBIN
pick_round_robin(healthy)
when LEAST_REQUEST
pick_least_request(healthy)
when RANDOM
pick_random(healthy)
when RING_HASH
pick_ring_hash(healthy)
when MAGLEV
pick_maglev(healthy)
else
healthy.first
end
end
def update_endpoints(endpoints)
Update endpoints from EDS
Signature
-
parameter
endpointsArray<Async::HTTP::Endpoint> New endpoints
Implementation
def update_endpoints(endpoints)
old_endpoints = @endpoints
@endpoints = endpoints
# Update health checker
@health_checker.update_endpoints(endpoints)
# Initialize health status for new endpoints
endpoints.each do |endpoint|
@health_status[endpoint] ||= :unknown
end
# Remove state for old endpoints
(old_endpoints - endpoints).each do |endpoint|
@health_status.delete(endpoint)
@in_flight_requests.delete(endpoint)
end
end
def record_request_start(endpoint)
Record that a request has started for the given endpoint. Used by LEAST_REQUEST policy. Call from Client when a call begins.
Signature
-
parameter
endpointAsync::HTTP::Endpoint The endpoint handling the request
Implementation
def record_request_start(endpoint)
@in_flight_requests[endpoint] ||= 0
@in_flight_requests[endpoint] += 1
end
def record_request_end(endpoint)
Record that a request has finished for the given endpoint. Must be called in ensure to decrement even on error/retry.
Signature
-
parameter
endpointAsync::HTTP::Endpoint The endpoint that handled the request
Implementation
def record_request_end(endpoint)
return unless endpoint
current = @in_flight_requests[endpoint]
return unless current && current > 0
@in_flight_requests[endpoint] = current - 1
@in_flight_requests.delete(endpoint) if @in_flight_requests[endpoint] == 0
end
def mark_unhealthy(endpoint)
Mark endpoint as unhealthy (e.g. after connection failure). Health checker may restore it on next successful check.
Signature
-
parameter
endpointAsync::HTTP::Endpoint The endpoint to mark unhealthy
Implementation
def mark_unhealthy(endpoint)
@health_status[endpoint] = :unhealthy
end
def close
Close load balancer
Implementation
def close
if health_check_task = @health_check_task
@health_check_task = nil
health_check_task.stop
end
@health_checker.close
end