Build Your Own Redis: ECHO [4/4]

This is the fourth article in a series where we’ll build a toy Redis clone in Ruby. If you’d like to code along, try the Build your own Redis challenge!

Previous article: Concurrent Clients

The ECHO command

The ECHO command does exactly what it says:

➜  rohitpaulk.com git:(master) ✗ redis-cli
127.0.0.1:6379> echo hey
"hey"
127.0.0.1:6379> echo hello
"hello"

Since this command involves user input, we’ll need to be able to decode RESP first.

Integrated Test

Like we did in the previous article, let’s start out with an integrated test.

  require "redis"
  require "minitest/autorun"

  # 6379 for official redis, 6380 for ours
  SERVER_PORT = ENV["SERVER_PORT"]

  class TestRedisServer < Minitest::Test
    def test_responds_to_ping
      r = Redis.new(port: SERVER_PORT)
      assert_equal "PONG", r.ping
    end

    def test_multiple_commands_from_same_client
      r = Redis.new(port: SERVER_PORT)

      # The Redis client re-connects on timeout by default, without_reconnect
      # prevents that.
      r.without_reconnect do
        assert_equal "PONG", r.ping
        assert_equal "PONG", r.ping
      end
    end

    def test_multiple_clients
      r1 = Redis.new(port: SERVER_PORT)
      r2 = Redis.new(port: SERVER_PORT)

      assert_equal "PONG", r1.ping
      assert_equal "PONG", r2.ping
    end
+
+   def test_responds_to_echo
+     r = Redis.new(port: SERVER_PORT)
+     assert_equal "hey", r.echo("hey")
+     assert_equal "hello", r.echo("hello")
+   end
  end

When run against our version of Redis, this test fails:

➜  SERVER_PORT=6380 ruby _files/redis/integrated_test_4.rb
Run options: --seed 31312

# Running:

..F.

  1) Failure:
TestRedisServer#test_responds_to_echo [_files/redis/integrated_test_4.rb:34]:
Expected: "hey"
  Actual: "PONG"

4 runs, 6 assertions, 1 failures, 0 errors, 0 skips

This is expected. In the second article of this series, we returned PONG as the reply to every command, remember?

Decoding RESP

In the last article, we looked at how RESP works. Now that we need to decode RESP in our program, let’s implement a RESPDecoder.

For a command like ECHO hey, here’s what the client will send us:

*2\r\n$4\r\nECHO\r\n$3\r\nhey\r\n

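Quick recap of what this means:

*2\r\n announces an array of 2 elements: 1 byte for the *, 1 byte for the element count and 2 bytes for the CRLF.
$4\r\nECHO\r\n is the first element, a bulk string 4 bytes long. It takes up 10 bytes on the wire.
$3\r\nhey\r\n is the second element, a bulk string 3 bytes long. It takes up 9 bytes on the wire.

That’s 23 (1 + 1 + 2 + 10 + 9) bytes in total.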
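
To double-check the byte count, here’s a small sketch that builds the same message by hand. encode_command is a hypothetical helper written for this example; it isn’t part of the server we’re building.

```ruby
# Hypothetical helper (not part of the article's server): encodes a command
# as a RESP array of bulk strings, the way a client would send it.
def encode_command(*parts)
  encoded = "*#{parts.length}\r\n"
  parts.each do |part|
    # Bulk strings advertise their length in bytes, not characters
    encoded << "$#{part.bytesize}\r\n#{part}\r\n"
  end
  encoded
end

message = encode_command("ECHO", "hey")
puts message.inspect  # "*2\r\n$4\r\nECHO\r\n$3\r\nhey\r\n"
puts message.bytesize # 23
```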

Since we’re using TCP, there’s no guarantee that all these bytes will arrive at the same time. Maybe we’ll receive *2\r\n first, and $4\r\nECHO\r\n$3\r\nhey\r\n later?

In an event loop, we don’t have the luxury of waiting until the client has sent us a complete RESP message. We’ll have to maintain a buffer and design a RESP decoder that can identify incomplete input.
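
Here’s a rough illustration of a command arriving in two pieces. It uses IO.pipe as a stand-in for the TCP connection (an assumption made to keep the example self-contained); both hand us a stream of bytes with no message boundaries.

```ruby
# IO.pipe stands in for the client connection in this sketch.
reader, writer = IO.pipe
buffer = +""

# The first fragment of the command arrives...
writer.write("*2\r\n")
first_chunk = reader.readpartial(1024)
buffer << first_chunk
puts first_chunk.inspect # "*2\r\n"

# ...and the rest shows up in a later read.
writer.write("$4\r\nECHO\r\n$3\r\nhey\r\n")
buffer << reader.readpartial(1024)
puts buffer.inspect # "*2\r\n$4\r\nECHO\r\n$3\r\nhey\r\n"

writer.close
reader.close
```

After the first read the buffer holds only the array header, so a decoder has to be able to say "not yet" rather than fail.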

Let’s start writing our RESP decoder. It’ll support decoding Simple Strings, Bulk Strings and Arrays. If it encounters an incomplete RESP string, it’ll raise an IncompleteRESP exception.

Note: What follows is a LOT of code. Feel free to skip over to the implementation of ECHO if you aren’t interested in the nitty-gritty of parsing RESP.

Simple Strings

Starting out with a basic spec for decoding simple strings:

require "minitest/autorun"

class TestRESPDecoder < Minitest::Test
  def test_simple_string
    assert_equal "OK", RESPDecoder.decode("+OK\r\n")
    assert_equal "HEY", RESPDecoder.decode("+HEY\r\n")
    assert_raises(IncompleteRESP) { RESPDecoder.decode("+") }
    assert_raises(IncompleteRESP) { RESPDecoder.decode("+OK") }
    assert_raises(IncompleteRESP) { RESPDecoder.decode("+OK\r") }
  end
end

Now the implementation:

require "stringio"

class IncompleteRESP < Exception; end

class RESPDecoder
  def self.decode(resp_str)
    resp_io = StringIO.new(resp_str)
    first_char = resp_io.read(1)
    if first_char == "+"
      self.decode_simple_string(resp_io)
    else
      raise RuntimeError.new("Unhandled first_char: #{first_char}")
    end
  rescue EOFError
    raise IncompleteRESP
  end

  def self.decode_simple_string(resp_io)
    read = resp_io.readline("\r\n")
    if read[-2..-1] != "\r\n"
      raise IncompleteRESP
    end

    read[0..-3]
  end
end
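
The decoder leans on how StringIO behaves at the end of its input, which is easy to get wrong. A quick sketch of the three cases that matter here:

```ruby
require "stringio"

# read(n) reports "nothing left" by returning nil...
at_eof = StringIO.new("").read(1)
p at_eof # => nil

# ...while readline raises EOFError when there is nothing left to read.
readline_raised = false
begin
  StringIO.new("").readline("\r\n")
rescue EOFError
  readline_raised = true
end
p readline_raised # => true

# If the input ends before the separator shows up, readline returns what
# it has without raising, so the caller still has to check for "\r\n".
partial = StringIO.new("OK\r").readline("\r\n")
p partial # => "OK\r"
```

That’s why decode rescues EOFError, and why decode_simple_string checks the last two characters itself.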

Bulk Strings

Let’s amend our test suite to handle bulk strings:

  require "minitest/autorun"

  class TestRESPDecoder < Minitest::Test
    def test_simple_string
      assert_equal "OK", RESPDecoder.decode("+OK\r\n")
      assert_equal "HEY", RESPDecoder.decode("+HEY\r\n")
      assert_raises(IncompleteRESP) { RESPDecoder.decode("+") }
      assert_raises(IncompleteRESP) { RESPDecoder.decode("+OK") }
      assert_raises(IncompleteRESP) { RESPDecoder.decode("+OK\r") }
    end
+
+   def test_bulk_string
+     assert_equal "OK", RESPDecoder.decode("$2\r\nOK\r\n")
+     assert_equal "HEY", RESPDecoder.decode("$3\r\nHEY\r\n")
+     assert_equal "HEY", RESPDecoder.decode("$3\r\nHEY\r\n")
+     assert_raises(IncompleteRESP) { RESPDecoder.decode("$") }
+     assert_raises(IncompleteRESP) { RESPDecoder.decode("$2") }
+     assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r") }
+     assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r\n") }
+     assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r\nOK") }
+     assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r\nOK\r") }
+   end
  end

Now the implementation:

  require "stringio"

  class IncompleteRESP < Exception; end

  class RESPDecoder
    def self.decode(resp_str)
      resp_io = StringIO.new(resp_str)
      first_char = resp_io.read(1)
      if first_char == "+"
        self.decode_simple_string(resp_io)
+     elsif first_char == "$"
+       self.decode_bulk_string(resp_io)
      else
        raise RuntimeError.new("Unhandled first_char: #{first_char}")
      end
    rescue EOFError
      raise IncompleteRESP
    end

    def self.decode_simple_string(resp_io)
      read = resp_io.readline("\r\n")
      if read[-2..-1] != "\r\n"
        raise IncompleteRESP
      end

      read[0..-3]
    end
+
+   def self.decode_bulk_string(resp_io)
+     byte_count_with_clrf = resp_io.readline("\r\n")
+     if byte_count_with_clrf[-2..-1] != "\r\n"
+       raise IncompleteRESP
+     end
+     byte_count = byte_count_with_clrf.to_i
+     str = resp_io.read(byte_count)
+
+     # Exactly the advertised number of bytes must be present
+     raise IncompleteRESP unless str && str.length == byte_count
+
+     # Consume the ending CRLF
+     raise IncompleteRESP unless resp_io.read(2) == "\r\n"
+
+     str
+   end
  end
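
One consequence of the length prefix is worth a quick sketch: because decode_bulk_string reads exactly the advertised number of bytes instead of scanning for the next CRLF, a bulk string may itself contain \r\n. The read_bulk_string helper below is a simplified stand-in written for this example (it assumes complete input); it is not the decoder above.

```ruby
require "stringio"

# Simplified stand-in for decode_bulk_string; assumes the input is complete.
def read_bulk_string(io)
  io.read(1)                            # skip the leading "$"
  byte_count = io.readline("\r\n").to_i # "4\r\n".to_i => 4
  str = io.read(byte_count)             # read by count, not by delimiter
  io.read(2)                            # consume the trailing CRLF
  str
end

payload = "a\r\nb"
encoded = "$#{payload.bytesize}\r\n#{payload}\r\n"
decoded = read_bulk_string(StringIO.new(encoded))
p decoded # => "a\r\nb"
```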

Arrays

And now for the last data type: Arrays.

  require "minitest/autorun"

  class TestRESPDecoder < Minitest::Test
    def test_simple_string
      assert_equal "OK", RESPDecoder.decode("+OK\r\n")
      assert_equal "HEY", RESPDecoder.decode("+HEY\r\n")
      assert_raises(IncompleteRESP) { RESPDecoder.decode("+") }
      assert_raises(IncompleteRESP) { RESPDecoder.decode("+OK") }
      assert_raises(IncompleteRESP) { RESPDecoder.decode("+OK\r") }
    end

    def test_bulk_string
      assert_equal "OK", RESPDecoder.decode("$2\r\nOK\r\n")
      assert_equal "HEY", RESPDecoder.decode("$3\r\nHEY\r\n")
      assert_equal "HEY", RESPDecoder.decode("$3\r\nHEY\r\n")
      assert_raises(IncompleteRESP) { RESPDecoder.decode("$") }
      assert_raises(IncompleteRESP) { RESPDecoder.decode("$2") }
      assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r") }
      assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r\n") }
      assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r\nOK") }
      assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r\nOK\r") }
    end
+
+   def test_arrays
+     assert_equal ["PING"], RESPDecoder.decode("*1\r\n$4\r\nPING\r\n")
+     assert_equal ["ECHO", "hey"], RESPDecoder.decode("*2\r\n$4\r\nECHO\r\n$3\r\nhey\r\n")
+     assert_raises(IncompleteRESP) { RESPDecoder.decode("*") }
+     assert_raises(IncompleteRESP) { RESPDecoder.decode("*1") }
+     assert_raises(IncompleteRESP) { RESPDecoder.decode("*1\r\n") }
+     assert_raises(IncompleteRESP) { RESPDecoder.decode("*1\r\n$4") }
+     assert_raises(IncompleteRESP) { RESPDecoder.decode("*2\r\n$4\r\nECHO\r\n") }
+   end
  end

Since arrays can contain anything inside them (simple strings, bulk strings, or other arrays themselves), we’ll try to re-use the existing code for parsing simple strings & bulk strings.

  require "stringio"

  class IncompleteRESP < Exception; end

  class RESPDecoder
    def self.decode(resp_str)
      resp_io = StringIO.new(resp_str)
+     self.do_decode(resp_io)
+   end
+
+   def self.do_decode(resp_io)
      first_char = resp_io.read(1)
+     raise IncompleteRESP if first_char.nil?
+
      if first_char == "+"
        self.decode_simple_string(resp_io)
      elsif first_char == "$"
        self.decode_bulk_string(resp_io)
+     elsif first_char == "*"
+       self.decode_array(resp_io)
      else
        raise RuntimeError.new("Unhandled first_char: #{first_char}")
      end
    rescue EOFError
      raise IncompleteRESP
    end

    def self.decode_simple_string(resp_io)
      read = resp_io.readline("\r\n")
      if read[-2..-1] != "\r\n"
        raise IncompleteRESP
      end

      read[0..-3]
    end

    def self.decode_bulk_string(resp_io)
-     byte_count_with_clrf = resp_io.readline("\r\n")
-     if byte_count_with_clrf[-2..-1] != "\r\n"
-       raise IncompleteRESP
-     end
-     byte_count = byte_count_with_clrf.to_i
+     byte_count = read_int_with_clrf(resp_io)
      str = resp_io.read(byte_count)

      # Exactly the advertised number of bytes must be present
      raise IncompleteRESP unless str && str.length == byte_count

      # Consume the ending CRLF
      raise IncompleteRESP unless resp_io.read(2) == "\r\n"

      str
    end
+
+   def self.decode_array(resp_io)
+     element_count = read_int_with_clrf(resp_io)
+
+     # Recurse, using do_decode
+     element_count.times.map { self.do_decode(resp_io) }
+   end
+
+   def self.read_int_with_clrf(resp_io)
+     int_with_clrf = resp_io.readline("\r\n")
+     if int_with_clrf[-2..-1] != "\r\n"
+       raise IncompleteRESP
+     end
+     int_with_clrf.to_i
+   end
  end
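
The tests above only cover flat arrays, so here’s a sketch of the recursion handling an array nested inside another array. To keep the example runnable on its own it uses a condensed decoder with made-up names (decode_value, read_line, IncompleteInput); the idea is the same as do_decode calling itself for every element.

```ruby
require "stringio"

class IncompleteInput < StandardError; end # hypothetical name for this sketch

# Reads up to and including CRLF, returning the line without it
def read_line(io)
  line = io.gets("\r\n")
  raise IncompleteInput unless line && line.end_with?("\r\n")
  line.delete_suffix("\r\n")
end

# Condensed decoder: each array element is decoded by a recursive call
def decode_value(io)
  case io.read(1)
  when nil then raise IncompleteInput
  when "+" then read_line(io)
  when "$"
    count = read_line(io).to_i
    str = io.read(count)
    raise IncompleteInput unless str && str.bytesize == count
    raise IncompleteInput unless io.read(2) == "\r\n"
    str
  when "*"
    Array.new(read_line(io).to_i) { decode_value(io) }
  else
    raise ArgumentError, "unsupported RESP type"
  end
end

nested = "*2\r\n*2\r\n+OK\r\n$3\r\nhey\r\n$5\r\nhello\r\n"
decoded = decode_value(StringIO.new(nested))
p decoded # => [["OK", "hey"], "hello"]
```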

Implementing ECHO

Now that we’ve got a RESP decoder in place, let’s get back to the original task: Implementing the ECHO command.

Refactor: Extract out a Client class

Since we know we’ll need to read small chunks and maintain a buffer, let’s start by extracting a Client class that holds that state.

  require "socket"

+ class Client
+   attr_reader :socket
+
+   def initialize(socket)
+     @socket = socket
+     @buffer = ""
+   end
+
+   def read_available
+     @buffer += @socket.readpartial(1024)
+   end
+
+   def write(msg)
+     @socket.write(msg)
+   end
+ end
+
  class RedisServer
    def initialize(port)
      @server = TCPServer.new(port)
-     @clients = []
+     @sockets_to_clients = {}
    end

    def listen
      loop do
-       fds_to_watch = [@server, *@clients]
+       fds_to_watch = [@server, *@sockets_to_clients.keys]
        ready_to_read, _, _ = IO.select(fds_to_watch)
        ready_to_read.each do |ready|
          if ready == @server
-           @clients << @server.accept
+           client_socket = @server.accept
+           @sockets_to_clients[client_socket] = Client.new(client_socket)
          else
            # If not the server, this must be one of the existing clients
+           client = @sockets_to_clients[ready]
            handle_client(client)
          end
        end
      end
    end

    def handle_client(client)
-     client.readpartial(1024) # TODO: Read actual command
+     client.read_available # TODO: Read actual command

      # TODO: Handle commands other than PING
      client.write("+PONG\r\n")
+   rescue Errno::ECONNRESET, EOFError
+     # If the client has disconnected, let's
+     # remove from our list of active clients
+     @sockets_to_clients.delete(client.socket)
    end
  end

Running our test suite to make sure the refactor worked:

➜  SERVER_PORT=6380 ruby _files/redis/integrated_test_4.rb
Run options: --seed 4190

# Running:

..F.

  1) Failure:
TestRedisServer#test_responds_to_echo [_files/redis/integrated_test_4.rb:34]:
Expected: "hey"
  Actual: "PONG"

4 runs, 6 assertions, 1 failures, 0 errors, 0 skips

The failure is expected. Let’s fix that next by actually parsing commands.

Parsing commands using RESPDecoder

We’ll use the RESPDecoder we built to try decoding the buffer in a client.

  require "socket"
+ require_relative "resp_decoder_3"
+
+ class Command
+   attr_reader :action
+   attr_reader :args
+
+   def initialize(action, args)
+     @action = action
+     @args = args
+   end
+ end

  class Client
    attr_reader :socket

    def initialize(socket)
      @socket = socket
      @buffer = ""
    end

    def read_available
      @buffer += @socket.readpartial(1024)
    end

+   def consume_command!
+     array = RESPDecoder.decode(@buffer)
+     @buffer = "" # Reset buffer after consuming command
+     Command.new(array.first, array[1..-1])
+   rescue IncompleteRESP
+     # The client hasn't sent us a complete command yet
+     return nil
+   end
+
    def write(msg)
      @socket.write(msg)
    end
  end

  class RedisServer
    def initialize(port)
      @server = TCPServer.new(port)
      @sockets_to_clients = {}
    end

    def listen
      loop do
        fds_to_watch = [@server, *@sockets_to_clients.keys]
        ready_to_read, _, _ = IO.select(fds_to_watch)
        ready_to_read.each do |ready|
          if ready == @server
            client_socket = @server.accept
            @sockets_to_clients[client_socket] = Client.new(client_socket)
          else
            # If not the server, this must be one of the existing clients
            client = @sockets_to_clients[ready]
            handle_client(client)
          end
        end
      end
    end

    def handle_client(client)
-     client.read_available # TODO: Read actual command
-
-     # TODO: Handle commands other than PING
-     client.write("+PONG\r\n")
+     client.read_available
+     loop do
+       command = client.consume_command!
+       break unless command
+       handle_command(client, command)
+     end
    rescue Errno::ECONNRESET, EOFError
      # If the client has disconnected, let's
      # remove from our list of active clients
      @sockets_to_clients.delete(client.socket)
    end
+
+   def handle_command(client, command)
+     if command.action.downcase == "ping"
+       client.write("+PONG\r\n")
+     elsif command.action.downcase == "echo"
+       client.write("+#{command.args.first}\r\n")
+     else
+       raise RuntimeError.new("Unhandled command: #{command.action}")
+     end
+   end
  end

Each time we read from the client, we try to decode the contents of the buffer. If we run into IncompleteRESP, we skip handling and return to the event loop.
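
One limitation worth noting in consume_command!: it clears the whole buffer after decoding a single command, so if one read happens to contain two commands (a client pipelining requests, say), everything after the first is discarded. A possible way around that, sketched here under assumptions, is to note how far the decoder got via StringIO#pos and keep the remaining bytes. take_command and read_crlf_int are hypothetical helpers written for this example; they only understand arrays of bulk strings and treat anything else as incomplete, which is a simplification.

```ruby
require "stringio"

# Reads an integer terminated by CRLF; nil if the line hasn't fully arrived
def read_crlf_int(io)
  line = io.gets("\r\n")
  return nil unless line && line.end_with?("\r\n")
  line.to_i
end

# Decodes one command from the front of `buffer`. Returns
# [command_parts, remaining_buffer], or nil if no complete command is there.
def take_command(buffer)
  io = StringIO.new(buffer)
  return nil unless io.read(1) == "*"

  count = read_crlf_int(io)
  return nil if count.nil?

  parts = []
  count.times do
    return nil unless io.read(1) == "$"
    length = read_crlf_int(io)
    return nil if length.nil?
    part = io.read(length)
    return nil unless part && part.bytesize == length
    return nil unless io.read(2) == "\r\n"
    parts << part
  end

  # io.pos is the number of bytes consumed; keep whatever follows
  [parts, buffer.byteslice(io.pos, buffer.bytesize)]
end

# Two complete commands followed by the start of a third
buffer = "*1\r\n$4\r\nPING\r\n*2\r\n$4\r\nECHO\r\n$3\r\nhey\r\n*1\r\n$4\r\nPI"
commands = []
while (result = take_command(buffer))
  parts, buffer = result
  commands << parts
end

p commands # => [["PING"], ["ECHO", "hey"]]
p buffer   # => "*1\r\n$4\r\nPI"
```

The leftover bytes stay in the buffer until the next read completes the third command.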

Tests pass!

➜  SERVER_PORT=6380 ruby _files/redis/integrated_test_4.rb
Run options: --seed 17051

# Running:

....

4 runs, 7 assertions, 0 failures, 0 errors, 0 skips

Our Redis server is now capable of serving multiple clients with the PING and ECHO commands.
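
A caveat on the ECHO reply: handle_command sends it as a simple string, which is enough for our test, but simple strings can’t contain CR or LF. Real Redis replies to ECHO with a bulk string (that’s why redis-cli printed "hey" in quotes earlier). If you want to match that, something along these lines could work; encode_bulk_string is a hypothetical helper, not part of the code above.

```ruby
# Hypothetical helper: encodes a reply as a RESP bulk string
def encode_bulk_string(str)
  "$#{str.bytesize}\r\n#{str}\r\n"
end

simple = encode_bulk_string("hey")
multiline = encode_bulk_string("two\r\nlines")

p simple    # => "$3\r\nhey\r\n"
p multiline # => "$10\r\ntwo\r\nlines\r\n"
```

In handle_command, the echo branch would then write encode_bulk_string(command.args.first) instead of the interpolated simple string.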


The next article is in progress; it’ll be about implementing the GET and SET commands. If you’d like to get access to a draft version, please let me know.