Pyfisch’s Website > Blog

A Tokio Echo Server in 35 Lines

written by Pyfisch on

I’ve decided to give tokio the new “network application framework” based on mio a try and write an echo server.

While tokio is a rust crate, in German language Tokio is the city of Tokyo in Japan. By Martin Falbisoner CC BY-SA 3.0, via Wikimedia Commons

RFC 862 is just one page and the section describing the TCP echo server is just four lines long, so I paste it here in its entirety:

One echo service is defined as a connection based application on TCP. A server listens for TCP connections on TCP port 7. Once a connection is established any data received is sent back. This continues until the calling user terminates the connection.

The main function first creates a Reactor. The reactor manages the connections with the clients. The server should listen to all connections on port 7000 (port 7 requires root, so I’ve changed it). Echo::from_stream constructs a new task for a TCP stream. It is called by the reactor for each connection. Last the reactor is run. It now listens to all incoming connections.

let reactor = Reactor::default().unwrap();
let address = "0.0.0.0:7000".parse().unwrap();
listen(&reactor.handle(), address, Echo::from_stream).unwrap();
reactor.run().unwrap();

Echo is a simple struct. It contains a TCP stream and a buffer of 128 bytes to store data send by the client.

struct Echo {
    buf: Vec<u8>,
    stream: TcpStream,
}

A tokio TcpStream behaves a little different than a blocking TcpStream from the standard library. It returns data as long as there are ready bytes. If there are no more bytes left to read it throws an io::Error from the kind WouldBlock. If the socket was closed and there are no more bytes to read it returns that it successfully read zero bytes. The non-blocking nature of the stream renders some methods of std::io::Read useless. For example one cannot use read_to_end since the stream may block at any time before being closed. On the other hand a read_to_block function would be useful that reads until the next block. In the echo server one could omit the loop and just read all contents from the stream and write them to the output.

As Echo is a tokio task it shall implement the Task trait. The method tick is called by the reactor whenever the TCP stream contains data. Here the method reads all data from the stream and writes it back to the stream immediatly.

If there are no more bytes to read the task is terminated by returning Tick::Final, it won’t be called again by tokio. If some bytes were read all of them are written to the TCP stream. If there is a WouldBlock error the reactor is informed that the task has currently no more work to do but should be called again if the server receives more bytes on the connection.

impl Task for Echo {
    fn tick(&mut self) -> io::Result<Tick> {
        loop {
            match self.stream.read(&mut self.buf) {
                Ok(0) => return Ok(Tick::Final),
                Ok(len) => try!(self.stream.write_all(&self.buf[..len])),
                Err(ref e) if e.kind() == ErrorKind::WouldBlock =>
                    return Ok(Tick::WouldBlock),
                Err(e) => return Err(e),
            }
        }
    }
}

This is the complete code minus some imports and the trivial from_stream constructor of Echo.

To test the echo server use $ telnet localhost 7000 and enter some text. It will be promptly returned by the server.

Nitpicking about Tokio

The echo server scratches just the surface of Tokio, but I really like how Tokio abstracts over Mio. It is a lot simpler than rotor another abstraction crate for mio.

Creating a server is straightforward and one needs to only implement a single trait method to create a simple task. The documentation is good considering this is a new project but the API still needs more polish. In no particular order:

The next protocol I am going to implement will be more complex (really!) than echo but it will be a fun task to do it with tokio.

By the way this is my first blog post, please write suggestions to improve this text on Reddit.

Update

As Carl Lerche commented on Reddit the Echo server should not use write_all() because tokio may raise a WouldBlock error while writing the message. In this case you can't recover from the error because you don't know how many bytes were written before tokio blocked.

For this reason the server needs to keep additional state between calls to the tick function. It needs to remember if there are pending bytes, and if this is the case where in the buffer they are located. A pending: Range<usize>, needs to be added to the Echo struct. If the start and the end of the range are at the same position there are no pending bytes left. This is also the initial state: pending: 0..0. If there are pending bytes waiting to be written the range gives the position of the remaining bytes in the buffer. Although the buffer is always filled starting at the first byte partial writes may occur and only the first x bytes are written and the pending bytes are in the middle of the buffer.

The new body of the tick() function looks like this:

loop {
    match self.pending {
        Range { start, end } if start == end => {
            if let Some(len) = try!(self.stream
                .try_read(&mut self.buf)) {
                if len == 0 {
                    return Ok(Tick::Final);
                }
                self.pending = 0..len;
            } else {
                return Ok(Tick::WouldBlock);
            }
        }
        ref mut range => {
            if let Some(len) = try!(self.stream
                .try_write(&self.buf[range.clone()])) {
                range.start += len;
            } else {
                return Ok(Tick::WouldBlock);
            }
        }
    }
}

The match statement is called in a loop to do as much work as possible. First if there are no pending bytes some text is read from the TCP stream. The range of the bytes read is added to the pending value. If there are pending bytes the server tries to write them to the TCP stream. The range of the remaining pending bytes is updated. If the stream would block the function returns. tick will be called again when the stream is unblocked again. If there are zero bytes read the connection was closed and Tick::Final tells tokio that it should remove the task.

Obviously the code is now a bit longer than 35 lines, but at least it is correct. ☺

The complete code

This is the updated version of the code without write_all().

extern crate tokio;

use std::io;
use std::ops::Range;

use tokio::io::{TryRead, TryWrite};
use tokio::server::listen;
use tokio::tcp::TcpStream;
use tokio::reactor::{Reactor, Task, Tick};

struct Echo {
    buf: Vec<u8>,
    pending: Range<usize>,
    stream: TcpStream,
}

impl Echo {
    fn from_stream(stream: TcpStream) -> io::Result<Echo> {
        Ok(Echo {
            buf: vec![0; 128],
            pending: 0..0,
            stream: stream,
        })
    }
}

impl Task for Echo {
    fn tick(&mut self) -> io::Result<Tick> {
        loop {
            match self.pending {
                Range { start, end } if start == end => {
                    if let Some(len) = try!(self.stream
                        .try_read(&mut self.buf)) {
                        if len == 0 {
                            return Ok(Tick::Final);
                        }
                        self.pending = 0..len;
                    } else {
                        return Ok(Tick::WouldBlock);
                    }
                }
                ref mut range => {
                    if let Some(len) = try!(self.stream
                        .try_write(&self.buf[range.clone()])) {
                        range.start += len;
                    } else {
                        return Ok(Tick::WouldBlock);
                    }
                }
            }
        }
    }
}

fn main() {
    let reactor = Reactor::default().unwrap();
    let address = "0.0.0.0:7000".parse().unwrap();
    listen(&reactor.handle(), address, Echo::from_stream).unwrap();
    reactor.run().unwrap();
}