06. Event Loop
6.1 Introduction
Rewrite the echo server from chapter 04 into an event loop.
while running:
    want_read = [...]   # socket fds
    want_write = [...]  # socket fds
    can_read, can_write = wait_for_readiness(want_read, want_write) # blocks!
    for fd in can_read:
        data = read_nb(fd)      # non-blocking, only consume from the buffer
        handle_data(fd, data)   # application logic without IO
    for fd in can_write:
        data = pending_data(fd) # produced by the application
        n = write_nb(fd, data)  # non-blocking, only append to the buffer
        data_written(fd, n)     # n <= len(data), limited by the available space

Application code vs. event loop code
Some libraries can abstract away the event loop: the event loop code interacts with the application code via callbacks, and the application code interacts with the event loop via a well-defined API. We are not writing a library, but there is still an implicit boundary between the event loop code and the application code.
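To make this boundary concrete, here is a sketch of the application-side interface as it will appear in this chapter (the function names below are the ones defined later; `Conn` is the per-connection state from the next section):

// application-side callbacks, invoked by the event loop
static Conn *handle_accept(int fd);   // a new connection is ready to accept
static void handle_read(Conn *conn);  // the socket is ready to read
static void handle_write(Conn *conn); // the socket is ready to write
// the application talks back via the Conn::want_read/want_write/want_close flags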
6.2 Per-connection state
With an event loop, an application task can span multiple loop iterations, so the state must be explicitly stored somewhere. Here is our per-connection state:
struct Conn {
    int fd = -1;
    // application's intention, for the event loop
    bool want_read = false;
    bool want_write = false;
    bool want_close = false;
    // buffered input and output
    std::vector<uint8_t> incoming;  // data to be parsed by the application
    std::vector<uint8_t> outgoing;  // responses generated by the application
};

- `Conn::want_read` & `Conn::want_write` represent the fd lists for the readiness API.
- `Conn::want_close` tells the event loop to destroy the connection.
- `Conn::incoming` buffers data from the socket for the protocol parser to work on.
- `Conn::outgoing` buffers generated responses that are written to the socket.
The need for input buffers
Since reads are now non-blocking, we cannot just wait for n bytes
while parsing the protocol; the read_full() function is now
irrelevant. We’ll do this instead:
At each loop iteration, if the socket is ready to read:

1. Do a non-blocking read.
2. Add new data to the `Conn::incoming` buffer.
3. Try to parse the accumulated buffer.
   - If there is not enough data, do nothing in that iteration.
4. Process the parsed message.
5. Remove the message from `Conn::incoming`.
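For example (hypothetical sizes): suppose a message is a 4-byte header plus a 100-byte body. One iteration may read only 60 bytes; parsing fails, so nothing happens. A later iteration reads the remaining 44 bytes, parsing succeeds, and the message is processed and removed from the buffer.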
Why buffer output data?
Since writes are now non-blocking, we cannot write to sockets at will; data is written only when the socket is ready to write. A large response may take multiple loop iterations to complete, so the response data must be stored in a buffer (Conn::outgoing).
6.3 The event loop code
Map from fd to connection state
poll() returns an fd list. We need to map each fd to its Conn object.
// a map of all client connections, keyed by fd
std::vector<Conn *> fd2conn;

On Unix, an fd is allocated as the smallest available non-negative integer, so the mapping from fd to Conn can be a flat array indexed by fd, and the array will be densely packed. Nothing can be more efficient. Sometimes simple arrays can replace complex data structures like hashtables.
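As an illustration, the insertion logic (inlined later in Step 3) could be factored into a small helper; the name conn_put is ours, not from the chapter's code:

// grow the flat array on demand, then store the connection at index fd
static void conn_put(std::vector<Conn *> &fd2conn, Conn *conn) {
    if (fd2conn.size() <= (size_t)conn->fd) {
        fd2conn.resize(conn->fd + 1);
    }
    fd2conn[conn->fd] = conn;
}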
The `poll()` syscall
The readiness API takes a list of fds that the program wants to do IO on, then returns a list of fds ready for IO. There are 2 types of readiness: read and write.
can_read, can_write = wait_for_readiness(want_read, want_write)

We'll use poll(); it uses the same fd list for both input and output.
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

struct pollfd {
    int fd;
    short events;  // request: want to read, write, or both?
    short revents; // returned: can read? can write?
};

- The `nfds` argument is the size of the `fds` array.
- The `timeout` argument is set to -1, which means no timeout.
- `pollfd::events` is a combination of `POLLIN`, `POLLOUT`, and `POLLERR`:
  - `POLLIN` & `POLLOUT` correspond to the `want_read` & `want_write` fd lists.
  - `POLLERR` indicates a socket error that we always want to be notified about.
- `pollfd::revents` is returned by `poll()`. It uses the same set of flags to indicate whether the fd is in the `can_read` or `can_write` list.
Step 1: Construct the fd list for `poll()`
The application code decides the type of readiness notifications. It communicates with the event loop via the `want_read` & `want_write` flags in `Conn`; the `fds` argument is then constructed from these flags:
// a map of all client connections, keyed by fd
std::vector<Conn *> fd2conn;
// the event loop
std::vector<struct pollfd> poll_args;
while (true) {
    // prepare the arguments of the poll()
    poll_args.clear();
    // put the listening socket in the first position
    struct pollfd pfd = {fd, POLLIN, 0};
    poll_args.push_back(pfd);
    // the rest are connection sockets
    for (Conn *conn : fd2conn) {
        if (!conn) {
            continue;
        }
        struct pollfd pfd = {conn->fd, POLLERR, 0};
        // poll() flags from the application's intent
        if (conn->want_read) {
            pfd.events |= POLLIN;
        }
        if (conn->want_write) {
            pfd.events |= POLLOUT;
        }
        poll_args.push_back(pfd);
    }
    // more ...
}

Step 2: Call `poll()`
// the event loop
while (true) {
    // prepare the arguments of the poll()
    // ...
    // wait for readiness
    int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
    if (rv < 0 && errno == EINTR) {
        continue; // not an error
    }
    if (rv < 0) {
        die("poll");
    }
    // ...
}

poll() is the only blocking syscall in the entire
program. Normally it returns when at least one of the fds is ready.
However, it may occasionally return with errno = EINTR even
if nothing is ready.
If a process receives a Unix signal during a blocking syscall, the syscall returns immediately with EINTR to give the process a chance to handle the signal. EINTR is not expected for non-blocking syscalls.
EINTR is not an error; the syscall should simply be retried.
Even if you do not use signals, you should still handle
EINTR, because there may be unexpected sources of
signals.
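As an aside, the retry can also be done locally instead of re-entering the loop; a minimal sketch, essentially equivalent to the code above:

// keep calling poll() while it is interrupted by signals
int rv = 0;
do {
    rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
} while (rv < 0 && errno == EINTR);
if (rv < 0) {
    die("poll"); // a real error
}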
Step 3: Accept new connections
We put the listening socket at position 0 on the fd list.
// the event loop
while (true) {
    // prepare the arguments of the poll()
    poll_args.clear();
    // put the listening socket in the first position
    struct pollfd pfd = {fd, POLLIN, 0};
    poll_args.push_back(pfd);
    // ...
    // wait for readiness
    int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
    // ...
    // handle the listening socket
    if (poll_args[0].revents) {
        if (Conn *conn = handle_accept(fd)) {
            // put it into the map
            if (fd2conn.size() <= (size_t)conn->fd) {
                fd2conn.resize(conn->fd + 1);
            }
            fd2conn[conn->fd] = conn;
        }
    }
    // ...
} // the event loop

accept() is treated as read() in readiness
notifications, so it uses POLLIN. After poll()
returns, check the 1st fd to see if we can accept().
handle_accept() creates the Conn object for
the new connection. We’ll code this later.
Step 4: Invoke application callbacks
The rest of the fd list is for connection sockets. Call the application code if they are ready for IO.
while (true) {
    // ...
    // wait for readiness
    int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
    // ...
    // handle connection sockets
    for (size_t i = 1; i < poll_args.size(); ++i) { // note: skip the 1st
        uint32_t ready = poll_args[i].revents;
        Conn *conn = fd2conn[poll_args[i].fd];
        if (ready & POLLIN) {
            handle_read(conn); // application logic
        }
        if (ready & POLLOUT) {
            handle_write(conn); // application logic
        }
    }
}

Step 5: Terminate connections
We always poll() for POLLERR on connection
sockets, so we can destroy the connection on error. Application code can
also set Conn::want_close to request the event loop to
destroy the connection.
// handle connection sockets
for (size_t i = 1; i < poll_args.size(); ++i) {
    uint32_t ready = poll_args[i].revents;
    Conn *conn = fd2conn[poll_args[i].fd];
    // read & write ...
    // close the socket on a socket error or by the application's request
    if ((ready & POLLERR) || conn->want_close) {
        (void)close(conn->fd);
        fd2conn[conn->fd] = NULL;
        delete conn;
    }
}

You can add a callback handle_err() to let the application code handle the error, but there is nothing to do in our application, so we just close the socket here.
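If you do want that hook, a minimal sketch (the callback is our assumption, not part of this chapter's code; the event loop would call it when POLLERR is set, before destroying the connection):

// hypothetical error callback: let the application react before the close
static void handle_err(Conn *conn) {
    // e.g., log the error or abort an in-progress task;
    // an echo server has nothing to do here
    conn->want_close = true;
}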
6.4 Application code with non-blocking IO
Non-blocking `accept()`
Before entering the event loop, make the listening socket
non-blocking with fcntl.
static void fd_set_nb(int fd) {
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
}

Then the event loop calls back the application code to do accept().
static Conn *handle_accept(int fd) {
    // accept
    struct sockaddr_in client_addr = {};
    socklen_t addrlen = sizeof(client_addr);
    int connfd = accept(fd, (struct sockaddr *)&client_addr, &addrlen);
    if (connfd < 0) {
        return NULL;
    }
    // set the new connection fd to nonblocking mode
    fd_set_nb(connfd);
    // create a `struct Conn`
    Conn *conn = new Conn();
    conn->fd = connfd;
    conn->want_read = true; // read the 1st request
    return conn;
}

The connection socket is also made non-blocking, waiting for its 1st read.
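One refinement to consider (our addition, not in the original code): since the listening socket is non-blocking, accept() can fail with EAGAIN/EWOULDBLOCK when there happens to be nothing to accept, which is not a real error:

int connfd = accept(fd, (struct sockaddr *)&client_addr, &addrlen);
if (connfd < 0) {
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
        return NULL; // nothing to accept right now; try again later
    }
    perror("accept"); // an actual error; skip this accept
    return NULL;
}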
Protocol parser with non-blocking read
See the comments for each sub-step.
static void handle_read(Conn *conn) {
    // 1. Do a non-blocking read.
    uint8_t buf[64 * 1024];
    ssize_t rv = read(conn->fd, buf, sizeof(buf));
    if (rv <= 0) { // handle IO error (rv < 0) or EOF (rv == 0)
        conn->want_close = true;
        return;
    }
    // 2. Add new data to the `Conn::incoming` buffer.
    buf_append(conn->incoming, buf, (size_t)rv);
    // 3. Try to parse the accumulated buffer.
    // 4. Process the parsed message.
    // 5. Remove the message from `Conn::incoming`.
    try_one_request(conn);
    // ...
}

The handling is split into try_one_request(). If there is not enough data, it will do nothing until a future loop iteration with more data.
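Note that try_one_request() returns true on success. If the client pipelines requests, Conn::incoming may already hold more than one message, so a variant could drain them in a loop, e.g. while (try_one_request(conn)) {} (our suggestion; the code above handles one message per readiness event).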
// process 1 request if there is enough data
static bool try_one_request(Conn *conn) {
    // 3. Try to parse the accumulated buffer.
    // Protocol: message header
    if (conn->incoming.size() < 4) {
        return false; // want read
    }
    uint32_t len = 0;
    memcpy(&len, conn->incoming.data(), 4);
    if (len > k_max_msg) { // protocol error
        conn->want_close = true;
        return false; // want close
    }
    // Protocol: message body
    if (4 + len > conn->incoming.size()) {
        return false; // want read
    }
    const uint8_t *request = &conn->incoming[4];
    // 4. Process the parsed message.
    // ...
    // generate the response (echo)
    buf_append(conn->outgoing, (const uint8_t *)&len, 4);
    buf_append(conn->outgoing, request, len);
    // 5. Remove the message from `Conn::incoming`.
    buf_consume(conn->incoming, 4 + len);
    return true; // success
}

We use std::vector as the buffer type, which is just a dynamic array.
// append to the back
static void buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
    buf.insert(buf.end(), data, data + len);
}

// remove from the front
static void buf_consume(std::vector<uint8_t> &buf, size_t n) {
    buf.erase(buf.begin(), buf.begin() + n);
}

(Erasing from the front of a std::vector is O(n); that's acceptable for this toy server.)

Non-blocking write
There is no application logic; just write some data and remove it from the buffer. write() can return fewer bytes than requested, and that's OK because the event loop will call it again.
static void handle_write(Conn *conn) {
    assert(conn->outgoing.size() > 0);
    ssize_t rv = write(conn->fd, conn->outgoing.data(), conn->outgoing.size());
    if (rv < 0) {
        conn->want_close = true; // error handling
        return;
    }
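    // note (our addition, not in the original code): readiness can
    // occasionally be spurious; a stricter version would first check
    // rv < 0 && errno == EAGAIN and simply return instead of closing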
    // remove written data from `outgoing`
    buf_consume(conn->outgoing, (size_t)rv);
    // ...
}

State transitions between request and response
In a request-response protocol, the program is either reading a
request or writing a response. At the end of handle_read()
and handle_write(), we need to switch between the 2
states.
static void handle_read(Conn *conn) {
    // ...
    // update the readiness intention
    if (conn->outgoing.size() > 0) { // has a response
        conn->want_read = false;
        conn->want_write = true;
    } // else: want read
}

static void handle_write(Conn *conn) {
    // ...
    if (conn->outgoing.size() == 0) { // all data written
        conn->want_read = true;
        conn->want_write = false;
    } // else: want write
}

This is not universally true. For example, some proxies and messaging protocols are not request-response and can read and write simultaneously.
That’s all
The protocol is the same as in chapter 04, so you can reuse the test client. The server is the bare minimum that resembles something production grade, but it's still a toy; go to the next chapter for more advanced stuff.
Source code: