Async::Redis › Source › Async › Redis › Context › Subscription

class Subscription

Context for Redis pub/sub subscription operations.

Definitions

def initialize(pool, channels)

Initialize a new subscription context.

Signature

parameter pool Pool

The connection pool to use.

parameter channels Array(String)

The channels to subscribe to.

Implementation

# Initialize a new subscription context.
#
# The pool is handed to the parent initializer; if any channels were given,
# they are subscribed to immediately via {subscribe}.
#
# @parameter pool [Pool] The connection pool to use.
# @parameter channels [Array(String)] The channels to subscribe to.
def initialize(pool, channels)
	super(pool)
	
	# Nothing to subscribe to yet; the caller can subscribe later.
	return unless channels.any?
	
	subscribe(channels)
end

def close

Close the subscription context.

Implementation

# Close the subscription context.
#
# The underlying connection (if there is one) is closed first, then the
# parent implementation of `close` runs.
def close
	# Closing the connection makes any caller blocked in `#listen` exit, because `read_response` will fail.
	# If `RESET` were used here instead, that exit path would have to be handled differently.
	connection = @connection
	connection.close unless connection.nil?
	
	super
end

def listen

Listen for the next message from subscribed channels.

Signature

returns Array

The next message response, or nil if the connection is closed.

Implementation

# Listen for the next message from subscribed channels.
#
# Responses are read from the connection one at a time. Any response whose
# first element is not `MESSAGE`, `PMESSAGE` or `SMESSAGE` is skipped.
#
# @returns [Array | Nil] The next message response, or `nil` once `read_response` returns a falsy value.
def listen
	while (reply = @connection.read_response)
		kind = reply.first
		
		# Only actual message deliveries are handed back to the caller.
		is_message = kind == MESSAGE || kind == PMESSAGE || kind == SMESSAGE
		
		return reply if is_message
	end
end

def each

Iterate over all messages from subscribed channels.

Signature

yields {|response| ...}

Block called for each message.

parameter response Array

The message response.

Implementation

# Iterate over all messages from subscribed channels.
#
# Without a block, an enumerator is returned instead. With a block, messages
# are yielded until {listen} returns a falsy value.
#
# @yields {|response| ...} Block called for each message.
# 	@parameter response [Array] The message response.
def each
	unless block_given?
		return to_enum
	end
	
	while (message = listen)
		yield message
	end
end

def subscribe(channels)

Subscribe to additional channels.

Signature

parameter channels Array(String)

The channels to subscribe to.

Implementation

# Subscribe to additional channels.
#
# Writes a `SUBSCRIBE` request for the given channels and flushes the connection.
#
# @parameter channels [Array(String)] The channels to subscribe to.
def subscribe(channels)
	request = ["SUBSCRIBE", *channels]
	
	@connection.write_request(request)
	@connection.flush
end

def unsubscribe(channels)

Unsubscribe from channels.

Signature

parameter channels Array(String)

The channels to unsubscribe from.

Implementation

# Unsubscribe from channels.
#
# Writes an `UNSUBSCRIBE` request for the given channels and flushes the connection.
#
# @parameter channels [Array(String)] The channels to unsubscribe from.
def unsubscribe(channels)
	request = ["UNSUBSCRIBE", *channels]
	
	@connection.write_request(request)
	@connection.flush
end

def psubscribe(patterns)

Subscribe to channel patterns.

Signature

parameter patterns Array(String)

The channel patterns to subscribe to.

Implementation

# Subscribe to channel patterns.
#
# Writes a `PSUBSCRIBE` request for the given patterns and flushes the connection.
#
# @parameter patterns [Array(String)] The channel patterns to subscribe to.
def psubscribe(patterns)
	request = ["PSUBSCRIBE", *patterns]
	
	@connection.write_request(request)
	@connection.flush
end

def punsubscribe(patterns)

Unsubscribe from channel patterns.

Signature

parameter patterns Array(String)

The channel patterns to unsubscribe from.

Implementation

# Unsubscribe from channel patterns.
#
# Writes a `PUNSUBSCRIBE` request for the given patterns and flushes the connection.
#
# @parameter patterns [Array(String)] The channel patterns to unsubscribe from.
def punsubscribe(patterns)
	request = ["PUNSUBSCRIBE", *patterns]
	
	@connection.write_request(request)
	@connection.flush
end

def ssubscribe(channels)

Subscribe to sharded channels (Redis 7.0+).

Signature

parameter channels Array(String)

The sharded channels to subscribe to.

Implementation

# Subscribe to sharded channels (Redis 7.0+).
#
# Writes an `SSUBSCRIBE` request for the given channels and flushes the connection.
#
# @parameter channels [Array(String)] The sharded channels to subscribe to.
def ssubscribe(channels)
	request = ["SSUBSCRIBE", *channels]
	
	@connection.write_request(request)
	@connection.flush
end

def sunsubscribe(channels)

Unsubscribe from sharded channels (Redis 7.0+).

Signature

parameter channels Array(String)

The sharded channels to unsubscribe from.

Implementation

# Unsubscribe from sharded channels (Redis 7.0+).
#
# Writes an `SUNSUBSCRIBE` request for the given channels and flushes the connection.
#
# @parameter channels [Array(String)] The sharded channels to unsubscribe from.
def sunsubscribe(channels)
	request = ["SUNSUBSCRIBE", *channels]
	
	@connection.write_request(request)
	@connection.flush
end