A Tokio Echo Server in 35 Lines
written by Pyfisch on
I’ve decided to give tokio the new “network application framework” based on mio a try and write an echo server.
RFC 862 is just one page and the section describing the TCP echo server is just four lines long, so I paste it here in its entirety:
One echo service is defined as a connection based application on TCP. A server listens for TCP connections on TCP port 7. Once a connection is established any data received is sent back. This continues until the calling user terminates the connection.
The main
function first creates a Reactor
. The reactor manages the
connections with the clients. The server should listen to all connections
on port 7000 (port 7 requires root, so I’ve changed it). Echo::from_stream
constructs a new task for a TCP stream. It is called by the reactor for each
connection. Last the reactor is run. It now listens to all incoming connections.
let reactor = Reactor::default().unwrap();
let address = "0.0.0.0:7000".parse().unwrap();
listen(&reactor.handle(), address, Echo::from_stream).unwrap();
reactor.run().unwrap();
Echo
is a simple struct. It contains a TCP stream and a buffer of 128 bytes to
store data send by the client.
struct Echo {
buf: Vec<u8>,
stream: TcpStream,
}
A tokio TcpStream
behaves a little different than a blocking TcpStream
from
the standard library. It returns data as long as there are ready bytes. If there
are no more bytes left to read it throws an io::Error
from the kind
WouldBlock
. If the socket was closed and there are no more bytes to read it
returns that it successfully read zero bytes. The non-blocking nature of the
stream renders some methods of std::io::Read
useless. For example one cannot
use read_to_end
since the stream may block at any time before being closed. On
the other hand a read_to_block
function would be useful that reads until the
next block. In the echo server one could omit the loop and just read all
contents from the stream and write them to the output.
As Echo
is a tokio task it shall implement the Task
trait. The method tick
is called by the reactor whenever the TCP stream contains data. Here the method
reads all data from the stream and writes it back to the stream immediatly.
If there are no more bytes to read the task is terminated by returning
Tick::Final
, it won’t be called again by tokio. If some bytes were read all
of them are written to the TCP stream. If there is a WouldBlock
error the
reactor is informed that the task has currently no more work to do but should
be called again if the server receives more bytes on the connection.
impl Task for Echo {
fn tick(&mut self) -> io::Result<Tick> {
loop {
match self.stream.read(&mut self.buf) {
Ok(0) => return Ok(Tick::Final),
Ok(len) => try!(self.stream.write_all(&self.buf[..len])),
Err(ref e) if e.kind() == ErrorKind::WouldBlock =>
return Ok(Tick::WouldBlock),
Err(e) => return Err(e),
}
}
}
}
This is the complete code minus some imports and the trivial from_stream
constructor of Echo.
To test the echo server use $ telnet localhost 7000
and enter some text. It will
be promptly returned by the server.
Nitpicking about Tokio
The echo server scratches just the surface of Tokio, but I really like how Tokio abstracts over Mio. It is a lot simpler than rotor another abstraction crate for mio.
Creating a server is straightforward and one needs to only implement a single trait method to create a simple task. The documentation is good considering this is a new project but the API still needs more polish. In no particular order:
- It would be nice if
listen
,TcpStream
and others would followstd
and accept all types implementingToSocketAddrs
so one can writeTcpStream::connect("127.0.0.1:7")
and does not need to parse the addresses. tokio::io::TcpStream
has bothread
andwrite
methods itself and implementsstd::io::Read
andWrite
. The two methods do the same.- I am not sure if I was supposed to use
try_read
instead ofread
. It should work make working with non-blocking resources easier but it only splits out theWouldBlock
error case. It’s return type isResult<Option<usize>>
. One checks if there is an error, checks forNone
, it indicates if the resource would block and then reads the length. I found it more concise to writeErr(ref e) if e.kind() == ErrorKind::WouldBlock
inside a match statement to test if a resource would block, it also reduces rightward shift.
The next protocol I am going to implement will be more complex (really!) than echo but it will be a fun task to do it with tokio.
By the way this is my first blog post, please write suggestions to improve this text on Reddit.
Update
As Carl Lerche commented on Reddit the Echo server should not use
write_all()
because tokio may raise a WouldBlock
error while writing the message.
In this case you can't recover from the error because you don't know how many bytes
were written before tokio blocked.
For this reason the server needs to keep additional state between calls to the tick function.
It needs to remember if there are pending bytes, and if this is the case where in the
buffer they are located. A pending: Range<usize>,
needs to be added to the Echo
struct.
If the start
and the end
of the range are at the same position there are no pending bytes
left. This is also the initial state: pending: 0..0
. If there are pending bytes waiting to be written
the range gives the position of the remaining bytes in the buffer. Although the buffer is always
filled starting at the first byte partial writes may occur and only the first x bytes are written and
the pending bytes are in the middle of the buffer.
The new body of the tick()
function looks like this:
loop {
match self.pending {
Range { start, end } if start == end => {
if let Some(len) = try!(self.stream
.try_read(&mut self.buf)) {
if len == 0 {
return Ok(Tick::Final);
}
self.pending = 0..len;
} else {
return Ok(Tick::WouldBlock);
}
}
ref mut range => {
if let Some(len) = try!(self.stream
.try_write(&self.buf[range.clone()])) {
range.start += len;
} else {
return Ok(Tick::WouldBlock);
}
}
}
}
The match statement is called in a loop to do as much work as possible. First if there are no
pending bytes some text is read from the TCP stream. The range of the bytes read is added to
the pending value. If there are pending bytes the server tries to write them to the TCP stream.
The range of the remaining pending bytes is updated. If the stream would block the function
returns. tick
will be called again when the stream is unblocked again. If there are zero bytes
read the connection was closed and Tick::Final
tells tokio that it should remove the task.
Obviously the code is now a bit longer than 35 lines, but at least it is correct. ☺
The complete code
This is the updated version of the code without write_all()
.
extern crate tokio;
use std::io;
use std::ops::Range;
use tokio::io::{TryRead, TryWrite};
use tokio::server::listen;
use tokio::tcp::TcpStream;
use tokio::reactor::{Reactor, Task, Tick};
struct Echo {
buf: Vec<u8>,
pending: Range<usize>,
stream: TcpStream,
}
impl Echo {
fn from_stream(stream: TcpStream) -> io::Result<Echo> {
Ok(Echo {
buf: vec![0; 128],
pending: 0..0,
stream: stream,
})
}
}
impl Task for Echo {
fn tick(&mut self) -> io::Result<Tick> {
loop {
match self.pending {
Range { start, end } if start == end => {
if let Some(len) = try!(self.stream
.try_read(&mut self.buf)) {
if len == 0 {
return Ok(Tick::Final);
}
self.pending = 0..len;
} else {
return Ok(Tick::WouldBlock);
}
}
ref mut range => {
if let Some(len) = try!(self.stream
.try_write(&self.buf[range.clone()])) {
range.start += len;
} else {
return Ok(Tick::WouldBlock);
}
}
}
}
}
}
fn main() {
let reactor = Reactor::default().unwrap();
let address = "0.0.0.0:7000".parse().unwrap();
listen(&reactor.handle(), address, Echo::from_stream).unwrap();
reactor.run().unwrap();
}