Async::RedisSourceAsyncRedisContextSubscribe

class Subscribe

Context for Redis pub/sub subscription operations.

Definitions

def initialize(pool, channels)

Initialize a new subscription context.

Signature

parameter pool Pool

The connection pool to use.

parameter channels Array(String)

The channels to subscribe to.

Implementation

def initialize(pool, channels)
	super(pool)
	
	subscribe(channels)
end

def close

Close the subscription context.

Implementation

def close
	# There is no way to reset subscription state. On Redis v6+ you can use RESET, but this is not supported in <= v6.
	@connection&.close
	
	super
end

def listen

Listen for the next message from subscribed channels.

Signature

returns Array

The next message response, or nil if connection closed.

Implementation

def listen
	while response = @connection.read_response
		return response if response.first == MESSAGE
	end
end

def each

Iterate over all messages from subscribed channels.

Signature

yields {|response| ...}

Block called for each message.

parameter response Array

The message response.

Implementation

def each
	return to_enum unless block_given?
	
	while response = self.listen
		yield response
	end
end

def subscribe(channels)

Subscribe to additional channels.

Signature

parameter channels Array(String)

The channels to subscribe to.

Implementation

def subscribe(channels)
	@connection.write_request ["SUBSCRIBE", *channels]
	@connection.flush
end

def unsubscribe(channels)

Unsubscribe from channels.

Signature

parameter channels Array(String)

The channels to unsubscribe from.

Implementation

def unsubscribe(channels)
	@connection.write_request ["UNSUBSCRIBE", *channels]
	@connection.flush
end