Fast, scalable and simple ... reliable Network IO

This page is about how to do Fast Network IO, how it needs to be done to be reliable, how you can scale it to many connections and how all that can be even simpler than doing it the slower and less reliable, but more common, ways. It primarily talks about the POSIX APIs, although the lessons apply equally to win32 (yes, even if you are using IOCP which is mostly an IO event, not an IO, mechanism -- much like epoll or SIGIO on Linux).

This page is not an introduction to network programming, which is covered pretty well in Beej's network programming guide. It also doesn't talk extensively about different methods of getting events about when to do the Network IO, which is covered in Dan Kegel's The C10k problem site ... although it does assume some method that allows you to wait for events from multiple sources.

Index

Framework for the IO examples

I'm going to show a few different examples of IO methods, and I want to just show the different methods of how Network IO can be done without having a lot of code in each example to do with where the IO comes from or dealing with the IO events. So this is a relatively simple framework I'll use in each example. It only has a single input and a single output ... but that is fine for the examples.

Each example will get full "lines" of data from the input, and then output them. Obviously this can be much more efficient, in many examples, by never even looking for the "lines" and just moving data straight from the input to the output. However real Network IO is very likely to need to be segmented in some way, and more than a few protocols use a defined termination method which makes it somewhat easier to generate and parse but somewhat harder to limit.

The framework calls a function in the example to read data, and a separate function to write data. Again, although nothing is done in between, this isn't the case in real programs. Each example has a single data struct that holds data for the connection and one of four states for both input and output. The states are: ready, blocking, none or dead.

In the first two states the input or output function will be called (possibly blocking when in the blocking state), in the none state it will be ignored, and in the dead state it is assumed to no longer have any use (so when the output state is dead the program will end ... as no more data can be output).

To give some idea of how they perform, you can run this little perl script and then get a decent "unix cat" like program to pipe the output to. This isn't perfect, but can give some idea of how they behave at high load...

perl -le 'for (1..$ARGV[0]) { print $_; for (a..z) { print $_ x $ARGV[1] } }' X Y
/* Framework for testing IO designs...
 * define the following...
 *
 * IO_CALL_DATA_DECL = Type of data to pass between functions
 * IO_CALL_DATA_INIT = Init for data to pass between functions
 * IO_CALL_GET = function to get data from an fd
 *               void (*)(int, IO_CALL_DATA_DECL);
 * IO_CALL_PUT = function to put data to an fd
 *               void (*)(int, IO_CALL_DATA_DECL);
 */

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <err.h>

#include <sys/poll.h>

#include <sys/time.h> /* gettimeofday */

#include <malloc.h> /* mallinfo */

#define TRUE  1
#define FALSE 0

#define assert(x) do { if (x) {} else errx(EXIT_FAILURE, "assert(" #x "), FAILED at line %u", __LINE__); } while (FALSE)
#define ASSERT(x) do { if (x) {} else errx(EXIT_FAILURE, "ASSERT(" #x "), FAILED at line %u", __LINE__); } while (FALSE)

/* fwd declaration... */
static IO_CALL_DATA_DECL IO_CALL_INI(int rfd, int wfd);
static void IO_CALL_GET(IO_CALL_DATA_DECL);
static void IO_CALL_PUT(IO_CALL_DATA_DECL);

#define STUPID_MEM_ST(T) stupid_mem_alloc(sizeof(struct T))
#define STUPID_MEM_T(T)  stupid_mem_alloc(sizeof(T))
/* Allocate sz bytes with malloc(). When malloc() returns NULL, errno is
 * forced to ENOMEM and the program is terminated through err(), so the
 * examples do not need their own allocation-failure paths. */
static void *stupid_mem_alloc(size_t sz)
{
  void *mem = malloc(sz);

  if (mem == NULL)
  {
    errno = ENOMEM;
    err(EXIT_FAILURE, "malloc");
  }

  return mem;
}

/* Make sure O_NONBLOCK is set on fd.
 * The current file status flags are fetched first and F_SETFL is only
 * issued when the flag is missing. Any fcntl() failure ends the program
 * through err(). */
static void io_fd_set_o_nonblock(int fd)
{
  int cur = fcntl(fd, F_GETFL);

  if (cur == -1)
    err(EXIT_FAILURE, "fcntl(F_GETFL)");

  if (cur & O_NONBLOCK)
    return; /* already non-blocking, nothing to change */

  if (fcntl(fd, F_SETFL, cur | O_NONBLOCK) == -1)
    err(EXIT_FAILURE, "fcntl(F_SETFL, O_NONBLOCK)");
}

#define IO_OK    0
#define IO_BLOCK 1
#define IO_EOF   2
#define IO_NONE  3

#ifdef IO_STATE_DBG
static const char *io_map_states[] = { "OK", "BLOCK", "EOF", "NONE" };
#endif

/* Sleep in poll() until the given descriptors are usable.
 *  r_fd is polled for POLLIN, w_fd for POLLOUT; pass -1 to leave one out.
 *  When both are the same descriptor a single entry waits for either event.
 * A zero-timeout poll() is tried first, then an untimed poll() is retried
 * for as long as it fails with EINTR; any other failure calls err(). */
static void io_block(int r_fd, int w_fd)
{
  struct pollfd fds[2];
  unsigned int cnt = 0;

  fds[0].revents = 0;
  fds[1].revents = 0;

  if (r_fd == w_fd)
  { /* one descriptor, wait for either direction */
    fds[cnt].fd     = w_fd;
    fds[cnt].events = POLLIN | POLLOUT;
    ++cnt;
  }
  else
  { /* separate descriptors, add whichever were asked for */
    if (r_fd != -1)
    {
      fds[cnt].fd     = r_fd;
      fds[cnt].events = POLLIN;
      ++cnt;
    }
    if (w_fd != -1)
    {
      fds[cnt].fd     = w_fd;
      fds[cnt].events = POLLOUT;
      ++cnt;
    }
  }

  /* something is ready right now, no need to sleep */
  if (poll(fds, cnt, 0) > 0)
    return;

  for (;;)
  { /* no timeout is given, so only errors need handling here */
    if (poll(fds, cnt, -1) != -1)
      return;

    if (errno != EINTR)
      err(EXIT_FAILURE, "poll");
  }
}

/* Driver shared by every IO example.
 *
 * Puts stdin and stdout into non-blocking mode, creates the example's
 * connection data with IO_CALL_INI(), then alternates IO_CALL_GET() and
 * IO_CALL_PUT() until either the output state is IO_EOF, or the input state
 * is IO_EOF and the output state is IO_NONE. When a side reports IO_BLOCK
 * and the other side cannot make progress either, io_block() is used to
 * sleep until the relevant descriptor(s) are ready.
 *
 * On exit it prints the elapsed wall-clock time (seconds.microseconds) and
 * the growth in malloc usage reported by mallinfo() to stderr.
 */
int main(void)
{
  IO_CALL_DATA_DECL data;
  struct mallinfo mal_beg;
  struct mallinfo mal_end;
  struct timeval tv_beg;
  struct timeval tv_end;
  
  io_fd_set_o_nonblock(STDIN_FILENO);
  io_fd_set_o_nonblock(STDOUT_FILENO);

  mal_beg = mallinfo(); /* warning */
  gettimeofday(&tv_beg, NULL);

  /* IO_CALL_INI() takes (rfd, wfd): read from stdin, write to stdout */
  data = IO_CALL_INI(STDIN_FILENO, STDOUT_FILENO);
  
  while ((data->w_state != IO_EOF) &&
         !((data->r_state == IO_EOF) && (data->w_state == IO_NONE)))
  {
    /* only call the reader when it is ready or blocking */
    if ((data->r_state != IO_EOF) && (data->r_state != IO_NONE))
      IO_CALL_GET(data);
    
    /* both sides idle at once would mean nothing can ever progress */
    assert((data->w_state != IO_NONE) || (data->r_state != IO_NONE));
    
    /* only call the writer when it is ready or blocking */
    if ((data->w_state != IO_EOF) && (data->w_state != IO_NONE))
      IO_CALL_PUT(data);

    assert((data->w_state != IO_NONE) || (data->r_state != IO_NONE));
    
    /* sleep only when no side can do work without waiting */
    if (data->w_state == IO_BLOCK)
    {
      if (data->r_state == IO_BLOCK)
        io_block(data->rfd, data->wfd);
      if ((data->r_state == IO_NONE) || (data->r_state == IO_EOF))
        io_block(-1, data->wfd);
    }
    else if ((data->r_state == IO_BLOCK) &&
             ((data->w_state == IO_NONE) || (data->w_state == IO_EOF)))
      io_block(data->rfd, -1);
  }

  gettimeofday(&tv_end, NULL);
  if (tv_end.tv_usec < tv_beg.tv_usec)
  { /* borrow a second so the microsecond difference is not negative */
    tv_end.tv_usec += 1000 * 1000;
    tv_end.tv_sec  -= 1;
  }
  mal_end = mallinfo(); /* warning */

  /* microseconds are zero padded so the value reads as a decimal fraction */
  fprintf(stderr, "| %'8lu.%06lu | %'16lu |\n",
          (unsigned long) (tv_end.tv_sec  - tv_beg.tv_sec),
          (unsigned long) (tv_end.tv_usec - tv_beg.tv_usec),
          (unsigned long) (mal_end.uordblks - mal_beg.uordblks) +
          (mal_end.hblkhd - mal_beg.hblkhd));

  
  exit (EXIT_SUCCESS);
}

Single buffered, single byte IO

This is often what people write to start with. Even though this version works with "non-blocking" IO in the framework, because it uses the same buffer for both input and output it acts just like a blocking IO program. This is often perceived as the simplest way of writing the IO, although it doesn't have the simplest structure or the smallest number of lines.

The one "advantage" this version does have is that if you imagine the "buf, size, length" triple to be a string, you can read separate lines into separate strings (using the file descriptor itself as the temporary storage), where normally you'd read the data from the network and then move it to different "strings" per line. This is most useful if you ever need to read a line from an fd and then pass that fd onto another process which you don't have control over.

/* single IO,
 *  has a single buffer
 *  reads bytes 1 at a time until it has a line and then prints it
 *  is in the stupidly "easy" first attempt category, but also stupidly slow */

/* Connection data for the single buffered example: one buffer that is
 * filled by single_get() and then drained by single_put(). */
struct Single_io
{
 unsigned int r_state; /* input state, one of the IO_* values */
 unsigned int w_state; /* output state, one of the IO_* values */

 unsigned int off; /* bytes of buf already written out by single_put() */
 unsigned int len; /* bytes of data currently stored in buf */
 unsigned int sz;  /* allocated size of buf */
 char *buf;        /* the line buffer, doubled by single_get() when full */

 int rfd; /* descriptor single_get() reads from */
 int wfd; /* descriptor single_put() writes to */
};

#define IO_CALL_DATA_DECL struct Single_io *

#define IO_CALL_INI single_ini
#define IO_CALL_GET single_get
#define IO_CALL_PUT single_put

#include "io_frame_work.c"

/* Create the connection data for the single buffered example.
 * Input starts out ready (IO_OK), output idle (IO_NONE), and the buffer is
 * empty with 512 bytes allocated. rfd/wfd are stored for the read and
 * write sides respectively. Allocation failure ends the program. */
struct Single_io *single_ini(int rfd, int wfd)
{
  struct Single_io *io = STUPID_MEM_ST(Single_io);

  io->r_state = IO_OK;
  io->w_state = IO_NONE;

  io->off = 0;
  io->len = 0;
  io->sz  = 512;
  io->buf = stupid_mem_alloc(io->sz);

  io->rfd = rfd;
  io->wfd = wfd;

  return io;
}

/* Read side of the single buffered example.
 * Pulls bytes from rfd one at a time, at most 2048 per call, until:
 *  - a '\n' arrives: input goes IO_NONE and output IO_OK so the line
 *    gets written before any more is read;
 *  - read() would block (EAGAIN): input goes IO_BLOCK;
 *  - end of input: input goes IO_EOF, and any partial line is given a
 *    trailing '\n' and handed to the output side.
 * The buffer is doubled whenever it becomes full. Other read() errors and
 * allocation failure end the program through err(). */
void single_get(struct Single_io *ptr)
{
  const size_t start = ptr->len;

  while ((ptr->len - start) < 2048)
  {
    ssize_t got = read(ptr->rfd, ptr->buf + ptr->len, 1);

    if (got == -1)
    {
      if (errno == EINTR)
        continue; /* interrupted, just try again */

      if (errno != EAGAIN)
        err(EXIT_FAILURE, "read");

      ptr->r_state = IO_BLOCK;
      return;
    }

    if (got == 0)
    { /* end of input, flush whatever partial line is left */
      ptr->r_state = IO_EOF;
      if (ptr->len != 0)
      {
        ptr->buf[ptr->len] = '\n';
        ptr->len += 1;
        ptr->w_state = IO_OK;
      }
      return;
    }

    ptr->len += 1;
    if (ptr->buf[ptr->len - 1] == '\n')
    { /* complete line, hand it over to the writer */
      ptr->off = 0;
      ptr->w_state = IO_OK;
      ptr->r_state = IO_NONE;
      return;
    }

    ASSERT(ptr->len <= ptr->sz);
    if (ptr->len < ptr->sz)
      continue;

    /* buffer is full, double it before reading the next byte */
    ptr->sz <<= 1;
    ptr->buf = realloc(ptr->buf, ptr->sz);
    if (ptr->buf == NULL)
    {
      errno = ENOMEM;
      err(EXIT_FAILURE, "read");
    }
  }
}

/* Write side of the single buffered example.
 * Writes buf[off..len) to wfd, advancing off after each partial write.
 * If write() would block the output goes IO_BLOCK and the remaining data
 * is kept for the next call. Once everything is written the buffer is
 * reset, the output goes IO_NONE and (unless input hit IO_EOF) the input
 * is marked IO_OK again. Other write() errors end the program. */
void single_put(struct Single_io *ptr)
{
  size_t left = ptr->len - ptr->off;

  while (left != 0)
  {
    ssize_t sent = write(ptr->wfd, ptr->buf + ptr->off, left);

    if (sent != -1)
    {
      ptr->off += sent;
      left     -= sent;
      continue;
    }

    if (errno == EINTR)
      continue; /* interrupted, just try again */

    if (errno != EAGAIN)
      err(EXIT_FAILURE, "write");

    ptr->w_state = IO_BLOCK;
    return;
  }

  /* whole line is out, recycle the buffer and let the reader go again */
  ptr->len = 0;
  ptr->off = 0;

  ptr->w_state = IO_NONE;
  if (ptr->r_state != IO_EOF)
    ptr->r_state = IO_OK;
}

Double buffered IO

This has two major differences over the single buffered example. The first is that we can read and write data at the same time. The second that we are reading the input data in large blocks.

Reading and writing at the same time can be a major advantage. For instance, referring to Figure 1, imagine that data goes along "I to E" and "E to O" at a minimum of 1k a second, a maximum of 10k a second and an average of 7k a second. If Example is non-blocking then the complete path from Input to Output will have roughly the same average. However if Example is blocking, then we can only be either reading or writing, so the complete path is always the minimum of either value at any given time.

Input (I to E)-> Example (E to O)-> Output

As an example, consider that for 10 seconds "I to E" has the values "6, 9, 10, 8, 8, 8, 3, 7, 1, 10" and "E to O" has the values "8, 9, 1, 8, 1, 7, 8, 8, 10, 10". The complete path for the non-blocking version should have the values "6, 9, 1, 8, 1, 7, 8, 8, 10, 10" (which is an average of .2k per second less and has 2k left over) ... but the blocking version has the values "6, 9, 1, 8, 1, 7, 3, 7, 1, 10" (which is an average of 1.7k per second less and has 17k still left over). This means the IO will scale much better.

The other big change of reading in large blocks means that the operating system is only called for each block of data, instead of each byte. This can significantly reduce the constant overhead of doing the IO, allowing a higher maximum (however scalability is the same).

/* double IO,
 *  has two buffers, so we can read and write at the same time.
 *  reads in chunks, reads as much data as possible at once
 *  involves _at best_ a double copy ... can be much, much worse */

/* Connection data for the double buffered example: input is collected in
 * r_buf and complete lines are copied by mov() into w_buf for writing. */
struct Double_io
{
 unsigned int r_state; /* input state, one of the IO_* values */
 unsigned int w_state; /* output state, one of the IO_* values */

 unsigned int r_fwd_state; /* state mov() gives r_state once r_buf is empty;
                            * set to IO_EOF by double_get() at end of input */
 
 unsigned int r_len; /* bytes of unconsumed input at the start of r_buf */
 unsigned int r_sz;  /* allocated size of r_buf */
 char *r_buf;        /* input buffer */
 
 unsigned int w_off; /* bytes of w_buf already written by double_put() */
 unsigned int w_len; /* length of the line held in w_buf */
 unsigned int w_sz;  /* allocated size of w_buf */
 char *w_buf;        /* output buffer, holds one line at a time */
 
 int rfd; /* descriptor double_get() reads from */
 int wfd; /* descriptor double_put() writes to */
};

#define IO_CALL_DATA_DECL struct Double_io *

#define IO_CALL_INI double_ini
#define IO_CALL_GET double_get
#define IO_CALL_PUT double_put

#include "io_frame_work.c"


/* Create the connection data for the double buffered example.
 * Input starts ready (IO_OK), output idle (IO_NONE), and both buffers are
 * empty with 512 bytes allocated each. Allocation failure ends the
 * program. */
struct Double_io *double_ini(int rfd, int wfd)
{
  struct Double_io *io = STUPID_MEM_ST(Double_io);

  io->r_state     = IO_OK;
  io->w_state     = IO_NONE;
  io->r_fwd_state = IO_OK;

  io->r_len = 0;
  io->r_sz  = 512;
  io->r_buf = stupid_mem_alloc(io->r_sz);

  io->w_off = 0;
  io->w_len = 0;
  io->w_sz  = 512;
  io->w_buf = stupid_mem_alloc(io->w_sz);

  io->rfd = rfd;
  io->wfd = wfd;

  return io;
}

/* Hand the first complete line in r_buf over to the writer.
 * Does nothing unless the output is idle (IO_NONE) and r_buf holds a
 * '\n'. The line, including its '\n', is copied into w_buf (grown to fit
 * if needed) and the rest of the input is shuffled down to the start of
 * r_buf. When r_buf ends up empty the input state takes r_fwd_state. */
static void mov(struct Double_io *ptr)
{
  char *nl = NULL;

  if (ptr->w_state != IO_NONE)
    return; /* writer is still busy with the previous line */

  ASSERT(!ptr->w_len);

  if (ptr->r_len == 0)
    return;

  nl = memchr(ptr->r_buf, '\n', ptr->r_len);
  if (nl == NULL)
    return; /* no complete line yet */

  ptr->w_len = (nl - ptr->r_buf) + 1;

  if (ptr->w_sz < ptr->w_len)
  { /* line is bigger than the output buffer, grow it to fit exactly */
    ptr->w_sz  = ptr->w_len;
    ptr->w_buf = realloc(ptr->w_buf, ptr->w_sz);
    if (ptr->w_buf == NULL)
    {
      errno = ENOMEM;
      err(EXIT_FAILURE, "read");
    }
  }

  ASSERT(ptr->r_len >= ptr->w_len);

  memcpy(ptr->w_buf, ptr->r_buf, ptr->w_len);
  ptr->r_len -= ptr->w_len;

  if (ptr->r_len != 0)
    memmove(ptr->r_buf, ptr->r_buf + ptr->w_len, ptr->r_len);
  else
    ptr->r_state = ptr->r_fwd_state;

  ptr->w_state = IO_OK;
}

/* Read side of the double buffered example.
 * Reads as much as fits into the free tail of r_buf, roughly 2048 bytes
 * per call, calling mov() after each read to pass a complete line to the
 * writer. r_buf is doubled when it fills up.
 *  - read() would block (EAGAIN): input goes IO_BLOCK.
 *  - end of input with nothing buffered: input goes IO_EOF.
 *  - end of input with data buffered: a final '\n' is added if missing,
 *    input goes IO_NONE and r_fwd_state becomes IO_EOF so mov() reports
 *    EOF once the buffer has drained.
 * Other read() errors and allocation failure end the program. */
void double_get(struct Double_io *ptr)
{
  size_t total = 0;

  mov(ptr);

  while ((ptr->r_fwd_state == IO_OK) && (total < 2048))
  {
    size_t used = ptr->r_len;
    ssize_t got = read(ptr->rfd, ptr->r_buf + used, ptr->r_sz - used);

    if (got == -1)
    {
      if (errno == EINTR)
        continue; /* interrupted, just try again */

      if (errno != EAGAIN)
        err(EXIT_FAILURE, "read");

      ptr->r_state = IO_BLOCK;
      return;
    }

    if (got == 0)
    { /* end of input */
      if (ptr->r_len == 0)
      {
        ptr->r_state = IO_EOF;
        return;
      }

      /* terminate the last line so mov() can always find it */
      if (ptr->r_buf[ptr->r_len - 1] != '\n')
      {
        ptr->r_buf[ptr->r_len] = '\n';
        ptr->r_len += 1;
      }
      ptr->r_state     = IO_NONE;
      ptr->r_fwd_state = IO_EOF;
      mov(ptr);
      return;
    }

    total      += got;
    ptr->r_len += got;

    mov(ptr);

    ASSERT(ptr->r_len <= ptr->r_sz);
    if (ptr->r_len < ptr->r_sz)
      continue;

    /* no room left for the next read, double the input buffer */
    ptr->r_sz <<= 1;
    ptr->r_buf = realloc(ptr->r_buf, ptr->r_sz);
    if (ptr->r_buf == NULL)
    {
      errno = ENOMEM;
      err(EXIT_FAILURE, "read");
    }
  }

  /* after EOF was seen there is nothing to read, only buffered data left */
  if ((ptr->r_fwd_state != IO_OK) && (ptr->r_state != IO_EOF))
    ptr->r_state = IO_NONE;
}

/* Write side of the double buffered example.
 * Writes w_buf[w_off..w_len) to wfd, advancing w_off after each partial
 * write. If write() would block the output goes IO_BLOCK and the rest is
 * kept for the next call. Once the line is fully written w_buf is reset,
 * the output goes IO_NONE and (unless input is IO_EOF) the input is marked
 * IO_OK. Other write() errors end the program. */
void double_put(struct Double_io *ptr)
{
  size_t left = ptr->w_len - ptr->w_off;

  while (left != 0)
  {
    ssize_t sent = write(ptr->wfd, ptr->w_buf + ptr->w_off, left);

    if (sent != -1)
    {
      ptr->w_off += sent;
      left       -= sent;
      continue;
    }

    if (errno == EINTR)
      continue; /* interrupted, just try again */

    if (errno != EAGAIN)
      err(EXIT_FAILURE, "write");

    ptr->w_state = IO_BLOCK;
    return;
  }

  /* whole line is out, free up the output buffer for mov() */
  ptr->w_len = 0;
  ptr->w_off = 0;

  ptr->w_state = IO_NONE;
  if (ptr->r_state != IO_EOF)
    ptr->r_state = IO_OK;
}

Double buffered IO, with offset

This is a simple change over the previous example, where instead of moving the input data to the front of the buffer every time we moved some data from it, we now walk along the input buffer and only move the data when we would otherwise have to allocate more memory. More metadata is needed for the input connection, but there is a big scalability problem with the previous method when there are lots of small pieces of data. For instance, say we read 4 lines: with the previous version we'll copy the 1st line once, the second twice, the third 3 times and the 4th four times. With this version we'll only copy each once, unless we run out of memory.

/* double IO,
 *  has two buffers, so we can read and write at the same time.
 *  reads in chunks, reads as much data as possible at once
 *  involves _at best_ a double copy ... can involve other memory moves
 *     but is much less frequent than doublemove */

/* Connection data for the double buffered example with a read offset:
 * unconsumed input lives at r_buf[r_off .. r_off + r_len) and complete
 * lines are copied by mov() into w_buf for writing. */
struct Double_io
{
 unsigned int r_state; /* input state, one of the IO_* values */
 unsigned int w_state; /* output state, one of the IO_* values */

 unsigned int r_fwd_state; /* state mov() gives r_state once r_buf is empty;
                            * set to IO_EOF by double_get() at end of input */
 
 unsigned int r_len; /* bytes of unconsumed input in r_buf */
 unsigned int r_off; /* offset in r_buf where the unconsumed input starts */
 unsigned int r_sz;  /* allocated size of r_buf */
 char *r_buf;        /* input buffer */
 
 unsigned int w_off; /* bytes of w_buf already written by double_put() */
 unsigned int w_len; /* length of the line held in w_buf */
 unsigned int w_sz;  /* allocated size of w_buf */
 char *w_buf;        /* output buffer, holds one line at a time */

 int rfd; /* descriptor double_get() reads from */
 int wfd; /* descriptor double_put() writes to */
};

#define IO_CALL_DATA_DECL struct Double_io *

#define IO_CALL_INI double_ini
#define IO_CALL_GET double_get
#define IO_CALL_PUT double_put

#include "io_frame_work.c"


/* Create the connection data for the offset double buffered example.
 * Input starts ready (IO_OK), output idle (IO_NONE), the read offset is
 * zero and both buffers are empty with 512 bytes allocated each.
 * Allocation failure ends the program. */
struct Double_io *double_ini(int rfd, int wfd)
{
  struct Double_io *io = STUPID_MEM_ST(Double_io);

  io->r_state     = IO_OK;
  io->w_state     = IO_NONE;
  io->r_fwd_state = IO_OK;

  io->r_len = 0;
  io->r_off = 0;
  io->r_sz  = 512;
  io->r_buf = stupid_mem_alloc(io->r_sz);

  io->w_off = 0;
  io->w_len = 0;
  io->w_sz  = 512;
  io->w_buf = stupid_mem_alloc(io->w_sz);

  io->rfd = rfd;
  io->wfd = wfd;

  return io;
}

/*
# define IO_DBG(x) fprintf(stderr, "%u r(%u %u %u %s) w(%u %u %u %s)\n", \
  __LINE__, (x)->r_len, (x)->r_off, (x)->r_sz, io_map_states[(x)->r_state], \
            (x)->w_len, (x)->w_off, (x)->w_sz, io_map_states[(x)->w_state])
*/

/* Hand the first complete line of unconsumed input over to the writer.
 * Does nothing unless the output is idle (IO_NONE) and the unconsumed
 * input (starting at r_buf + r_off) holds a '\n'. The line, including its
 * '\n', is copied into w_buf (grown to fit if needed) and r_off is moved
 * past it instead of shuffling the remaining data. When no input is left
 * r_off is reset to 0 and the input state takes r_fwd_state. */
static void mov(struct Double_io *ptr)
{
  char *beg = ptr->r_buf + ptr->r_off;
  char *nl  = NULL;

  if (ptr->w_state != IO_NONE)
    return; /* writer is still busy with the previous line */

  ASSERT(!ptr->w_len);

  if (ptr->r_len == 0)
    return;

  nl = memchr(beg, '\n', ptr->r_len);
  if (nl == NULL)
    return; /* no complete line yet */

  ptr->w_len = (nl - beg) + 1;

  if (ptr->w_sz < ptr->w_len)
  { /* line is bigger than the output buffer, grow it to fit exactly */
    ptr->w_sz  = ptr->w_len;
    ptr->w_buf = realloc(ptr->w_buf, ptr->w_sz);
    if (ptr->w_buf == NULL)
    {
      errno = ENOMEM;
      err(EXIT_FAILURE, "read");
    }
  }

  ASSERT(ptr->r_len >= ptr->w_len);

  memcpy(ptr->w_buf, beg, ptr->w_len);
  ptr->r_off += ptr->w_len;
  ptr->r_len -= ptr->w_len;

  if (ptr->r_len == 0)
  { /* everything consumed, start again from the front of r_buf */
    ptr->r_off   = 0;
    ptr->r_state = ptr->r_fwd_state;
  }

  ptr->w_state = IO_OK;
}

/* Read side of the offset double buffered example.
 * Reads into the free tail of r_buf (after r_off + r_len), roughly 2048
 * bytes per call, calling mov() after each read to pass a complete line to
 * the writer. When the tail is used up the unconsumed data is first moved
 * back to the front of r_buf; only if it already starts at the front is
 * the buffer doubled.
 *  - read() would block (EAGAIN): input goes IO_BLOCK.
 *  - end of input with nothing buffered: input goes IO_EOF.
 *  - end of input with data buffered: a final '\n' is added if missing,
 *    input goes IO_NONE and r_fwd_state becomes IO_EOF so mov() reports
 *    EOF once the buffer has drained.
 * Other read() errors and allocation failure end the program. */
void double_get(struct Double_io *ptr)
{
  size_t total = 0;

  mov(ptr);

  while ((ptr->r_fwd_state == IO_OK) && (total < 2048))
  {
    size_t len = ptr->r_off + ptr->r_len;
    ssize_t got = read(ptr->rfd, ptr->r_buf + len, ptr->r_sz - len);

    if (got == -1)
    {
      if (errno == EINTR)
        continue; /* interrupted, just try again */

      if (errno != EAGAIN)
        err(EXIT_FAILURE, "read");

      ptr->r_state = IO_BLOCK;
      return;
    }

    if (got == 0)
    { /* end of input */
      if (ptr->r_len == 0)
      {
        ptr->r_state = IO_EOF;
        return;
      }

      /* terminate the last line so mov() can always find it */
      if (ptr->r_buf[ptr->r_off + ptr->r_len - 1] != '\n')
      {
        ptr->r_buf[ptr->r_off + ptr->r_len] = '\n';
        ptr->r_len += 1;
      }
      ptr->r_state     = IO_NONE;
      ptr->r_fwd_state = IO_EOF;
      mov(ptr);
      return;
    }

    total      += got;
    ptr->r_len += got;

    mov(ptr);

    len = ptr->r_off + ptr->r_len;
    ASSERT(len <= ptr->r_sz);
    if (len < ptr->r_sz)
      continue;

    if (ptr->r_off != 0)
    { /* reclaim the consumed space at the front before growing */
      memmove(ptr->r_buf, ptr->r_buf + ptr->r_off, ptr->r_len);
      ptr->r_off = 0;
      continue;
    }

    /* buffer really is full, double it */
    ptr->r_sz <<= 1;
    ptr->r_buf = realloc(ptr->r_buf, ptr->r_sz);
    if (ptr->r_buf == NULL)
    {
      errno = ENOMEM;
      err(EXIT_FAILURE, "read");
    }
  }

  /* after EOF was seen there is nothing to read, only buffered data left */
  if ((ptr->r_fwd_state != IO_OK) && (ptr->r_state != IO_EOF))
    ptr->r_state = IO_NONE;
}

/* Write side of the offset double buffered example.
 * Writes w_buf[w_off..w_len) to wfd, advancing w_off after each partial
 * write. If write() would block the output goes IO_BLOCK and the rest is
 * kept for the next call. Once the line is fully written w_buf is reset,
 * the output goes IO_NONE and (unless input is IO_EOF) the input is marked
 * IO_OK. Other write() errors end the program. */
void double_put(struct Double_io *ptr)
{
  size_t left = ptr->w_len - ptr->w_off;

  while (left != 0)
  {
    ssize_t sent = write(ptr->wfd, ptr->w_buf + ptr->w_off, left);

    if (sent != -1)
    {
      ptr->w_off += sent;
      left       -= sent;
      continue;
    }

    if (errno == EINTR)
      continue; /* interrupted, just try again */

    if (errno != EAGAIN)
      err(EXIT_FAILURE, "write");

    ptr->w_state = IO_BLOCK;
    return;
  }

  /* whole line is out, free up the output buffer for mov() */
  ptr->w_len = 0;
  ptr->w_off = 0;

  ptr->w_state = IO_NONE;
  if (ptr->r_state != IO_EOF)
    ptr->r_state = IO_OK;
}

Circular buffered IO

Much like the double buffered example we can read and write at the same time, however this example does so by sharing a single buffer for both input and output. Also data is "transferred" from the input to the output via a couple of fixed cost assignments. This is called Zero-copy, and is required for very scalable IO ... however for moderate IO it doesn't improve things significantly.

The other advantage is that for the double buffered example to process 10k of data, we needed 10k in the input buffer and 10k in the output buffer. So we had to use 20k of memory. With a single buffer for both, we only need half of that (just 10k). This can also be a significant saving when you scale to many connections at once.

You'll also notice that this version is much more complicated than any of the previous versions, which is also a major downside. Even I'm much less sure it doesn't contain a bug. And the string that comes out of the input is now a vector, so if you need to do anything to the data (as the example needs to search for a character) no standard functions will work.

/* circular IO,
 *  we can read and write at the same time.
 *  reads in chunks, reads as much data as possible at once
 *  no data needs to be copied (zero copy IO)
 */

/* Connection data for the circular buffered example: input and output
 * share one buffer, and iovec arrays describe which parts of it hold
 * unread input, the line being written, and the space for the next read. */
struct Circular_io
{
 unsigned int r_state; /* input state, one of the IO_* values */
 unsigned int w_state; /* output state, one of the IO_* values */

 unsigned int r_fwd_state; /* state mov() gives r_state once no input is
                            * left; set to IO_EOF at end of input */
 
 struct iovec *r_v;    /* 3 entries: [CIRC_IOV_U1], [CIRC_IOV_U2] are the
                        * buffered input, [CIRC_IOV_N] is where the next
                        * read() goes */
 unsigned int  r_num;  /* how many of the U1/U2 input entries are in use */
 unsigned int  r_len;  /* total bytes of buffered input */
 
 struct iovec *w_v;    /* 2 entries describing the line being written */
 unsigned int  w_num;  /* how many entries of w_v are in use */
 unsigned int  w_len;  /* bytes of the line still to be written */

 unsigned int sz; /* allocated size of buf */
 char *buf;       /* the storage all of the iovec entries point into */
 
 int rfd; /* descriptor circular_get() reads from */
 int wfd; /* descriptor circular_put() writes to */
};

#define IO_CALL_DATA_DECL struct Circular_io *

#define IO_CALL_INI circular_ini
#define IO_CALL_GET circular_get
#define IO_CALL_PUT circular_put

#include "io_frame_work.c"

#include <sys/uio.h>

#define CIRC_IOV_U1 0
#define CIRC_IOV_U2 1
#define CIRC_IOV_N  2

/* Search the first num entries of iov, in order, for the byte srch.
 * Returns the 1-based position of the first match counted across all of
 * the entries (so also the length of data up to and including the match),
 * or 0 when the byte is not present. */
static size_t iov_memchr(struct iovec *iov, unsigned int num, char srch)
{
  size_t skipped = 0;
  unsigned int scan = 0;

  for (scan = 0; scan < num; ++scan)
  {
    const char *base = iov[scan].iov_base;
    const char *hit  = memchr(base, srch, iov[scan].iov_len);

    if (hit != NULL)
      return (skipped + (hit - base) + 1);

    skipped += iov[scan].iov_len;
  }

  return (0);
}

/* Create the connection data for the circular buffered example.
 * Input starts ready (IO_OK), output idle (IO_NONE), with no iovec entries
 * in use. Three input iovecs, two output iovecs and a 1024 byte buffer are
 * allocated; the iovec contents are left for circular_get() to fill in.
 * Allocation failure ends the program. */
struct Circular_io *circular_ini(int rfd, int wfd)
{
  struct Circular_io *io = STUPID_MEM_ST(Circular_io);

  io->r_state     = IO_OK;
  io->w_state     = IO_NONE;
  io->r_fwd_state = IO_OK;

  io->r_num = 0;
  io->r_len = 0;
  io->r_v   = stupid_mem_alloc(sizeof(struct iovec) * 3);

  io->w_num = 0;
  io->w_len = 0;
  io->w_v   = stupid_mem_alloc(sizeof(struct iovec) * 2);

  io->sz  = 1024;
  io->buf = stupid_mem_alloc(io->sz);

  io->rfd = rfd;
  io->wfd = wfd;

  return io;
}

#ifdef IO_STATE_DBG
# define IO_DBG(x) fprintf(stderr, "%u %u r(%u %u[%u,%u] %s) w(%u %u[%u,%u] %s) n(%u)\n", \
  __LINE__, (x)->sz, (x)->r_len, (x)->r_num, (x)->r_v[0].iov_len, (x)->r_v[1].iov_len, io_map_states[(x)->r_state], \
            (x)->w_len, (x)->w_num, (x)->w_v[0].iov_len, (x)->w_v[1].iov_len, io_map_states[(x)->w_state], (x)->r_v[2].iov_len)

/* TRUE when the len bytes starting at base lie inside ptr's buffer */
static int io_valid_span(struct Circular_io *ptr, char *base, size_t len)
{
  if (base < ptr->buf)
    return (FALSE);

  return ((base + len) <= (ptr->buf + ptr->sz));
}

/* Debug consistency check for the circular buffer bookkeeping.
 * Verifies that w_len and r_len match the sums of their in-use iovec
 * lengths, that the write, read and next-read regions together do not
 * exceed the buffer size, and that every in-use iovec (plus the next-read
 * one) points inside the buffer. Each failed check is reported on stderr.
 * Returns TRUE when everything passed, FALSE otherwise. */
static int io_valid(struct Circular_io *ptr)
{
  struct iovec *wv = ptr->w_v;
  struct iovec *rv = ptr->r_v;
  size_t w_sum = 0;
  size_t r_sum = 0;
  size_t n_sum = 0;
  int ok = TRUE;

  /* add up what the in-use output entries say they hold */
  if (ptr->w_num > 2)
    ASSERT(FALSE);
  if (ptr->w_num == 2)
    w_sum += wv[CIRC_IOV_U2].iov_len;
  if (ptr->w_num >= 1)
    w_sum += wv[CIRC_IOV_U1].iov_len;

  /* and the same for the in-use input entries */
  if (ptr->r_num > 2)
    ASSERT(FALSE);
  if (ptr->r_num == 2)
    r_sum += rv[CIRC_IOV_U2].iov_len;
  if (ptr->r_num >= 1)
    r_sum += rv[CIRC_IOV_U1].iov_len;

  n_sum = rv[CIRC_IOV_N].iov_len;

  if (ptr->w_len != w_sum)
  {
    ok = FALSE;
    fprintf(stderr, "w_len FAILED\n");
  }

  if (ptr->r_len != r_sum)
  {
    ok = FALSE;
    fprintf(stderr, "r_len FAILED\n");
  }

  if ((w_sum + r_sum + n_sum) > ptr->sz)
  {
    ok = FALSE;
    fprintf(stderr, "sz FAILED\n");
  }

  /* every region in use has to sit inside the buffer */
  if ((ptr->w_num == 2) &&
      !io_valid_span(ptr, wv[CIRC_IOV_U2].iov_base, wv[CIRC_IOV_U2].iov_len))
  {
    ok = FALSE;
    fprintf(stderr, "w_v[2] bad\n");
  }
  if ((ptr->w_num >= 1) &&
      !io_valid_span(ptr, wv[CIRC_IOV_U1].iov_base, wv[CIRC_IOV_U1].iov_len))
  {
    ok = FALSE;
    fprintf(stderr, "w_v[1] bad\n");
  }

  if ((ptr->r_num == 2) &&
      !io_valid_span(ptr, rv[CIRC_IOV_U2].iov_base, rv[CIRC_IOV_U2].iov_len))
  {
    ok = FALSE;
    fprintf(stderr, "r_v[2] bad\n");
  }
  if ((ptr->r_num >= 1) &&
      !io_valid_span(ptr, rv[CIRC_IOV_U1].iov_base, rv[CIRC_IOV_U1].iov_len))
  {
    ok = FALSE;
    fprintf(stderr, "r_v[1] bad\n");
  }

  if (!io_valid_span(ptr, rv[CIRC_IOV_N].iov_base, n_sum))
  {
    char *n_beg = rv[CIRC_IOV_N].iov_base;

    ok = FALSE;
    fprintf(stderr, "n_v bad (%p %p %p %p)\n",
            n_beg, n_beg + n_sum, ptr->buf, ptr->buf + ptr->sz);
  }

  return (ok);
}
/* Run io_valid() on x and dump the state of x via IO_DBG() when a check
 * fails. The macro argument is used for both, so it no longer depends on
 * the caller having a variable that happens to be called "ptr". */
#define IO_VALID(x) if (io_valid(x)) {} else IO_DBG(x)
#else
#define IO_VALID(x)
#endif


/* Hand the first complete line of buffered input over to the writer
 * without copying any data: the output iovecs are pointed at the bytes of
 * the line and the input iovecs are adjusted to start after it.
 * Does nothing unless the output is idle (IO_NONE) and the buffered input
 * contains a '\n'. When no input is left afterwards the input state takes
 * r_fwd_state. */
static void mov(struct Circular_io *ptr)
{ /* when write is free we can move it over */
  char *buf = NULL;
  size_t eol = 0;
  
  if (ptr->w_state != IO_NONE)
    return;

  ASSERT(!ptr->w_len && !ptr->w_num);
  
  /* eol is the length of the first line, including its '\n' */
  if (!ptr->r_len ||
      !(eol = iov_memchr(ptr->r_v, ptr->r_num, '\n')))
    return;

  ptr->w_len  = eol;
  ptr->r_len -= eol;

  if (eol <= ptr->r_v[CIRC_IOV_U1].iov_len)
  { /* the whole line is inside the first input entry */
    ptr->w_num = 1;
    ptr->w_v[CIRC_IOV_U1].iov_base = ptr->r_v[CIRC_IOV_U1].iov_base;
    ptr->w_v[CIRC_IOV_U1].iov_len  = eol;

    if (eol == ptr->r_v[CIRC_IOV_U1].iov_len)
    { /* first entry fully consumed, a second entry (if any) becomes first */
      if (--ptr->r_num)
      {
        ptr->r_v[CIRC_IOV_U1].iov_base = ptr->r_v[CIRC_IOV_U2].iov_base;
        ptr->r_v[CIRC_IOV_U1].iov_len  = ptr->r_v[CIRC_IOV_U2].iov_len;
      }
    }
    else
    { /* first entry partly consumed, move its start past the line */
      buf  = ptr->r_v[CIRC_IOV_U1].iov_base;
      buf += eol;
      ptr->r_v[CIRC_IOV_U1].iov_base = buf;
      ptr->r_v[CIRC_IOV_U1].iov_len -= eol;
    }
  }
  else
  { /* the line spans both input entries */
    ASSERT(ptr->r_num == 2);
    
    ptr->w_num = 2;
    /* from here on eol is the part of the line inside the second entry */
    eol                           -= ptr->r_v[CIRC_IOV_U1].iov_len;
    ptr->w_v[CIRC_IOV_U1].iov_base = ptr->r_v[CIRC_IOV_U1].iov_base;
    ptr->w_v[CIRC_IOV_U1].iov_len  = ptr->r_v[CIRC_IOV_U1].iov_len;
    ptr->w_v[CIRC_IOV_U2].iov_base = ptr->r_v[CIRC_IOV_U2].iov_base;
    ptr->w_v[CIRC_IOV_U2].iov_len  = eol;

    if (ptr->r_v[CIRC_IOV_U2].iov_len == eol)
    { /* both entries fully consumed */
      ASSERT(!ptr->r_len);
      ptr->r_num = 0;
    }
    else
    { /* what is left of the second entry becomes the only input entry */
      buf  = ptr->r_v[CIRC_IOV_U2].iov_base;
      buf += eol;
      ptr->r_num = 1;
      ptr->r_v[CIRC_IOV_U1].iov_base = buf;
      ptr->r_v[CIRC_IOV_U1].iov_len  = ptr->r_v[CIRC_IOV_U2].iov_len - eol;
    }
  }

  if (!ptr->r_len)
    ptr->r_state = ptr->r_fwd_state;

  ptr->w_state = IO_OK;
}

/* Read side of the circular buffered example.
 *
 * Reads into the free region described by r_v[CIRC_IOV_N], roughly 2048
 * bytes per call, records the new data in the input iovecs (extending the
 * last one when the data is contiguous, otherwise starting a second one)
 * and calls mov() to pass a complete line to the writer. Afterwards the
 * free region is advanced; when it is used up it is either wrapped around
 * to free space elsewhere in the buffer or, when the buffer is completely
 * full, the buffer is doubled and all data is laid out contiguously in
 * the new storage (output data first, then input data).
 *
 *  - read() would block (EAGAIN): input goes IO_BLOCK.
 *  - end of input with nothing buffered: input goes IO_EOF.
 *  - end of input with data buffered: a final '\n' is added if missing,
 *    input goes IO_NONE and r_fwd_state becomes IO_EOF so mov() reports
 *    EOF once the buffered input has drained.
 * Other read() errors and allocation failure end the program.
 */
void circular_get(struct Circular_io *ptr)
{
  size_t added = 0;
  
  if (!ptr->r_len && !ptr->w_len)
  { /* nothing buffered at all, so the whole buffer is free for reading */
    ptr->r_v[CIRC_IOV_N].iov_base = ptr->buf;
    ptr->r_v[CIRC_IOV_N].iov_len  = ptr->sz;
  }
  
  mov(ptr);
  
  while ((ptr->r_fwd_state == IO_OK) && (added < 2048))
  {
    ssize_t bytes = read(ptr->rfd,
                         ptr->r_v[CIRC_IOV_N].iov_base,
                         ptr->r_v[CIRC_IOV_N].iov_len);

    if (bytes == -1)
    {
      if ((errno != EINTR) && (errno != EAGAIN))
        err(EXIT_FAILURE, "read");
      
      if (errno == EINTR)
        continue;

      ptr->r_state = IO_BLOCK;
      return;
    }

    added      += bytes;
    ptr->r_len += bytes;
    
    if (!bytes)
    { /* end of input */
      if (!ptr->r_len)
      {
        ptr->r_state = IO_EOF;
        return;
      }
      else
      {
        size_t len = ptr->r_v[ptr->r_num - 1].iov_len;
        char *buf  = ptr->r_v[ptr->r_num - 1].iov_base;
        char *nbuf = ptr->r_v[CIRC_IOV_N].iov_base;

        if (buf[len - 1] != '\n')
        { /* terminate the last line so mov() can always find it; the
           * extra byte is recorded in the iovecs below, so it has to be
           * counted in r_len as well or mov() would underflow r_len */
          nbuf[0]  = '\n';
          bytes = 1;
          ++ptr->r_len;
        }
        
        ptr->r_state     = IO_NONE;
        ptr->r_fwd_state = IO_EOF;
      }
    }
    
    /* record the new bytes in the input iovecs */
    if (!ptr->r_num)
    {
      ptr->r_num = 1;
      ptr->r_v[CIRC_IOV_U1].iov_base = ptr->r_v[CIRC_IOV_N].iov_base;
      ptr->r_v[CIRC_IOV_U1].iov_len  = bytes;
    }
    else
    {
      char *obuf  = ptr->r_v[ptr->r_num - 1].iov_base;
      size_t olen = ptr->r_v[ptr->r_num - 1].iov_len;
      char *nbuf  = ptr->r_v[CIRC_IOV_N].iov_base;
      
      if ((obuf + olen) == nbuf) /* contiguous, just extend the last one */
        ptr->r_v[ptr->r_num - 1].iov_len += bytes;
      else
      { /* new data is elsewhere in the buffer, needs a second entry */
        ASSERT(ptr->r_num == 1);
        ptr->r_num = 2;
        ptr->r_v[CIRC_IOV_U2].iov_base    = ptr->r_v[CIRC_IOV_N].iov_base;
        ptr->r_v[CIRC_IOV_U2].iov_len     = bytes;
      }
    }

    mov(ptr);    

    if (ptr->r_fwd_state == IO_OK)
    { /* make sure we have space to read more... */
      char *buf  = ptr->r_v[CIRC_IOV_N].iov_base;
      size_t len = ptr->r_v[CIRC_IOV_N].iov_len;

      ASSERT(len >= (size_t)bytes);
      if (len > (size_t)bytes)
      { /* free region not used up, just step over what was read */
        buf += bytes;
        len -= bytes;
        ptr->r_v[CIRC_IOV_N].iov_base = buf;
        ptr->r_v[CIRC_IOV_N].iov_len  = len;
      }
      else
      {
        size_t used = (ptr->r_len + ptr->w_len);
        
        ASSERT(used <= ptr->sz);
        if (used >= ptr->sz)
        { /* have to reallocate ... so move everything to the best positions */
          void *old = ptr->buf;
          size_t xlen = 0;
          
          ptr->sz <<= 1;
          if (!(ptr->buf = malloc(ptr->sz)))
            errno = ENOMEM, err(EXIT_FAILURE, "read");

          /* output data goes first, merged into a single entry */
          buf  = ptr->buf;
          xlen = 0;
          switch (ptr->w_num)
          {
            case 2:
              xlen = ptr->w_v[CIRC_IOV_U2].iov_len;
              memcpy(buf + ptr->w_v[CIRC_IOV_U1].iov_len,
                     ptr->w_v[CIRC_IOV_U2].iov_base, xlen);
              ptr->w_num = 1;
              /* fallthrough */
            case 1:
              memcpy(buf,
                     ptr->w_v[CIRC_IOV_U1].iov_base,
                     ptr->w_v[CIRC_IOV_U1].iov_len);
              ptr->w_v[CIRC_IOV_U1].iov_base = buf;
              ptr->w_v[CIRC_IOV_U1].iov_len += xlen;
              buf += ptr->w_v[CIRC_IOV_U1].iov_len;
              /* fallthrough */
            case 0:
              break;
          }
          
          /* input data follows it, also merged into a single entry */
          xlen = 0;
          switch (ptr->r_num)
          {
            case 2:
              xlen = ptr->r_v[CIRC_IOV_U2].iov_len;
              memcpy(buf + ptr->r_v[CIRC_IOV_U1].iov_len,
                     ptr->r_v[CIRC_IOV_U2].iov_base, xlen);
              ptr->r_num = 1;
              /* fallthrough */
            case 1:
              memcpy(buf,
                     ptr->r_v[CIRC_IOV_U1].iov_base,
                     ptr->r_v[CIRC_IOV_U1].iov_len);
              ptr->r_v[CIRC_IOV_U1].iov_base = buf;
              ptr->r_v[CIRC_IOV_U1].iov_len += xlen;
              buf += ptr->r_v[CIRC_IOV_U1].iov_len;
              /* fallthrough */
            case 0:
              break;
          }
          ASSERT((ptr->w_num <= 1) && (ptr->r_num <= 1));

          ASSERT(used == (ptr->r_len + ptr->w_len));
          
          free(old);
          
          /* everything after the copied data is free for reading */
          ptr->r_v[CIRC_IOV_N].iov_len  = ptr->sz - used;
        }
        else
        { /* there is still space */
          ptrdiff_t len_e = 0;
          ptrdiff_t len_w = 0;
          
          buf += len;

          if (buf == (ptr->buf + ptr->sz)) /* wrap from end back to beginning */
            buf = ptr->buf;

          ASSERT(ptr->r_len);
          /* free space runs up to the oldest data still in the buffer
           * (output data if any, otherwise input), or to the buffer end */
          len_e = (ptr->buf + ptr->sz) - buf;
          if (ptr->w_len)
            len_w = ((char *)ptr->w_v[CIRC_IOV_U1].iov_base - buf);
          else
            len_w = ((char *)ptr->r_v[CIRC_IOV_U1].iov_base - buf);

          if ((len_w > 0) && (len_w < len_e))
            ptr->r_v[CIRC_IOV_N].iov_len  = len_w;
          else
            ptr->r_v[CIRC_IOV_N].iov_len  = len_e;
        }
        
        ptr->r_v[CIRC_IOV_N].iov_base = buf;
      }
    }
  }

  if (ptr->r_fwd_state == IO_OK)
    return;

  /* after EOF was seen there is nothing to read, only buffered data left */
  if (ptr->r_state != IO_EOF)
    ptr->r_state = IO_NONE;
}

/* circular_put:
   Push the pending output (ptr->w_len bytes, described by the first
   ptr->w_num entries of ptr->w_v) to ptr->wfd with writev().

   - Interrupted calls (EINTR) are simply retried.
   - EAGAIN marks the output as IO_BLOCK and returns with the remaining
     data still described by ptr->w_v.
   - Any other error terminates the program via err().

   Once everything is written the output goes to IO_NONE and, unless the
   input has already hit EOF, the input is set back to IO_OK.
 */
void circular_put(struct Circular_io *ptr)
{
  struct iovec *head = &ptr->w_v[CIRC_IOV_U1];

  while (ptr->w_len)
  {
    ssize_t wrote = writev(ptr->wfd, ptr->w_v, ptr->w_num);

    if (wrote == -1)
    {
      if (errno == EINTR)
        continue; /* nothing was written, try again */

      if (errno != EAGAIN)
        err(EXIT_FAILURE, "write");

      ptr->w_state = IO_BLOCK;
      return;
    }

    ptr->w_len -= wrote;
    if (!ptr->w_len)
      break; /* all gone, the vectors no longer matter */

    if ((size_t)wrote >= head->iov_len)
    { /* first vector fully consumed: the second one takes its place */
      wrote -= head->iov_len;

      --ptr->w_num;
      head->iov_base = ptr->w_v[CIRC_IOV_U2].iov_base;
      head->iov_len  = ptr->w_v[CIRC_IOV_U2].iov_len;
    }

    /* skip whatever part of the (now) first vector was written */
    head->iov_base  = (char *)head->iov_base + wrote;
    head->iov_len  -= wrote;
  }

  /* finished writing line */
  ptr->w_num   = 0;
  ptr->w_state = IO_NONE;

  if (ptr->r_state != IO_EOF)
    ptr->r_state = IO_OK;
}

Vstr buffered IO

This version uses the Vstr string library, which represents data as a series of nodes. These nodes can be shared among strings, and are allocated for the use of a group of strings at once. This gives better performance than having a shared circular buffer, as growing the storage is much less of a problem ... and we don't need to grow the storage for each string individually, only the storage used by all of the strings together. This can save huge amounts of memory when you have lots of connections: at any one time some of them will be idle, but which ones are idle changes over time, so spare storage is better shared than reserved for each connection.

Because Vstr is a full string library, we also get all of the "normal" string functions for searching, comparison, splitting data and formatting output.

And finally, this took the smallest amount of code to implement. That is even less code than the initial "simple" single character example, including the code needed to initialize the library. And the code is much more readable if you need to see what it is doing.

/* vs IO,
   Vectored IO using the Vstr string library
  */

struct Vstr_base;

/* Per-connection data for the Vstr example.
   Member order matters: vs_ini() fills this in with a positional
   initializer. */
struct Vs_io
{
  /* IO states (IO_OK, IO_BLOCK, IO_NONE, IO_EOF) for input and output */
  unsigned int r_state, w_state;

  /* r_vs collects data read from rfd, w_vs holds data waiting to be
     written to wfd */
  struct Vstr_base *r_vs, *w_vs;

  /* file descriptors for reading and writing */
  int rfd, wfd;
};

/* Type of the per-connection data pointer used by this example. */
#define IO_CALL_DATA_DECL struct Vs_io *

/* This example's entry points: initialise, read input, write output.
   NOTE(review): presumably these IO_CALL_* macros are picked up by
   io_frame_work.c, included below -- confirm against that file. */
#define IO_CALL_INI vs_ini
#define IO_CALL_GET vs_get
#define IO_CALL_PUT vs_put

/* Feature-test macro, defined ahead of every #include below.
   NOTE(review): presumably needed for the 64-bit file interfaces that
   <vstr.h> refers to -- confirm. */
#define _LARGEFILE64_SOURCE 1

/* The shared example framework is included as source, not linked. */
#include "io_frame_work.c"

#include <stdarg.h>
#include <stdint.h>
#include <sys/uio.h>
#include <vstr.h>


/* vs_ini:
   Create the connection data for the Vstr example.

   Initialises the Vstr library, sets the default configuration's buffer
   node size to 512 and creates one string for output and one for input.
   If any of those steps fails the program exits through err() with errno
   set to ENOMEM.

   rfd: file descriptor input is read from
   wfd: file descriptor output is written to

   Returns the new connection data, with input in state IO_OK and output
   in state IO_NONE.
 */
struct Vs_io *vs_ini(int rfd, int wfd)
{
  struct Vs_io *io = STUPID_MEM_ST(Vs_io);
  Vstr_base *out = NULL;
  Vstr_base *in = NULL;
  int ok = 0;

  ok = vstr_init() &&
       vstr_cntl_conf(NULL, VSTR_CNTL_CONF_SET_NUM_BUF_SZ, 512);

  if (ok)
  {
    out = vstr_make_base(NULL);
    ok  = (out != NULL);
  }

  if (ok)
  {
    in = vstr_make_base(NULL);
    ok = (in != NULL);
  }

  if (!ok)
  {
    errno = ENOMEM;
    err(EXIT_FAILURE, "init");
  }

  /* create some data storage for _both_ strings */
  vstr_make_spare_nodes(NULL, VSTR_TYPE_NODE_BUF, 2);

  io->r_state = IO_OK;
  io->w_state = IO_NONE;

  io->r_vs = in;
  io->w_vs = out;

  io->rfd = rfd;
  io->wfd = wfd;

  return (io);
}

/* vs_get:
   Read more input and queue every complete line for output.

   New data from ptr->rfd is added after the current end of the input
   string. Every '\n' terminated line found in the input string is then
   added to the end of the output string and removed from the input.
   At EOF any unterminated remainder is moved over as well, followed by
   a "\n".

   State changes:
   - input becomes IO_EOF at end of file, IO_BLOCK when the read fails
     with EAGAIN; any other read error exits via err().
   - output becomes IO_OK when the output string was empty on entry and
     is not empty afterwards.
 */
void vs_get(struct Vs_io *ptr)
{
  Vstr_base *out = ptr->w_vs;
  Vstr_base *in = ptr->r_vs;
  unsigned int rd_err = 0;
  size_t out_len_before = 0;
  size_t eol = 0;

  /* NOTE(review): the 1 and 16 presumably bound the number of nodes used
     for a single read -- see the Vstr documentation */
  vstr_sc_read_iov_fd(in, in->len, ptr->rfd, 1, 16, &rd_err);

  if (rd_err == VSTR_TYPE_SC_READ_FD_ERR_EOF)
    ptr->r_state = IO_EOF;
  else if (rd_err)
  {
    if ((rd_err != VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO) || (errno != EAGAIN))
      err(EXIT_FAILURE, "read");

    ptr->r_state = IO_BLOCK;
  }

  /* remember how much output was already queued... */
  out_len_before = out->len;

  for (;;)
  {
    eol = vstr_srch_chr_fwd(in, 1, in->len, '\n');
    if (!eol)
      break; /* no complete line left */

    vstr_add_vstr(out, out->len, in, 1, eol, VSTR_TYPE_ADD_BUF_REF);
    vstr_del(in, 1, eol);
  }

  if (in->len && (ptr->r_state == IO_EOF))
  { /* move last bit even if it's not a line */
    vstr_mov(out, out->len, in, 1, in->len);
    vstr_add_cstr_buf(out, out->len, "\n");
  }

  if (out->conf->malloc_bad) /* checks _all_ above allocations */
  {
    errno = ENOMEM;
    err(EXIT_FAILURE, "adding data");
  }

  if (out_len_before || !out->len)
    return; /* output was already going, or there is still nothing to do */

  /* previously had no output, and now we do ... start the output calls */
  ASSERT(ptr->w_state == IO_NONE);
  ptr->w_state = IO_OK;
}

/* vs_put:
   Write the queued output string to ptr->wfd.

   If the write fails with EAGAIN the output is marked IO_BLOCK and the
   function returns; any other failure exits via err(). When the write
   succeeds and the output string is empty, the output goes back to
   IO_NONE.
 */
void vs_put(struct Vs_io *ptr)
{
  Vstr_base *out = ptr->w_vs;
  int written = vstr_sc_write_fd(out, 1, out->len, ptr->wfd, NULL);

  if (!written)
  {
    if (errno != EAGAIN)
      err(EXIT_FAILURE, "write");

    ptr->w_state = IO_BLOCK;
    return;
  }

  if (out->len == 0)
    ptr->w_state = IO_NONE;
}

James Antill
Last modified: Mon Apr 25 13:33:35 EDT 2005