Async::RedisGuidesTransactions and Pipelines

Transactions and Pipelines

This guide explains how to use Redis transactions and pipelines with async-redis for atomic operations and improved performance.

By default, each command (e.g. GET key) acquires a connection from the client, runs the command, returns the result and releases the connection back to the client's connection pool. This may be inefficient for some use cases, and so there are several ways to group together operations that run on the same connection.

Transactions (MULTI/EXEC)

Transactions ensure that multiple Redis commands execute atomically - either all commands succeed, or none of them do. This is crucial when you need to maintain data consistency, such as transferring money between accounts or updating related fields together.

Use transactions when you need:

  • Atomic updates: Multiple operations that must all succeed or all fail.
  • Data consistency: Keeping related data in sync across multiple keys.
  • Preventing partial updates: Avoiding situations where only some of your changes are applied.

Redis transactions queue commands and execute them all at once:

require "async/redis"

endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Execute commands atomically:
		results = client.transaction do |context|
			context.multi
			
			# Queue commands for atomic execution:
			context.set("user:1:name", "Alice")
			context.set("user:1:email", "alice@example.com")
			context.incr("user:count")
			
			# Execute all queued commands:
			context.execute
		end
		
		puts "Transaction results: #{results}"
		
	ensure
		client.close
	end
end

Watch/Unwatch for Optimistic Locking

When multiple clients might modify the same data simultaneously, you need to handle race conditions. Redis WATCH provides optimistic locking - the transaction only executes if watched keys haven't changed since you started watching them.

This is essential for scenarios like:

  • Updating counters or balances where the current value matters.
  • Implementing atomic increment operations with business logic.
  • Preventing lost updates in concurrent environments.

Here's how to implement safe concurrent updates:

require "async/redis"

endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Initialize counter.
		client.set("counter", 0)
		
		# Optimistic locking example:
		5.times do |i|
			success = false
			attempts = 0
			
			while !success && attempts < 3
				attempts += 1
				
				result = client.transaction do |context|
					# Watch the counter for changes (executes immediately):
					context.watch("counter")
					
					# Read current value (executes immediately):
					current_value = client.get("counter").to_i
					new_value = current_value + 1
					
					# Start transaction - commands after this are queued, not executed:
					context.multi
					
					# Queue commands (these return "QUEUED", don't execute yet):
					context.set("counter", new_value)
					context.set("last_update", Time.now.to_f)
					
					# Execute all queued commands atomically.
					# Returns nil if watched key was modified by another client:
					context.execute
				end
				
				if result
					puts "Increment #{i + 1} succeeded: #{result}"
					success = true
				else
					puts "Increment #{i + 1} failed, retrying (attempt #{attempts})"
					sleep 0.01
				end
			end
		end
		
		final_value = client.get("counter")
		puts "Final counter value: #{final_value}"
		
	ensure
		client.close
	end
end

Pipelines

When you need to execute many Redis commands quickly, sending them one-by-one creates network latency bottlenecks. Pipelines solve this by batching multiple commands together, dramatically reducing round-trip time.

Use pipelines when you need:

  • Better performance: Reduce network round trips for bulk operations.
  • High throughput: Process hundreds or thousands of commands efficiently.
  • Independent operations: Commands that don't depend on each other's results.

Unlike transactions, pipeline commands are not atomic - some may succeed while others fail:

require "async/redis"

endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		# Execute commands in a pipeline:
		results = client.pipeline do |context|
			# Commands are buffered and flushed if needed:
			context.set("pipeline:1", "value1")
			context.set("pipeline:2", "value2")
			context.set("pipeline:3", "value3")
			
			context.get("pipeline:1")
			context.get("pipeline:2")
			context.get("pipeline:3")
			
			# Flush and collect the results from all previous commands:
			context.collect
		end
		
		puts "Pipeline results: #{results}"
		
	ensure
		client.close
	end
end

Synchronous Pipeline Operations

When you need immediate results from individual commands within a pipeline, use context.sync:

require "async/redis"

endpoint = Async::Redis.local_endpoint
client = Async::Redis::Client.new(endpoint)

Async do
	begin
		client.pipeline do |context|
			# Set values using pipeline - no immediate response:
			context.set("async_key_1", "value1")
			context.set("async_key_2", "value2")
			
			# Get immediate response using sync:
			immediate_result = context.sync.get("async_key_1")
			puts "Immediate result: #{immediate_result}"
			
			# Continue with pipelined operations:
			context.get("async_key_2")
			
			# Collect remaining pipelined results:
			context.collect
		end
		
	ensure
		client.close
	end
end

Use context.sync when you need to:

  • Check values mid-pipeline: Verify data before continuing with more operations.
  • Conditional logic: Make decisions based on current Redis state.
  • Debugging: Get immediate feedback during pipeline development.

Note that sync operations execute immediately and flush pending responses, so use them strategically to maintain pipeline performance benefits.