class Connection
Represents a Redis protocol connection handling low-level communication.
Nested
Definitions
def initialize(stream)
Initialize a new connection with the provided stream.
Signature
-
parameter
stream
IO
The underlying stream for communication.
Implementation
def initialize(stream)
@stream = stream
# Number of requests sent:
@count = 0
end
attr :count
def close
Close the underlying stream connection.
Implementation
def close
@stream.close
end
def flush
Flush any buffered data to the stream.
Implementation
def flush
@stream.flush
end
def closed?
Check if the connection is closed.
Signature
-
returns
Boolean
True if the connection is closed.
Implementation
def closed?
@stream.closed?
end
def write_request(arguments)
The redis server doesn't want actual objects (e.g. integers) but only bulk strings. So, we inline it for performance.
Implementation
def write_request(arguments)
write_lines("*#{arguments.size}")
@count += 1
arguments.each do |argument|
string = argument.to_s
write_lines("$#{string.bytesize}", string)
end
end
def write_object(object)
Write a Redis object to the stream.
Signature
-
parameter
object
Object
The object to write (String, Array, Integer, or object with to_redis method).
Implementation
def write_object(object)
case object
when String
write_lines("$#{object.bytesize}", object)
when Array
write_array(object)
when Integer
write_lines(":#{object}")
when nil
write_lines("$-1")
else
write_object(object.to_redis)
end
end
def write_array(array)
Write a Redis array to the stream.
Signature
-
parameter
array
Array
The array to write.
Implementation
def write_array(array)
write_lines("*#{array.size}")
array.each do |element|
write_object(element)
end
end
def read_data(length)
Read data of specified length from the stream.
Signature
-
parameter
length
Integer
The number of bytes to read.
-
returns
String
The data read from the stream.
Implementation
def read_data(length)
buffer = @stream.read(length) or @stream.eof!
# Eat trailing whitespace because length does not include the CRLF:
@stream.read(2) or @stream.eof!
return buffer
end
def read_object
Read and parse a Redis object from the stream.
Signature
-
returns
Object
The parsed Redis object (String, Array, Integer, or nil).
-
raises
ServerError
If the server returns an error response.
-
raises
EOFError
If the stream reaches end of file.
Implementation
def read_object
line = read_line or raise EOFError
token = line.slice!(0, 1)
case token
when "$"
length = line.to_i
if length == -1
return nil
else
return read_data(length)
end
when "*"
count = line.to_i
# Null array (https://redis.io/topics/protocol#resp-arrays):
return nil if count == -1
array = Array.new(count) {read_object}
return array
when ":"
return line.to_i
when "-"
raise ServerError.new(line)
when "+"
return line
else
@stream.flush
raise UnknownTokenError, token.inspect
end
# TODO: If an exception (e.g. Async::TimeoutError) propagates out of this function, perhaps @stream should be closed? Otherwise it might be in a weird state.
end
def write_lines(*arguments)
In the case of Redis, we do not want to perform a flush in every line, because each Redis command contains several lines. Flushing once per command is more efficient because it avoids unnecessary writes to the socket.
Implementation
def write_lines(*arguments)
if arguments.empty?
@stream.write(CRLF)
else
arguments.each do |arg|
@stream.write(arg)
@stream.write(CRLF)
end
end
end