class ClusterClient
A Redis cluster client that manages multiple Redis instances and handles cluster operations.
Nested
Definitions
def initialize(endpoints, **options)
Create a new instance of the cluster client.
Implementation
def initialize(endpoints, **options)
@endpoints = endpoints
@options = options
@shards = nil
end
def clients_for(*keys, role: :master, attempts: 3)
Execute a block with clients for the given keys, grouped by cluster slot.
Signature
-
parameter
keys
Array
The keys to find clients for.
-
parameter
role
Symbol
The role of nodes to use (:master or :slave).
-
parameter
attempts
Integer
Number of retry attempts for cluster errors.
-
yields
{|client, keys| ...}
Block called for each client-keys pair.
-
parameter
client
Client
The Redis client for the slot.
-
parameter
keys
Array
The keys handled by this client.
-
parameter
Implementation
def clients_for(*keys, role: :master, attempts: 3)
slots = slots_for(keys)
slots.each do |slot, keys|
yield client_for(slot, role), keys
end
rescue ServerError => error
Console.warn(self, error)
if error.message =~ /MOVED|ASK/
reload_cluster!
attempts -= 1
retry if attempts > 0
raise
else
raise
end
end
def client_for(slot, role = :master)
Get a client for a specific slot.
Signature
-
parameter
slot
Integer
The cluster slot number.
-
parameter
role
Symbol
The role of node to get (:master or :slave).
-
returns
Client
The Redis client for the slot.
Implementation
def client_for(slot, role = :master)
unless @shards
reload_cluster!
end
if nodes = @shards.find(slot)
nodes = nodes.select{|node| node.role == role}
else
raise SlotError, "No nodes found for slot #{slot}"
end
if node = nodes.sample
return (node.client ||= Client.new(node.endpoint, **@options))
end
end
def any_client(role = :master)
Get any available client from the cluster. This is useful for operations that don't require slot-specific routing, such as global pub/sub operations, INFO commands, or other cluster-wide operations.
Signature
-
parameter
role
Symbol
The role of node to get (:master or :slave).
-
returns
Client
A Redis client for any available node.
Implementation
def any_client(role = :master)
unless @shards
reload_cluster!
end
# Sample a random shard to get better load distribution
if nodes = @shards.sample
nodes = nodes.select{|node| node.role == role}
if node = nodes.sample
return (node.client ||= Client.new(node.endpoint, **@options))
end
end
# Fallback to slot 0 if sampling fails
client_for(0, role)
end
def crc16(bytes)
This is the CRC16 algorithm used by Redis Cluster to hash keys. Copied from https://github.com/antirez/redis-rb-cluster/blob/master/crc16.rb
Implementation
def crc16(bytes)
sum = 0
bytes.each_byte do |byte|
sum = ((sum << 8) & 0xffff) ^ XMODEM_CRC16_LOOKUP[((sum >> 8) ^ byte) & 0xff]
end
return sum
end
def slot_for(key)
Return Redis::Client for a given key. Modified from https://github.com/antirez/redis-rb-cluster/blob/master/cluster.rb#L104-L117
Implementation
def slot_for(key)
key = key.to_s
if s = key.index("{")
if e = key.index("}", s + 1) and e != s + 1
key = key[s + 1..e - 1]
end
end
return crc16(key) % HASH_SLOTS
end
def slots_for(keys)
Calculate the hash slots for multiple keys.
Signature
-
parameter
keys
Array
The keys to calculate slots for.
-
returns
Hash
A hash mapping slot numbers to arrays of keys.
Implementation
def slots_for(keys)
slots = Hash.new{|hash, key| hash[key] = []}
keys.each do |key|
slots[slot_for(key)] << key
end
return slots
end
def subscribe(*channels)
Subscribe to one or more sharded channels for pub/sub messaging in cluster environment. The subscription will be created on the appropriate nodes responsible for each channel's hash slot.
Signature
-
parameter
channels
Array(String)
The sharded channels to subscribe to.
-
yields
{|context| ...}
If a block is given, it will be executed within the subscription context.
-
parameter
context
ClusterSubscription
The cluster subscription context.
-
parameter
-
returns
Object
The result of the block if block given.
-
returns
ClusterSubscription
The cluster subscription context if no block given.
Implementation
def subscribe(*channels)
context = ClusterSubscription.new(self)
if channels.any?
context.subscribe(channels)
end
if block_given?
begin
yield context
ensure
context.close
end
else
return context
end
end