Async::Redis > Source > Async > Redis > ClusterSubscription

class ClusterSubscription

Context for managing sharded subscriptions across multiple Redis cluster nodes. This class handles the complexity of subscribing to channels that may be distributed across different shards in a Redis cluster.

Nested

Definitions

def initialize(cluster_client, queue: Async::LimitedQueue.new)

Initialize a new shard subscription context.

Signature

parameter cluster_client ClusterClient

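The cluster client to use.

parameter queue Async::LimitedQueue

The queue that buffers messages received from all shard subscriptions until they are read with listen. Defaults to a new Async::LimitedQueue.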

Implementation

# Initialize a new shard subscription context.
#
# @parameter cluster_client [ClusterClient] The cluster client to use.
# @parameter queue [Async::LimitedQueue] Receives the messages read from every shard; `listen` pops from it.
def initialize(cluster_client, queue: Async::LimitedQueue.new)
	# Tracking state: subscribed channel names, and one subscription per slot.
	@channels = []
	@subscriptions = {}
	
	@cluster_client = cluster_client
	
	# Reader tasks are started on this barrier so that `close` can stop them together.
	@barrier = Async::Barrier.new
	@queue = queue
end

def close

Close all shard subscriptions.

Implementation

# Close all shard subscriptions.
#
# The barrier reference is dropped before it is stopped, so calling this again does not stop it a second time. Every shard subscription is then closed and forgotten.
def close
	barrier, @barrier = @barrier, nil
	barrier&.stop
	
	@subscriptions.each_value do |subscription|
		subscription.close
	end
	@subscriptions.clear
end

def listen

Listen for the next message from any subscribed shard.

Signature

returns Array

The next message response.

raises SubscriptionError

If the subscription has failed for any reason.

Implementation

# Listen for the next message from any subscribed shard.
#
# @returns [Array] The next message response, as popped from the shared queue.
# @raises [SubscriptionError] If the subscription has failed for any reason.
def listen
	@queue.pop
rescue
	# Any StandardError from the queue is reported as a subscription failure. The rescued error is not bound to a local because Ruby already attaches it to the new exception as its `cause`.
	raise SubscriptionError, "Failed to read message!"
end

def each

Iterate over all messages from all subscribed shards.

Signature

yields {|response| ...}

Block called for each message.

parameter response Array

The message response.

Implementation

# Iterate over all messages from all subscribed shards.
#
# Without a block, an enumerator over this method is returned instead. Iteration ends as soon as `listen` returns `nil` or `false`.
#
# @yields {|response| ...} Block called for each message.
# 	@parameter response [Array] The message response.
def each
	return enum_for(:each) unless block_given?
	
	response = listen
	
	while response
		yield response
		response = listen
	end
end

def subscribe(channels)

Subscribe to additional sharded channels.

Signature

parameter channels Array(String)

The channels to subscribe to.

Implementation

# Subscribe to additional sharded channels.
#
# The channels are grouped by slot using the cluster client. A slot that already has a shard subscription receives the extra channels on that subscription. Any other slot gets a new subscription, plus a reader task that forwards its messages into the shared queue.
#
# @parameter channels [Array(String)] The channels to subscribe to.
def subscribe(channels)
	slots = @cluster_client.slots_for(channels)
	
	slots.each do |slot, channels_for_slot|
		if subscription = @subscriptions[slot]
			# Add to existing subscription for this shard
			subscription.ssubscribe(channels_for_slot)
		else
			# Create new subscription for this shard
			client = @cluster_client.client_for(slot)
			subscription = @subscriptions[slot] = client.ssubscribe(*channels_for_slot)
			
			@barrier.async do
				# This is optimistic, in other words, subscription.listen will also fail on close.
				until subscription.closed?
					@queue << subscription.listen
				end
			ensure
				# If we are exiting here for any reason OTHER than the subscription being closed, close the shared queue (presumably so that readers of the queue notice the failure - confirm against the queue implementation):
				unless subscription.closed?
					@queue.close
				end
			end
		end
	end
	
	# Track every channel once, so subscribing to the same channel again does not produce duplicate entries in `channels`:
	@channels |= channels
end

def unsubscribe(channels)

Unsubscribe from sharded channels.

Signature

parameter channels Array(String)

The channels to unsubscribe from.

Implementation

# Unsubscribe from sharded channels.
#
# Channels whose slot has no shard subscription are ignored. Once no tracked channel maps to a slot any more, that slot's subscription is removed and closed.
#
# @parameter channels [Array(String)] The channels to unsubscribe from.
def unsubscribe(channels)
	slots = @cluster_client.slots_for(channels)
	
	slots.each do |slot, channels_for_slot|
		subscription = @subscriptions[slot]
		
		# Nothing to do for slots we never subscribed on:
		next unless subscription
		
		subscription.sunsubscribe(channels_for_slot)
		
		# Remove channels from our tracking
		@channels -= channels_for_slot
		
		# Keep the shard subscription while any tracked channel still maps to this slot. `any?` stops at the first match and builds no intermediate array:
		next if @channels.any? {|channel| @cluster_client.slot_for(channel) == slot}
		
		# No channels left for this shard, so remove and close it:
		@subscriptions.delete(slot)
		subscription.close
	end
end

def channels

Get the list of currently subscribed channels.

Signature

returns Array(String)

The list of subscribed channels.

Implementation

# Get the list of currently subscribed channels.
#
# @returns [Array(String)] A new array holding the tracked channel names; changing it does not affect the internal list.
def channels
	[*@channels]
end

def shard_count

Get the number of active shard subscriptions.

Signature

returns Integer

The number of shard connections.

Implementation

# Get the number of active shard subscriptions.
#
# @returns [Integer] The number of entries currently held in the per-slot subscription table.
def shard_count
	@subscriptions.length
end