 
The code below is for an example NetIO module, mod_netiolog.c.
It demonstrates how to use the NetIO API to register custom callbacks,
and illustrates where, in the running of the server, those callbacks are
invoked.
#include "conf.h"
#define MOD_NETIOLOG_VERSION    "mod_netiolog/1.0"
static pr_netio_t *netiolog_netio = NULL;
/* NetIO Callbacks
 */
static void netiolog_abort_cb(pr_netio_stream_t *nstrm) {
  log_debug(DEBUG0, MOD_NETIOLOG_VERSION ": %s %s netiolog_abort_cb(): called",
    nstrm->strm_type == PR_NETIO_STRM_CTRL ? "[ctrl]" :
    nstrm->strm_type == PR_NETIO_STRM_DATA ? "[data]" : "[othr]",
    nstrm->strm_mode == PR_NETIO_IO_RD ? "[read]" :
    nstrm->strm_mode == PR_NETIO_IO_WR ? "[write]" : "[none]");
  nstrm->strm_flags |= PR_NETIO_SESS_ABORT;
}
static int netiolog_close_cb(pr_netio_stream_t *nstrm) {
  int res;
  log_debug(DEBUG0, MOD_NETIOLOG_VERSION ": %s %s netiolog_close_cb(): called",
    nstrm->strm_type == PR_NETIO_STRM_CTRL ? "[ctrl]" :
    nstrm->strm_type == PR_NETIO_STRM_DATA ? "[data]" : "[othr]",
    nstrm->strm_mode == PR_NETIO_IO_RD ? "[read]" : 
    nstrm->strm_mode == PR_NETIO_IO_WR ? "[write]" : "[none]");
  res = close(nstrm->strm_fd);
  nstrm->strm_fd = -1;
  return res;
}
/* Note: This callback may not actually ever be called, depending on when
 * the netio is registered.  In this module, for example, the netio
 * registration occurs in the session_init function.  At this point the
 * control connection has already been opened/established (when the client
 * first connects to the server), and so this function is not invoked.
 * However, the other netio callbacks are invoked.  This has consequences
 * for any netio- or stream-specific initialization that needs to occur,
 * at least for control channels: other control stream callbacks may need
 * to check that any necessary initialization has occurred, rather than
 * assuming that such initialization has been taken care of by this open
 * callback.
 *
 * In order to have this callback used, the netio must be registered at
 * mod_init time, which means that it will be used for all connections
 * established; this may or may not be desired, depending.
 */
static pr_netio_stream_t *netiolog_open_cb(pr_netio_stream_t *nstrm, int fd,
    int mode) {
  log_debug(DEBUG0, MOD_NETIOLOG_VERSION ": %s %s netiolog_open_cb(): called",
    nstrm->strm_type == PR_NETIO_STRM_CTRL ? "[ctrl]" :
    nstrm->strm_type == PR_NETIO_STRM_DATA ? "[data]" : "[othr]",
    nstrm->strm_mode == PR_NETIO_IO_RD ? "[read]" :
    nstrm->strm_mode == PR_NETIO_IO_WR ? "[write]" : "[none]");
  nstrm->strm_fd = fd;
  nstrm->strm_mode = mode;
  return nstrm;
}
static int netiolog_poll_cb(pr_netio_stream_t *nstrm) {
  fd_set rfds, wfds;
  struct timeval tval;
  log_debug(DEBUG0, MOD_NETIOLOG_VERSION ": %s %s netiolog_poll_cb(): called",
    nstrm->strm_type == PR_NETIO_STRM_CTRL ? "[ctrl]" :
    nstrm->strm_type == PR_NETIO_STRM_DATA ? "[data]" : "[othr]",
    nstrm->strm_mode == PR_NETIO_IO_RD ? "[read]" :
    nstrm->strm_mode == PR_NETIO_IO_WR ? "[write]" : "[none]");
  FD_ZERO(&rfds);
  FD_ZERO(&wfds);
  if (nstrm->strm_mode == PR_NETIO_IO_RD)
    FD_SET(nstrm->strm_fd, &rfds);
  else
    FD_SET(nstrm->strm_fd, &wfds);
  tval.tv_sec = ((nstrm->strm_flags & PR_NETIO_SESS_INTR) ?
    nstrm->strm_interval: 60);
  tval.tv_usec = 0;
  return select(nstrm->strm_fd + 1, &rfds, &wfds, NULL, &tval);
}
static int netiolog_postopen_cb(pr_netio_stream_t *nstrm) {
  log_debug(DEBUG0, MOD_NETIOLOG_VERSION ": %s %s netiolog_postopen_cb(): called",
    nstrm->strm_type == PR_NETIO_STRM_CTRL ? "[ctrl]" :
    nstrm->strm_type == PR_NETIO_STRM_DATA ? "[data]" : "[othr]",
    nstrm->strm_mode == PR_NETIO_IO_RD ? "[read]" :
    nstrm->strm_mode == PR_NETIO_IO_WR ? "[write]" : "[none]");
  return 0;
}
static int netiolog_read_cb(pr_netio_stream_t *nstrm, char *buf, size_t buflen) {
  log_debug(DEBUG0, MOD_NETIOLOG_VERSION ": %s %s netiolog_read_cb(): called",
    nstrm->strm_type == PR_NETIO_STRM_CTRL ? "[ctrl]" :
    nstrm->strm_type == PR_NETIO_STRM_DATA ? "[data]" : "[othr]",
    nstrm->strm_mode == PR_NETIO_IO_RD ? "[read]" :
    nstrm->strm_mode == PR_NETIO_IO_WR ? "[write]" : "[none]");
  return read(nstrm->strm_fd, buf, buflen);
}
static pr_netio_stream_t *netiolog_reopen_cb(pr_netio_stream_t *nstrm, int fd,
    int mode) {
  log_debug(DEBUG0, MOD_NETIOLOG_VERSION ": %s %s netiolog_reopen_cb(): called",
    nstrm->strm_type == PR_NETIO_STRM_CTRL ? "[ctrl]" :
    nstrm->strm_type == PR_NETIO_STRM_DATA ? "[data]" : "[othr]",
    nstrm->strm_mode == PR_NETIO_IO_RD ? "[read]" :
    nstrm->strm_mode == PR_NETIO_IO_WR ? "[write]" : "[none]");
  if (nstrm->strm_fd != -1)
    close(nstrm->strm_fd);
  nstrm->strm_fd = fd;
  nstrm->strm_mode = mode;
  return nstrm;
}
static int netiolog_write_cb(pr_netio_stream_t *nstrm, char *buf,
    size_t buflen) {
  log_debug(DEBUG0, MOD_NETIOLOG_VERSION ": %s %s netiolog_write_cb(): called",
    nstrm->strm_type == PR_NETIO_STRM_CTRL ? "[ctrl]" :
    nstrm->strm_type == PR_NETIO_STRM_DATA ? "[data]" : "[othr]",
    nstrm->strm_mode == PR_NETIO_IO_RD ? "[read]" :
    nstrm->strm_mode == PR_NETIO_IO_WR ? "[write]" : "[none]");
  return write(nstrm->strm_fd, buf, buflen);
}
/* Helper routines
 */
static void netiolog_install(void) {
  pr_netio_t *netio = netiolog_netio ? netiolog_netio :
    (netiolog_netio = pr_alloc_netio(session.pool ? session.pool :
    permanent_pool));
  int strm_types = PR_NETIO_STRM_CTRL|PR_NETIO_STRM_DATA|PR_NETIO_STRM_OTHR;
  /* Install this module's NetIO callbacks. */
  netio->abort = netiolog_abort_cb;
  netio->close = netiolog_close_cb;
  netio->open = netiolog_open_cb;
  netio->poll = netiolog_poll_cb;
  netio->postopen = netiolog_postopen_cb;
  netio->read = netiolog_read_cb;
  netio->reopen = netiolog_reopen_cb;
  netio->write = netiolog_write_cb;
  pr_unregister_netio(strm_types);
  log_debug(DEBUG0, MOD_NETIOLOG_VERSION ": registering netiolog netio");
  if (pr_register_netio(netio, strm_types) < 0)
    log_pri(LOG_INFO, MOD_NETIOLOG_VERSION ": error registering netio: %s",
      strerror(errno));
}
/* Configuration handlers
 */
/* usage: NetIOLogEngine "on"|"off"|"daemon"|"sessions" */
MODRET set_netiologengine(cmd_rec *cmd) {
  int bool = -1;
  config_rec *c = NULL;
  CHECK_ARGS(cmd, 1);
  CHECK_CONF(cmd, CONF_ROOT|CONF_VIRTUAL|CONF_GLOBAL);
  if ((bool = get_boolean(cmd, 1)) == -1) {
    if (!strcmp(cmd->argv[1], "daemon")) {
      log_debug(DEBUG0, MOD_NETIOLOG_VERSION
        ": %s: applying netio to daemon", cmd->argv[0]);
      netiolog_install();
    } else if (!strcmp(cmd->argv[1], "sessions")) {
      log_debug(DEBUG0, MOD_NETIOLOG_VERSION
        ": %s: applying netio to sessions", cmd->argv[0]);
      bool = TRUE;
    }
  } else {
    if (bool == TRUE) {
      log_debug(DEBUG0, MOD_NETIOLOG_VERSION
        ": %s: applying netio to daemon and sessions", cmd->argv[0]);
      netiolog_install();
    }
  }
  c = add_config_param(cmd->argv[0], 1, NULL);
  c->argv[0] = pcalloc(c->pool, sizeof(unsigned char));
  *((unsigned char *) c->argv[0]) = bool;
  return HANDLED(cmd);
}
/* Initialization functions
 */
static void netiolog_exit(void) {
  /* Be thorough, and clean up after ourselves. */
  destroy_pool(netiolog_netio->pool);
}
static int netiolog_sess_init(void) {
  unsigned char *sessions = get_param_ptr(main_server->conf,
    "NetIOLogEngine", FALSE);
  if (sessions && *sessions == TRUE) {
    log_debug(DEBUG0, MOD_NETIOLOG_VERSION
      ": session init: registering netiolog netio");
    netiolog_install();
  }
  /* Register our exit handler. */
  add_exit_handler(netiolog_exit);
  return 0;
}
/* Module API tables
 */
static conftable netiolog_conftab[] = {
  { "NetIOLogEngine",           set_netiologengine,             NULL },
  { NULL }
};
module netiolog_module = {
  NULL, NULL,
  /* Module API version 2.0 */
  0x20,
  /* Module name */
  "netiolog",
  /* Module configuration handler table */
  netiolog_conftab,
  /* Module command handler table */
  NULL,
  /* Module authentication handler table */
  NULL,
  /* Module initialization function */
  NULL,
  /* Session initialization function */
  netiolog_sess_init
};