Protocol::Redis::Cluster::Methods::Streams

module Streams

Provides Redis Streams commands for cluster environments. Stream operations are routed to the appropriate shard based on the stream key.

Definitions

def xinfo(*arguments, role: :master)

Get information on streams and consumer groups.

Signature

parameter arguments Array

XINFO command arguments (subcommand and stream key).

parameter role Symbol

The role of node to use (:master or :slave).

returns Array

Stream information.

Implementation

# Get information on streams and consumer groups.
# @parameter arguments [Array] XINFO command arguments (subcommand and stream key).
# @parameter role [Symbol] The role of node to use (`:master` or `:slave`).
# @returns [Array] Stream information.
def xinfo(*arguments, role: :master)
	# The stream key, when present, follows the subcommand:
	stream_key = arguments.length > 1 ? arguments[1] : nil
	
	# Route by the key's slot, or use any node when no key was given:
	target = stream_key ? client_for(slot_for(stream_key), role) : any_client(role)
	
	target.call("XINFO", *arguments)
end

def xadd(key, *arguments, role: :master)

Append a new entry to a stream.

Signature

parameter key String

The stream key.

parameter arguments Array

Additional XADD arguments (ID and field-value pairs).

parameter role Symbol

The role of node to use (:master or :slave).

returns String

The ID of the added entry.

Implementation

# Append a new entry to a stream.
# @parameter key [String] The stream key.
# @parameter arguments [Array] Additional XADD arguments (ID and field-value pairs).
# @parameter role [Symbol] The role of node to use (`:master` or `:slave`).
# @returns [String] The ID of the added entry.
def xadd(key, *arguments, role: :master)
	# Dispatch to the node responsible for the key's slot:
	client_for(slot_for(key), role).call("XADD", key, *arguments)
end

def xtrim(key, *arguments, role: :master)

Trim the stream to a certain size.

Signature

parameter key String

The stream key.

parameter arguments Array

Trim strategy and parameters.

parameter role Symbol

The role of node to use (:master or :slave).

returns Integer

Number of entries removed.

Implementation

# Trim the stream to a certain size.
# @parameter key [String] The stream key.
# @parameter arguments [Array] Trim strategy and parameters.
# @parameter role [Symbol] The role of node to use (`:master` or `:slave`).
# @returns [Integer] Number of entries removed.
def xtrim(key, *arguments, role: :master)
	# Dispatch to the node responsible for the key's slot:
	client_for(slot_for(key), role).call("XTRIM", key, *arguments)
end

def xdel(key, *arguments, role: :master)

Remove specified entries from the stream.

Signature

parameter key String

The stream key.

parameter arguments Array

Entry IDs to remove.

parameter role Symbol

The role of node to use (:master or :slave).

returns Integer

Number of entries actually deleted.

Implementation

# Remove specified entries from the stream.
# @parameter key [String] The stream key.
# @parameter arguments [Array] Entry IDs to remove.
# @parameter role [Symbol] The role of node to use (`:master` or `:slave`).
# @returns [Integer] Number of entries actually deleted.
def xdel(key, *arguments, role: :master)
	# Dispatch to the node responsible for the key's slot:
	client_for(slot_for(key), role).call("XDEL", key, *arguments)
end

def xrange(key, *arguments, role: :master)

Return a range of elements in a stream.

Signature

parameter key String

The stream key.

parameter arguments Array

Range parameters (start, end, optional COUNT).

parameter role Symbol

The role of node to use (:master or :slave).

returns Array

Stream entries in the specified range.

Implementation

# Return a range of elements in a stream.
# @parameter key [String] The stream key.
# @parameter arguments [Array] Range parameters (start, end, optional COUNT).
# @parameter role [Symbol] The role of node to use (`:master` or `:slave`).
# @returns [Array] Stream entries in the specified range.
def xrange(key, *arguments, role: :master)
	# Dispatch to the node responsible for the key's slot:
	client_for(slot_for(key), role).call("XRANGE", key, *arguments)
end

def xrevrange(key, *arguments, role: :master)

Return a range of elements in a stream in reverse order.

Signature

parameter key String

The stream key.

parameter arguments Array

Range parameters (end, start, optional COUNT).

parameter role Symbol

The role of node to use (:master or :slave).

returns Array

Stream entries in reverse order.

Implementation

# Return a range of elements in a stream in reverse order.
# @parameter key [String] The stream key.
# @parameter arguments [Array] Range parameters (end, start, optional COUNT).
# @parameter role [Symbol] The role of node to use (`:master` or `:slave`).
# @returns [Array] Stream entries in reverse order.
def xrevrange(key, *arguments, role: :master)
	# Dispatch to the node responsible for the key's slot:
	client_for(slot_for(key), role).call("XREVRANGE", key, *arguments)
end

def xlen(key, role: :master)

Return the number of entries in a stream.

Signature

parameter key String

The stream key.

parameter role Symbol

The role of node to use (:master or :slave).

returns Integer

Number of entries in the stream.

Implementation

# Return the number of entries in a stream.
# @parameter key [String] The stream key.
# @parameter role [Symbol] The role of node to use (`:master` or `:slave`).
# @returns [Integer] Number of entries in the stream.
def xlen(key, role: :master)
	# Dispatch to the node responsible for the key's slot:
	client_for(slot_for(key), role).call("XLEN", key)
end

def xread(*arguments, role: :master)

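Read new entries from one or more streams. Note: in cluster mode, all streams in a single XREAD must hash to the same slot (for example, by sharing a hash tag), because the command is routed using only the first stream key.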

Signature

parameter arguments Array

XREAD arguments including STREAMS keyword and stream keys/IDs.

parameter role Symbol

The role of node to use (:master or :slave).

returns Array

New entries from the specified streams.

Implementation

# Read new entries from one or more streams.
#
# The command is routed using only the first stream key after the `STREAMS` keyword, so all streams in a single call are expected to live on the same node.
#
# @parameter arguments [Array] XREAD arguments including STREAMS keyword and stream keys/IDs.
# @parameter role [Symbol] The role of node to use (`:master` or `:slave`).
# @returns [Array] New entries from the specified streams.
def xread(*arguments, role: :master)
	# Redis keywords are case-insensitive, so accept "streams", "Streams" or a Symbol as well as "STREAMS". Arguments may be non-strings (e.g. an Integer COUNT), hence `to_s`:
	keyword_index = arguments.index {|argument| argument.to_s.casecmp?("STREAMS")}
	
	# The first stream key immediately follows the keyword (nil when the keyword is missing or is the last argument):
	first_stream_key = keyword_index && arguments[keyword_index + 1]
	
	client = if first_stream_key
		client_for(slot_for(first_stream_key), role)
	else
		# Fallback when no stream key can be determined:
		any_client(role)
	end
	
	client.call("XREAD", *arguments)
end

def xgroup(*arguments, role: :master)

Create, destroy, and manage consumer groups.

Signature

parameter arguments Array

XGROUP command arguments.

parameter role Symbol

The role of node to use (:master or :slave).

returns String | Integer

Command result.

Implementation

# Create, destroy, and manage consumer groups.
# @parameter arguments [Array] XGROUP command arguments (subcommand followed by its parameters).
# @parameter role [Symbol] The role of node to use (`:master` or `:slave`).
# @returns [String | Integer] Command result.
def xgroup(*arguments, role: :master)
	# The stream key directly follows the subcommand name for every keyed subcommand (e.g. CREATE, SETID, DESTROY, DELCONSUMER), so no per-subcommand dispatch is needed. It is nil when only a subcommand (or nothing) was given:
	stream_key = arguments[1]
	
	client = if stream_key
		client_for(slot_for(stream_key), role)
	else
		# No key to route by, so any node will do:
		any_client(role)
	end
	
	client.call("XGROUP", *arguments)
end

def xreadgroup(*arguments, role: :master)

Read new entries from streams using a consumer group.

Signature

parameter arguments Array

XREADGROUP arguments.

parameter role Symbol

The role of node to use (:master or :slave).

returns Array

Entries for the consumer group.

Implementation

# Read new entries from streams using a consumer group.
#
# The command is routed using only the first stream key after the `STREAMS` keyword, so all streams in a single call are expected to live on the same node.
#
# @parameter arguments [Array] XREADGROUP arguments (`GROUP group consumer`, options, then `STREAMS` and stream keys/IDs).
# @parameter role [Symbol] The role of node to use (`:master` or `:slave`).
# @returns [Array] Entries for the consumer group.
def xreadgroup(*arguments, role: :master)
	# Skip the leading `GROUP <group> <consumer>` triple so a group or consumer literally named "STREAMS" is not mistaken for the keyword:
	search_from = arguments.first.to_s.casecmp?("GROUP") ? 3 : 0
	
	# Redis keywords are case-insensitive; arguments may be non-strings (e.g. an Integer COUNT), hence `to_s`:
	keyword_index = (search_from...arguments.length).find do |index|
		arguments[index].to_s.casecmp?("STREAMS")
	end
	
	# The first stream key immediately follows the keyword (nil when the keyword is missing or is the last argument):
	first_stream_key = keyword_index && arguments[keyword_index + 1]
	
	client = if first_stream_key
		client_for(slot_for(first_stream_key), role)
	else
		# Fallback when no stream key can be determined:
		any_client(role)
	end
	
	client.call("XREADGROUP", *arguments)
end

def xack(key, *arguments, role: :master)

Acknowledge processed messages in a consumer group.

Signature

parameter key String

The stream key.

parameter arguments Array

Group name and message IDs.

parameter role Symbol

The role of node to use (:master or :slave).

returns Integer

Number of messages acknowledged.

Implementation

# Acknowledge processed messages in a consumer group.
# @parameter key [String] The stream key.
# @parameter arguments [Array] Group name and message IDs.
# @parameter role [Symbol] The role of node to use (`:master` or `:slave`).
# @returns [Integer] Number of messages acknowledged.
def xack(key, *arguments, role: :master)
	# Dispatch to the node responsible for the key's slot:
	client_for(slot_for(key), role).call("XACK", key, *arguments)
end

def xclaim(key, *arguments, role: :master)

Change ownership of messages in a consumer group.

Signature

parameter key String

The stream key.

parameter arguments Array

Group, consumer, min-idle-time, and message IDs.

parameter role Symbol

The role of node to use (:master or :slave).

returns Array

Claimed messages.

Implementation

# Change ownership of messages in a consumer group.
# @parameter key [String] The stream key.
# @parameter arguments [Array] Group, consumer, min-idle-time, and message IDs.
# @parameter role [Symbol] The role of node to use (`:master` or `:slave`).
# @returns [Array] Claimed messages.
def xclaim(key, *arguments, role: :master)
	# Dispatch to the node responsible for the key's slot:
	client_for(slot_for(key), role).call("XCLAIM", key, *arguments)
end

def xpending(key, *arguments, role: :master)

Get information about pending messages in a consumer group.

Signature

parameter key String

The stream key.

parameter arguments Array

Group name and optional consumer/range parameters.

parameter role Symbol

The role of node to use (:master or :slave).

returns Array

Pending message information.

Implementation

# Get information about pending messages in a consumer group.
# @parameter key [String] The stream key.
# @parameter arguments [Array] Group name and optional consumer/range parameters.
# @parameter role [Symbol] The role of node to use (`:master` or `:slave`).
# @returns [Array] Pending message information.
def xpending(key, *arguments, role: :master)
	# Dispatch to the node responsible for the key's slot:
	client_for(slot_for(key), role).call("XPENDING", key, *arguments)
end