Async::Redis › Source › Async › Redis › ClusterClient

class ClusterClient

A Redis cluster client that manages multiple Redis instances and handles cluster operations.

Nested

Definitions

def initialize(endpoints, **options)

Create a new instance of the cluster client.

Implementation

# Create a new instance of the cluster client.
#
# @parameter endpoints [Object] The cluster endpoints, stored as given.
# @parameter options [Hash] Extra keyword options, later forwarded to each `Client.new`.
def initialize(endpoints, **options)
	@options = options
	@endpoints = endpoints
	
	# No shard map yet: `client_for` and `any_client` call `reload_cluster!` while this is still nil.
	@shards = nil
end

def clients_for(*keys, role: :master, attempts: 3)

Execute a block with clients for the given keys, grouped by cluster slot.

Signature

parameter keys Array

The keys to find clients for.

parameter role Symbol

The role of nodes to use (:master or :slave).

parameter attempts Integer

Number of retry attempts for cluster errors.

yields {|client, keys| ...}

Block called for each client-keys pair.

parameter client Client

The Redis client for the slot.

parameter keys Array

The keys handled by this client.

Implementation

# Execute a block with clients for the given keys, grouped by cluster slot.
#
# If the server reports a cluster redirection (`MOVED` or `ASK`), the cluster layout is reloaded and the whole operation is retried, so the block may be invoked more than once for the same keys.
#
# @parameter keys [Array] The keys to find clients for.
# @parameter role [Symbol] The role of nodes to use (`:master` or `:slave`).
# @parameter attempts [Integer] Number of retry attempts for cluster errors.
# @yields {|client, keys| ...} Block called for each client-keys pair.
# 	@parameter client [Client] The Redis client for the slot.
# 	@parameter keys [Array] The keys handled by this client.
# @raises [ServerError] If the error is not a redirection, or the attempts are exhausted.
def clients_for(*keys, role: :master, attempts: 3)
	slots_for(keys).each do |slot, slot_keys|
		yield client_for(slot, role), slot_keys
	end
rescue ServerError => error
	Console.warn(self, error)
	
	# Only redirections are retryable. Match whole words so that unrelated messages which merely contain "ASK" (e.g. "TASK") are re-raised immediately instead of triggering a reload and retry.
	raise unless error.message.match?(/\b(?:MOVED|ASK)\b/)
	
	reload_cluster!
	
	attempts -= 1
	retry if attempts > 0
	
	raise
end

def client_for(slot, role = :master)

Get a client for a specific slot.

Signature

parameter slot Integer

The cluster slot number.

parameter role Symbol

The role of node to get (:master or :slave).

returns Client

The Redis client for the slot.

Implementation

# Get a client for a specific slot.
#
# Calls `reload_cluster!` first when no shard map is present. The client is created on demand and cached on the node.
#
# @parameter slot [Integer] The cluster slot number.
# @parameter role [Symbol] The role of node to get (`:master` or `:slave`).
# @returns [Client] The Redis client for the slot.
# @raises [SlotError] If no nodes serve the slot, or none of them has the requested role.
def client_for(slot, role = :master)
	reload_cluster! unless @shards
	
	nodes = @shards.find(slot)
	raise SlotError, "No nodes found for slot #{slot}" unless nodes
	
	# Pick a random node among those with the requested role.
	node = nodes.select{|candidate| candidate.role == role}.sample
	
	# Fail loudly rather than returning nil, which would only surface later as a NoMethodError in the caller.
	raise SlotError, "No #{role} nodes found for slot #{slot}" unless node
	
	node.client ||= Client.new(node.endpoint, **@options)
end

def any_client(role = :master)

Get any available client from the cluster. This is useful for operations that don't require slot-specific routing, such as global pub/sub operations, INFO commands, or other cluster-wide operations.

Signature

parameter role Symbol

The role of node to get (:master or :slave).

returns Client

A Redis client for any available node.

Implementation

# Get any available client from the cluster, for operations that do not need slot-specific routing.
#
# @parameter role [Symbol] The role of node to get (`:master` or `:slave`).
# @returns [Client] A Redis client for any available node.
def any_client(role = :master)
	reload_cluster! unless @shards
	
	# Try a randomly sampled shard first, then a random node of the requested role within it.
	shard = @shards.sample
	candidate = shard && shard.select{|member| member.role == role}.sample
	
	# Nothing suitable was sampled: defer to the slot 0 lookup.
	return client_for(0, role) unless candidate
	
	candidate.client ||= Client.new(candidate.endpoint, **@options)
end

def crc16(bytes)

This is the CRC16 algorithm used by Redis Cluster to hash keys. Copied from https://github.com/antirez/redis-rb-cluster/blob/master/crc16.rb

Implementation

# Compute the CRC16 checksum that Redis Cluster uses to hash keys.
#
# @parameter bytes [String] The data to checksum; it is consumed byte by byte.
# @returns [Integer] The 16-bit checksum.
def crc16(bytes)
	bytes.each_byte.reduce(0) do |checksum, octet|
		# The high byte of the running checksum, mixed with the input byte, selects the table entry.
		index = ((checksum >> 8) ^ octet) & 0xff
		
		((checksum << 8) & 0xffff) ^ XMODEM_CRC16_LOOKUP[index]
	end
end

def slot_for(key)

Calculate the Redis Cluster hash slot for a given key. If the key contains a non-empty hash tag (`{...}`), only the tag is hashed. Modified from https://github.com/antirez/redis-rb-cluster/blob/master/cluster.rb#L104-L117

Implementation

# Calculate the hash slot for a key.
#
# When the key contains a `{`, and a `}` follows it with at least one character in between, only the text between the first such pair is hashed. Otherwise the whole key is hashed.
#
# @parameter key [Object] The key; it is converted with `to_s`.
# @returns [Integer] The slot number, i.e. the checksum modulo `HASH_SLOTS`.
def slot_for(key)
	hashed = key.to_s
	
	_prefix, opening, remainder = hashed.partition("{")
	
	unless opening.empty?
		tag, closing, _suffix = remainder.partition("}")
		
		# An unterminated or empty tag ("{}") means the whole key is hashed.
		hashed = tag unless closing.empty? || tag.empty?
	end
	
	crc16(hashed) % HASH_SLOTS
end

def slots_for(keys)

Calculate the hash slots for multiple keys.

Signature

parameter keys Array

The keys to calculate slots for.

returns Hash

A hash mapping slot numbers to arrays of keys.

Implementation

# Calculate the hash slots for multiple keys.
#
# @parameter keys [Array] The keys to calculate slots for.
# @returns [Hash] A hash mapping slot numbers to arrays of keys, in the order the keys were given.
def slots_for(keys)
	# The default block gives every slot its own fresh array on first access.
	grouped = Hash.new{|table, slot| table[slot] = []}
	
	keys.each_with_object(grouped) do |key, table|
		table[slot_for(key)] << key
	end
end

def subscribe(*channels)

Subscribe to one or more sharded channels for pub/sub messaging in cluster environment. The subscription will be created on the appropriate nodes responsible for each channel's hash slot.

Signature

parameter channels Array(String)

The sharded channels to subscribe to.

yields {|context| ...}

If a block is given, it will be executed within the subscription context.

parameter context ClusterSubscription

The cluster subscription context.

returns Object

The result of the block if block given.

returns ClusterSubscription

The cluster subscription context if no block given.

Implementation

# Subscribe to one or more sharded channels for pub/sub messaging in a cluster environment.
#
# @parameter channels [Array(String)] The sharded channels to subscribe to.
# @yields {|context| ...} If a block is given, it is executed with the subscription context, which is closed afterwards.
# 	@parameter context [ClusterSubscription] The cluster subscription context.
# @returns [Object] The result of the block if a block is given.
# @returns [ClusterSubscription] The cluster subscription context if no block is given.
def subscribe(*channels)
	subscription = ClusterSubscription.new(self)
	subscription.subscribe(channels) if channels.any?
	
	# Without a block the caller takes ownership of the subscription, including closing it.
	return subscription unless block_given?
	
	begin
		yield subscription
	ensure
		subscription.close
	end
end