Async::Redis > Guides > Data Structures and Operations

Data Structures and Operations

This guide explains how to work with Redis data types and operations using async-redis.

Strings

Strings are Redis's most versatile data type, perfect for caching values, storing user sessions, implementing counters, or holding configuration data. Despite the name, they can store any binary data up to 512MB.

Common use cases:

  • Caching: Store computed results, API responses, or database query results.
  • Counters: Page views, user scores, rate limiting.
  • Sessions: User authentication tokens and session data.
  • Configuration: Application settings and feature flags.

Basic Operations

require "async/redis"

# Example: string values used as a response cache and as counters.
endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Cache API responses. SETEX stores the value together with its 5 minute
		# expiry in one command, so the key is never left without a TTL if
		# something fails between writing the value and setting the expiry:
		client.setex("cache:api:users", 300, '{"users": [{"id": 1, "name": "Alice"}]}')
		
		cached_response = client.get("cache:api:users")
		puts "Cached API response: #{cached_response}"
		
		# Implement counters:
		client.incr("stats:page_views")
		client.incrby("stats:api_calls", 5)
		
		# Read the current counter values back for display:
		page_views = client.get("stats:page_views")
		api_calls = client.get("stats:api_calls")
		puts "Page views: #{page_views}, API calls: #{api_calls}"
		
	ensure
		# Always release the client, even if a command above raised:
		client.close
	end
end

Binary Data Handling

Redis strings can store any binary data, making them useful for caching images, files, or serialized objects:

require "async/redis"

# Example: caching a serialized Ruby object as a binary string value.
endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Cache serialized data:
		user_data = { id: 123, name: "Alice", preferences: { theme: "dark" } }
		serialized = Marshal.dump(user_data)
		
		# Store the value and its 30 minute expiry in a single command so the
		# key cannot be left behind without a TTL:
		client.setex("cache:user:123:full", 1800, serialized)
		
		# Retrieve and deserialize:
		cached_data = client.get("cache:user:123:full")
		if cached_data
			# SECURITY: Marshal.load can instantiate arbitrary objects. Only use it
			# on data this application wrote itself to a trusted Redis instance;
			# prefer a data-only format such as JSON if the key could be written by
			# anyone else.
			deserialized = Marshal.load(cached_data)
			puts "Cached user: #{deserialized}"
		end
		
	rescue => error
		# Report the failure and carry on without the cached value:
		puts "Cache error: #{error.message}"
	ensure
		# Always release the client, even if a command above raised:
		client.close
	end
end

Expiration and TTL

require "async/redis"
require "securerandom"

# Example: expiring keys for auth tokens and a compute-on-miss cache.
endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Set temporary authentication tokens (value and 1 hour expiry together):
		auth_token = SecureRandom.hex(32)
		client.setex("auth:token:#{auth_token}", 3600, "user:12345")
		
		# Cache with conditional setting. Read the key directly rather than
		# testing for existence first: a nil reply from GET is an unambiguous
		# miss, whereas an integer EXISTS reply of 0 would still be truthy in
		# Ruby, and the key could expire between an existence check and the read.
		cache_key = "cache:expensive_computation"
		cached_result = client.get(cache_key)
		
		if cached_result
			puts "Using cached result: #{cached_result}"
		else
			# Simulate expensive computation:
			result = "computed_result_#{rand(1000)}"
			client.setex(cache_key, 600, result)  # 10 minute cache
			puts "Computed and cached: #{result}"
		end
		
		# Check remaining TTL:
		ttl = client.ttl(cache_key)
		puts "Cache expires in #{ttl} seconds"
		
	ensure
		# Always release the client, even if a command above raised:
		client.close
	end
end

Hashes

Hashes are ideal for caching structured data with multiple fields. They're more memory-efficient than using separate string keys for each field and provide atomic operations on individual fields, making them perfect for temporary storage and caching scenarios.

Perfect for:

  • Session data: Cache user session attributes and temporary state.
  • Row cache: Store frequently accessed database rows to reduce query load.
  • Request cache: Cache computed results from expensive operations or API calls.
  • User preferences: Store frequently accessed settings that change occasionally.

Field Operations

require "async/redis"
require "securerandom"

# Example: a user session stored as a hash, one field per attribute.
endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Store session data with individual fields:
		session_id = "session:#{SecureRandom.hex(16)}"
		client.hset(session_id, "user_id", "12345")
		client.hset(session_id, "username", "alice")
		client.hset(session_id, "roles", "admin,editor")
		client.hset(session_id, "last_activity", Time.now.to_i)
		
		# Set session expiration:
		client.expire(session_id, 3600)  # 1 hour
		
		# Retrieve session fields:
		user_id = client.hget(session_id, "user_id")
		username = client.hget(session_id, "username")
		# `to_s` turns a missing field (nil reply) into an empty role list instead
		# of raising NoMethodError on `split`:
		roles = client.hget(session_id, "roles").to_s.split(",")
		
		puts "Session for user #{username} (ID: #{user_id}), roles: #{roles.join(', ')}"
		
		# Check if user has specific role. Compare against the parsed list so only
		# an exact role name matches; a substring test on the raw "a,b,c" string
		# would also accept names such as "superadmin":
		has_admin = roles.include?("admin")
		puts "Has admin role: #{has_admin}"
		
		# Update last activity timestamp:
		client.hset(session_id, "last_activity", Time.now.to_i)
		
	ensure
		# Always release the client, even if a command above raised:
		client.close
	end
end

Bulk Operations

require "async/redis"

# Example: caching a database row as a hash and reading it back in bulk.
endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Cache database row with multiple fields:
		row_key = "cache:user:456"
		row = {
			"name" => "Bob",
			"email" => "bob@example.com",
			"last_login" => Time.now.to_i.to_s,
			"status" => "active"
		}
		
		# Write every field in one command; the hash is flattened into an
		# alternating field, value, field, value... argument list:
		field_value_pairs = row.flatten
		client.hmset(row_key, *field_value_pairs)
		client.expire(row_key, 1800)  # 30 minutes
		
		# Fetch only the fields we need:
		name, email = client.hmget(row_key, "name", "email")
		puts "User: #{name} (#{email})"
		
		# Fetch the whole cached row:
		cached_row = client.hgetall(row_key)
		puts "Full user cache: #{cached_row}"
		
		# Count the stored fields:
		puts "Cached #{client.hlen(row_key)} user fields"
		
	ensure
		# Release the client whether or not the commands succeeded:
		client.close
	end
end

Lists

Lists maintain insertion order and allow duplicates, making them perfect for implementing queues, activity feeds, or recent item lists. They support efficient operations at both ends.

Essential for:

  • Task queues: Background job processing with FIFO or LIFO behavior.
  • Activity feeds: Recent user actions or timeline events.
  • Message queues: Communication between application components.
  • Recent items: Keep track of recently viewed or accessed items.

Queue Operations

require "async/redis"

# Example: a simple FIFO task queue (push on the left, pop from the right).
endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Producer: Add tasks to queue:
		tasks = ["send_email:123", "process_payment:456", "generate_report:789"]
		tasks.each do |queued_task|
			client.lpush("task_queue", queued_task)
			puts "Queued: #{queued_task}"
		end
		
		# Consumer: Process tasks from queue. Pop until the reply is nil (empty
		# queue) instead of checking the length first: another consumer could
		# drain the list between a length check and the pop, and this needs one
		# round trip per task rather than two:
		while (task = client.rpop("task_queue"))
			puts "Processing: #{task}"
			
			# Simulate task processing:
			sleep 0.1
			puts "Completed: #{task}"
		end
		
	ensure
		# Always release the client, even if a command above raised:
		client.close
	end
end

Recent Items List

require "async/redis"

# Example: a de-duplicated, capped "recently viewed" list per user.
endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		user_id = "123"
		list_key = "recent:viewed:#{user_id}"
		
		# Pages the user visits, in order (note "/products/1" appears twice):
		visited = %w[/products/1 /products/5 /cart /products/1 /checkout]
		
		visited.each do |path|
			# Drop any earlier occurrence so the page is listed only once:
			client.lrem(list_key, 0, path)
			
			# Newest entry goes to the head of the list:
			client.lpush(list_key, path)
			
			# Cap the list at five entries (indexes 0 through 4):
			client.ltrim(list_key, 0, 4)
		end
		
		# Read the whole list back, newest first:
		puts "Recently viewed: #{client.lrange(list_key, 0, -1)}"
		
		# Let the list expire on its own after a day:
		client.expire(list_key, 86400)  # 24 hours
		
	ensure
		# Release the client whether or not the commands succeeded:
		client.close
	end
end

Blocking Operations

Blocking operations let consumers wait for new items instead of constantly polling, making them perfect for real-time job processing:

require "async/redis"
require "json"

# Example: a producer and a consumer task sharing a queue, with the consumer
# blocking on BRPOP instead of polling.
endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do |task|
	begin
		# Producer task: enqueue one JSON-encoded job per second:
		producer = task.async do
			5.times do |i|
				sleep 1
				job_data = { id: i, action: "process_user_#{i}" }.to_json
				client.lpush("job_queue", job_data)
				puts "Produced job #{i}"
			end
		end
		
		# Consumer task with blocking pop:
		consumer = task.async do
			5.times do
				# Block for up to 2 seconds waiting for work:
				result = client.brpop("job_queue", 2)
				
				if result
					# The reply pairs the source queue name with the popped value; only
					# the value is needed here:
					_queue_name, job_json = result
					job = JSON.parse(job_json)
					puts "Processing job #{job['id']}: #{job['action']}"
					sleep 0.5  # Simulate work
				else
					puts "No work available, continuing..."
				end
			end
		end
		
		# Wait for both tasks to complete:
		producer.wait
		consumer.wait
		
	ensure
		# Always release the client, even if a task above raised:
		client.close
	end
end

Sets and Sorted Sets

Sets automatically handle uniqueness and provide fast membership testing, while sorted sets add scoring for rankings and range queries.

Sets are perfect for:

  • Tags and categories: User interests, product categories.
  • Unique visitors: Track unique users without duplicates.
  • Permissions: User roles and access rights.
  • Cache invalidation: Track which cache keys need updating.

Sorted sets excel at:

  • Leaderboards: Game scores, user rankings.
  • Time-based data: Recent events, scheduled tasks.
  • Priority queues: Tasks with different priorities.
  • Range queries: Find items within score ranges.

Set Operations

require "async/redis"
require "date"

# Example: counting unique daily visitors with a set keyed by date.
endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Track unique daily visitors:
		today = Date.today.to_s
		visitor_key = "visitors:#{today}"
		
		# Add visitors (duplicates automatically ignored):
		visitors = ["user:123", "user:456", "user:123", "user:789", "user:456"]
		visitors.each do |visitor|
			client.sadd(visitor_key, visitor)
		end
		
		# Get unique visitor count:
		unique_count = client.scard(visitor_key)
		puts "Unique visitors today: #{unique_count}"
		
		# Check if specific user visited:
		visited = client.sismember(visitor_key, "user:123")
		puts "User 123 visited today: #{visited}"
		
		# Get all unique visitors:
		all_visitors = client.smembers(visitor_key)
		puts "All visitors: #{all_visitors}"
		
		# Set expiration for daily cleanup:
		client.expire(visitor_key, 86400)  # 24 hours
		
	ensure
		# Always release the client, even if a command above raised:
		client.close
	end
end

Scoring and Ranking

require "async/redis"

# Example: a weekly leaderboard kept in a sorted set.
endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		board = "leaderboard:weekly"
		alice = "user:alice"
		
		# Seed the board with one score per member:
		initial_scores = {
			"user:alice" => 150,
			"user:bob" => 200,
			"user:charlie" => 175,
			"user:david" => 125
		}
		initial_scores.each do |member, points|
			client.zadd(board, points, member)
		end
		
		# Top three, highest score first. The reply is consumed as a flat
		# member, score, member, score... list, hence the slices of two:
		podium = client.zrevrange(board, 0, 2, with_scores: true)
		puts "Top 3 users this week:"
		podium.each_slice(2).with_index(1) do |(member, points), position|
			puts "  #{position}. #{member}: #{points.to_i} points"
		end
		
		# Look up one member's position (zero-based, so add one) and score:
		rank = client.zrevrank(board, alice)
		score = client.zscore(board, alice)
		puts "Alice: rank #{rank + 1}, score #{score.to_i}"
		
		# Bump the score and read it back:
		client.zincrby(board, 25, alice)
		puts "Alice's updated score: #{client.zscore(board, alice).to_i}"
		
		# Expire the whole board after a week:
		client.expire(board, 604800)  # 7 days
		
	ensure
		# Release the client whether or not the commands succeeded:
		client.close
	end
end

Time-Based Range Queries

require "async/redis"

# Example: a per-user activity log in a sorted set scored by timestamp.
endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Track user activity with timestamps:
		activity_key = "activity:user:123"
		
		# Take one reference time and derive every timestamp and cutoff from it.
		# Sampling the clock again for each cutoff makes it slightly later than
		# the stored scores, so an entry exactly one hour old would fall just
		# outside the "last hour" window:
		now = Time.now.to_f
		
		# Add timestamped activities:
		activities = [
			{ action: "login", time: now - 3600 },
			{ action: "view_page", time: now - 1800 },
			{ action: "purchase", time: now - 900 },
			{ action: "logout", time: now - 300 }
		]
		
		activities.each do |activity|
			client.zadd(activity_key, activity[:time], activity[:action])
		end
		
		# Get activities from last hour:
		one_hour_ago = now - 3600
		recent_activities = client.zrangebyscore(activity_key, one_hour_ago, "+inf")
		puts "Recent activities: #{recent_activities}"
		
		# Count activities in time range:
		thirty_min_ago = now - 1800
		recent_count = client.zcount(activity_key, thirty_min_ago, "+inf")
		puts "Activities in last 30 minutes: #{recent_count}"
		
		# Clean up old activities:
		two_hours_ago = now - 7200
		removed = client.zremrangebyscore(activity_key, "-inf", two_hours_ago)
		puts "Removed #{removed} old activities"
		
	ensure
		# Always release the client, even if a command above raised:
		client.close
	end
end