Async::RedisGuidesStreams

Streams

This guide explains how to use Redis streams with async-redis for reliable message processing and event sourcing.

Streams are designed for high-throughput message processing and event sourcing. They provide durability, consumer groups for load balancing, and automatic message acknowledgment.

Use streams when you need:

Stream Creation and Consumption

require "async/redis"

endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Add entries to stream:
		events = [
			{ "type" => "user_signup", "user_id" => "123", "email" => "alice@example.com" },
			{ "type" => "purchase", "user_id" => "123", "amount" => "29.99" },
			{ "type" => "user_signup", "user_id" => "456", "email" => "bob@example.com" }
		]
		
		events.each do |event|
			entry_id = client.xadd("user_events", "*", event)
			puts "Added event with ID: #{entry_id}"
		end
		
		# Read from stream:
		entries = client.xrange("user_events", "-", "+")
		puts "Stream entries:"
		entries.each do |entry_id, fields|
			puts "  #{entry_id}: #{fields}"
		end
		
		# Read latest entries:
		latest = client.xrevrange("user_events", "+", "-", count: 2)
		puts "Latest 2 entries: #{latest}"
		
	ensure
		client.close
	end
end

Reading New Messages

require "async/redis"

endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Add some initial events:
		client.xadd("notifications", "*", "type" => "welcome", "user_id" => "123")
		client.xadd("notifications", "*", "type" => "reminder", "user_id" => "456")
		
		# Read only new messages (blocking):
		puts "Waiting for new messages..."
		
		# This will block until new messages arrive:
		messages = client.xread("BLOCK", 5000, "STREAMS", "notifications", "$")
		
		if messages && !messages.empty?
			stream_name, entries = messages[0]
			puts "Received #{entries.length} new messages:"
			entries.each do |entry_id, fields|
				puts "  #{entry_id}: #{fields}"
			end
		else
			puts "No new messages received within timeout"
		end
		
	ensure
		client.close
	end
end

Consumer Groups

Consumer groups enable multiple workers to process messages in parallel while ensuring each message is processed exactly once:

require "async/redis"

endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Create consumer group:
		begin
			client.xgroup("CREATE", "user_events", "processors", "0", "MKSTREAM")
			puts "Created consumer group 'processors'"
		rescue Protocol::Redis::ServerError => e
			puts "Consumer group already exists: #{e.message}"
		end
		
		# Add some test events:
		3.times do |i|
			client.xadd("user_events", "*", "event" => "test_#{i}", "timestamp" => Time.now.to_f)
		end
		
		# Consume messages:
		consumer_name = "worker_1"
		messages = client.xreadgroup("GROUP", "processors", consumer_name, "COUNT", 2, "STREAMS", "user_events", ">")
		
		if messages && !messages.empty?
			stream_name, entries = messages[0]
			puts "Consumer #{consumer_name} received #{entries.length} messages:"
			
			entries.each do |entry_id, fields|
				puts "  Processing #{entry_id}: #{fields}"
				
				# Simulate message processing:
				sleep 0.1
				
				# Acknowledge message processing:
				client.xack("user_events", "processors", entry_id)
				puts "  Acknowledged #{entry_id}"
			end
		else
			puts "No new messages for consumer #{consumer_name}"
		end
		
	ensure
		client.close
	end
end

Multiple Consumers

Demonstrate load balancing across multiple consumers:

require "async/redis"

endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do |task|
	begin
		# Create consumer group:
		begin
			client.xgroup("CREATE", "work_queue", "workers", "0", "MKSTREAM")
		rescue Protocol::Redis::ServerError
			# Group already exists
		end
		
		# Producer task - add work items:
		producer = task.async do
			10.times do |i|
				client.xadd("work_queue", "*", 
					"task_id" => i, 
					"data" => "work_item_#{i}",
					"priority" => rand(1..5)
				)
				puts "Added work item #{i}"
				sleep 0.5
			end
		end
		
		# Consumer tasks - process work items:
		consumers = 3.times.map do |worker_id|
			task.async do
				consumer_name = "worker_#{worker_id}"
				
				loop do
					messages = client.xreadgroup(
						"GROUP", "workers", consumer_name,
						"COUNT", 1,
						"BLOCK", 1000,
						"STREAMS", "work_queue", ">"
					)
					
					if messages && !messages.empty?
						stream_name, entries = messages[0]
						
						entries.each do |entry_id, fields|
							puts "#{consumer_name} processing: #{fields}"
							
							# Simulate work:
							sleep rand(0.1..0.5)
							
							# Acknowledge completion:
							client.xack("work_queue", "workers", entry_id)
							puts "#{consumer_name} completed: #{entry_id}"
						end
					end
				end
			end
		end
		
		# Wait for producer to finish:
		producer.wait
		
		# Let consumers process remaining work:
		sleep 3
		
		# Stop all consumers:
		consumers.each(&:stop)
		
	ensure
		client.close
	end
end

Message Acknowledgment and Recovery

Handle message acknowledgment and recover from failures:

require "async/redis"

endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Check for pending messages:
		pending_info = client.xpending("user_events", "processors")
		if pending_info && pending_info[0] > 0
			puts "Found #{pending_info[0]} pending messages"
			
			# Get detailed pending information:
			pending_details = client.xpending("user_events", "processors", "-", "+", 10)
			pending_details.each do |entry_id, consumer, idle_time, delivery_count|
				puts "Message #{entry_id} pending for #{idle_time}ms (delivered #{delivery_count} times to #{consumer})"
				
				# Claim long-pending messages for reprocessing:
				if idle_time > 60000  # 1 minute
					claimed = client.xclaim("user_events", "processors", "recovery_worker", 60000, entry_id)
					if claimed && !claimed.empty?
						puts "Claimed message #{entry_id} for reprocessing"
						
						# Process the claimed message:
						claimed.each do |claimed_id, fields|
							puts "Reprocessing: #{fields}"
							# ... process message ...
						end
						
						# Acknowledge after successful processing:
						client.xack("user_events", "processors", entry_id)
						puts "Acknowledged recovered message #{entry_id}"
					end
				end
			end
		else
			puts "No pending messages found"
		end
		
	ensure
		client.close
	end
end

Stream Information and Management

Monitor and manage stream health:

require "async/redis"

endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Get stream information:
		stream_info = client.xinfo("STREAM", "user_events")
		puts "Stream info: #{stream_info}"
		
		# Get consumer group information:
		begin
			groups_info = client.xinfo("GROUPS", "user_events")
			puts "Consumer groups:"
			groups_info.each do |group|
				group_data = Hash[*group]
				puts "  Group: #{group_data['name']}, Consumers: #{group_data['consumers']}, Pending: #{group_data['pending']}"
			end
		rescue Protocol::Redis::ServerError
			puts "No consumer groups exist for this stream"
		end
		
		# Get consumers in a group:
		begin
			consumers_info = client.xinfo("CONSUMERS", "user_events", "processors")
			puts "Consumers in 'processors' group:"
			consumers_info.each do |consumer|
				consumer_data = Hash[*consumer]
				puts "  Consumer: #{consumer_data['name']}, Pending: #{consumer_data['pending']}, Idle: #{consumer_data['idle']}ms"
			end
		rescue Protocol::Redis::ServerError
			puts "Consumer group 'processors' does not exist"
		end
		
		# Trim stream to keep only recent messages:
		trimmed = client.xtrim("user_events", "MAXLEN", "~", 1000)
		puts "Trimmed #{trimmed} messages from stream"
		
	ensure
		client.close
	end
end