evnt.c
#include <vstr.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <getopt.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/poll.h>
#include <netdb.h>
#include <sys/time.h>
#include <time.h>
#include <signal.h>
#include <sys/stat.h>
#include <socket_poll.h>
#include <timer_q.h>
#include <sys/sendfile.h>
#define CONF_EVNT_NO_EPOLL FALSE
#define CONF_EVNT_EPOLL_SZ (10 * 1000)
#define CONF_EVNT_DUP_EPOLL TRUE
#define CONF_GETTIMEOFDAY_TIME TRUE
#ifndef CONF_FULL_STATIC
# include <netdb.h>
# define EVNT__RESOLVE_NAME(evnt, x) do { \
struct hostent *h = gethostbyname(x); \
\
if (h) \
memcpy(&EVNT_SA_IN4(evnt)->sin_addr.s_addr, \
h->h_addr_list[0], \
sizeof(EVNT_SA_IN4(evnt)->sin_addr.s_addr)); \
\
} while (FALSE)
#else
# define EVNT__RESOLVE_NAME(evnt, x) do { \
if ((EVNT_SA_IN4(evnt)->sin_addr.s_addr = inet_addr(x)) == INADDR_NONE) \
EVNT_SA_IN4(evnt)->sin_addr.s_addr = htonl(INADDR_ANY); \
} while (FALSE)
#endif
#if !defined(SO_DETACH_FILTER) || !defined(SO_ATTACH_FILTER)
# define CONF_USE_SOCKET_FILTERS FALSE
struct sock_fprog { int dummy; };
# define SO_DETACH_FILTER 0
# define SO_ATTACH_FILTER 0
#else
# define CONF_USE_SOCKET_FILTERS TRUE
struct sock_filter
{
uint16_t code;
uint8_t jt;
uint8_t jf;
uint32_t k;
};
struct sock_fprog
{
unsigned short len;
struct sock_filter *filter;
};
#endif
#include "vlg.h"
#define EX_UTILS_NO_USE_INIT 1
#define EX_UTILS_NO_USE_EXIT 1
#define EX_UTILS_NO_USE_LIMIT 1
#define EX_UTILS_NO_USE_BLOCK 1
#define EX_UTILS_NO_USE_PUT 1
#define EX_UTILS_NO_USE_OPEN 1
#define EX_UTILS_NO_USE_IO_FD 1
#include "ex_utils.h"
#include "evnt.h"
#include "mk.h"
#define CSTREQ(x,y) (!strcmp(x,y))
#ifdef __GNUC__
# define EVNT__ATTR_UNUSED(x) vstr__UNUSED_ ## x __attribute__((unused))
#elif defined(__LCLINT__)
# define EVNT__ATTR_UNUSED(x) vstr__UNUSED_ ## x
#else
# define EVNT__ATTR_UNUSED(x) vstr__UNUSED_ ## x
#endif
#ifdef __GNUC__
# define EVNT__ATTR_USED() __attribute__((used))
#else
# define EVNT__ATTR_USED()
#endif
volatile sig_atomic_t evnt_child_exited = FALSE;
int evnt_opt_nagle = EVNT_CONF_NAGLE;
static struct Evnt *q_send_now = NULL;
static struct Evnt *q_closed = NULL;
static struct Evnt *q_none = NULL;
static struct Evnt *q_accept = NULL;
static struct Evnt *q_connect = NULL;
static struct Evnt *q_recv = NULL;
static struct Evnt *q_send_recv = NULL;
static Vlg *vlg = NULL;
static unsigned int evnt__num = 0;
static unsigned int evnt__accept_limit = 4;
static struct timeval evnt__tv[1];
static int evnt__is_child = FALSE;
#define EVNT__UPDATE_TV() gettimeofday(evnt__tv, NULL)
#define EVNT__COPY_TV(x) memcpy(x, evnt__tv, sizeof(struct timeval))
void evnt_logger(Vlg *passed_vlg)
{
vlg = passed_vlg;
}
void evnt_fd__set_nonblock(int fd, int val)
{
int flags = 0;
ASSERT(val == !!val);
if ((flags = fcntl(fd, F_GETFL)) == -1)
vlg_err(vlg, EXIT_FAILURE, "%s\n", __func__);
if (!!(flags & O_NONBLOCK) == val)
return;
if (val)
flags |= O_NONBLOCK;
else
flags &= ~O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) == -1)
vlg_err(vlg, EXIT_FAILURE, "%s\n", __func__);
}
void evnt_add(struct Evnt **que, struct Evnt *node)
{
assert(node != *que);
if ((node->next = *que))
node->next->prev = node;
node->prev = NULL;
*que = node;
}
void evnt_del(struct Evnt **que, struct Evnt *node)
{
if (node->prev)
node->prev->next = node->next;
else
{
assert(*que == node);
*que = node->next;
}
if (node->next)
node->next->prev = node->prev;
}
static void evnt__del_whatever(struct Evnt *evnt)
{
if (0) { }
else if (evnt->flag_q_accept)
evnt_del(&q_accept, evnt);
else if (evnt->flag_q_connect)
evnt_del(&q_connect, evnt);
else if (evnt->flag_q_recv)
evnt_del(&q_recv, evnt);
else if (evnt->flag_q_send_recv)
evnt_del(&q_send_recv, evnt);
else if (evnt->flag_q_none)
evnt_del(&q_none, evnt);
else
ASSERT_NOT_REACHED();
}
static void EVNT__ATTR_USED() evnt__add_whatever(struct Evnt *evnt)
{
if (0) { }
else if (evnt->flag_q_accept)
evnt_add(&q_accept, evnt);
else if (evnt->flag_q_connect)
evnt_add(&q_connect, evnt);
else if (evnt->flag_q_recv)
evnt_add(&q_recv, evnt);
else if (evnt->flag_q_send_recv)
evnt_add(&q_send_recv, evnt);
else if (evnt->flag_q_none)
evnt_add(&q_none, evnt);
else
ASSERT_NOT_REACHED();
}
static unsigned int evnt__debug_num_1(struct Evnt *scan)
{
unsigned int num = 0;
while (scan)
{
struct Evnt *scan_next = scan->next;
++num;
scan = scan_next;
}
return (num);
}
#ifndef VSTR_AUTOCONF_NDEBUG
static struct Evnt **evnt__srch(struct Evnt **que, struct Evnt *evnt)
{
struct Evnt **ret = que;
while (*ret)
{
if (*ret == evnt)
return (ret);
ret = &(*ret)->next;
}
return (NULL);
}
static int evnt__valid(struct Evnt *evnt)
{
int ret = 0;
ASSERT(evnt_num_all());
ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv +
evnt->flag_q_send_recv + evnt->flag_q_none) == 1);
if (evnt->flag_q_send_now)
{
struct Evnt **scan = &q_send_now;
while (*scan && (*scan != evnt))
scan = &(*scan)->s_next;
ASSERT(*scan);
}
else
{
struct Evnt **scan = &q_send_now;
while (*scan && (*scan != evnt))
scan = &(*scan)->s_next;
ASSERT(!*scan);
}
if (evnt->flag_q_closed)
{
struct Evnt **scan = &q_closed;
while (*scan && (*scan != evnt))
scan = &(*scan)->c_next;
ASSERT(*scan);
}
else
{
struct Evnt **scan = &q_closed;
while (*scan && (*scan != evnt))
scan = &(*scan)->c_next;
ASSERT(!*scan);
}
if (0) { }
else if (evnt->flag_q_accept)
{
ret = !!evnt__srch(&q_accept, evnt);
assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
ASSERT(!evnt->io_r && !evnt->io_w);
}
else if (evnt->flag_q_connect)
{
ret = !!evnt__srch(&q_connect, evnt);
assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLIN));
assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN));
}
else if (evnt->flag_q_send_recv)
{
ret = !!evnt__srch(&q_send_recv, evnt);
assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
}
else if (evnt->flag_q_recv)
{
ret = !!evnt__srch(&q_recv, evnt);
assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
}
else if (evnt->flag_q_none)
{
ret = !!evnt__srch(&q_none, evnt);
assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLIN));
assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN));
assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
}
else
ASSERT_NOT_REACHED();
return (ret);
}
static unsigned int evnt__debug_num_all(void)
{
unsigned int num = 0;
num += evnt__debug_num_1(q_connect);
num += evnt__debug_num_1(q_accept);
num += evnt__debug_num_1(q_recv);
num += evnt__debug_num_1(q_send_recv);
num += evnt__debug_num_1(q_none);
return (num);
}
#endif
int evnt_fd(struct Evnt *evnt)
{
ASSERT(evnt__valid(evnt));
return (SOCKET_POLL_INDICATOR(evnt->ind)->fd);
}
int evnt_cb_func_connect(struct Evnt *EVNT__ATTR_UNUSED(evnt))
{
return (TRUE);
}
struct Evnt *evnt_cb_func_accept(struct Evnt *EVNT__ATTR_UNUSED(evnt),
int EVNT__ATTR_UNUSED(fd),
struct sockaddr *EVNT__ATTR_UNUSED(sa),
socklen_t EVNT__ATTR_UNUSED(len))
{
return (NULL);
}
int evnt_cb_func_recv(struct Evnt *evnt)
{
unsigned int ern = 0;
int ret = evnt_recv(evnt, &ern);
if (ret)
return (TRUE);
if ((ern == VSTR_TYPE_SC_READ_FD_ERR_EOF) && evnt->io_w->len)
return (evnt_shutdown_r(evnt, TRUE));
return (FALSE);
}
int evnt_cb_func_send(struct Evnt *evnt)
{
int ret = -1;
evnt_fd_set_cork(evnt, TRUE);
ret = evnt_send(evnt);
if (!evnt->io_w->len)
evnt_fd_set_cork(evnt, FALSE);
return (ret);
}
void evnt_cb_func_free(struct Evnt *evnt)
{
MALLOC_CHECK_SCRUB_PTR(evnt, sizeof(struct Evnt));
free(evnt);
}
void evnt_cb_func_F(struct Evnt *evnt)
{
F(evnt);
}
int evnt_cb_func_shutdown_r(struct Evnt *evnt)
{
vlg_dbg2(vlg, "SHUTDOWN CB from[$<sa:%p>]\n", EVNT_SA(evnt));
if (!evnt_shutdown_r(evnt, FALSE))
return (FALSE);
return (!!evnt->io_w->len);
}
static int evnt_init(struct Evnt *evnt, int fd, Vstr_ref *ref)
{
ASSERT(ref);
evnt->flag_q_accept = FALSE;
evnt->flag_q_connect = FALSE;
evnt->flag_q_recv = FALSE;
evnt->flag_q_send_recv = FALSE;
evnt->flag_q_none = FALSE;
evnt->flag_q_send_now = FALSE;
evnt->flag_q_closed = FALSE;
evnt->flag_q_pkt_move = FALSE;
evnt->flag_io_nagle = evnt_opt_nagle;
evnt->flag_io_cork = FALSE;
evnt->flag_io_filter = FALSE;
evnt->flag_fully_acpt = FALSE;
evnt->io_r_shutdown = FALSE;
evnt->io_w_shutdown = FALSE;
evnt->prev_bytes_r = 0;
evnt->acct.req_put = 0;
evnt->acct.req_got = 0;
evnt->acct.bytes_r = 0;
evnt->acct.bytes_w = 0;
evnt->cbs->cb_func_accept = evnt_cb_func_accept;
evnt->cbs->cb_func_connect = evnt_cb_func_connect;
evnt->cbs->cb_func_recv = evnt_cb_func_recv;
evnt->cbs->cb_func_send = evnt_cb_func_send;
evnt->cbs->cb_func_free = evnt_cb_func_F;
evnt->cbs->cb_func_shutdown_r = evnt_cb_func_shutdown_r;
if (!(evnt->io_r = vstr_make_base(NULL)))
goto make_vstr_fail;
if (!(evnt->io_w = vstr_make_base(NULL)))
goto make_vstr_fail;
evnt->tm_o = NULL;
if (!(evnt->ind = evnt_poll_add(evnt, fd)))
goto poll_add_fail;
EVNT__UPDATE_TV();
EVNT__COPY_TV(&evnt->ctime);
EVNT__COPY_TV(&evnt->mtime);
evnt->msecs_tm_mtime = 0;
if (fcntl(fd, F_SETFD, TRUE) == -1)
goto fcntl_fail;
evnt->sa_ref = vstr_ref_add(ref);
evnt_fd__set_nonblock(fd, TRUE);
return (TRUE);
fcntl_fail:
evnt_poll_del(evnt);
poll_add_fail:
vstr_free_base(evnt->io_w);
make_vstr_fail:
vstr_free_base(evnt->io_r);
errno = ENOMEM;
return (FALSE);
}
static void evnt__free1(struct Evnt *evnt)
{
evnt_send_del(evnt);
if (evnt->io_r && evnt->io_r->len)
vlg_dbg2(vlg, "evnt__free1(%p) io_r len = %zu\n", evnt, evnt->io_r->len);
if (evnt->io_w && evnt->io_w->len)
vlg_dbg2(vlg, "evnt__free1(%p) io_w len = %zu\n", evnt, evnt->io_w->len);
vstr_free_base(evnt->io_w); evnt->io_w = NULL;
vstr_free_base(evnt->io_r); evnt->io_r = NULL;
evnt_poll_del(evnt);
}
static void evnt__free2(Vstr_ref *sa, Timer_q_node *tm_o)
{
vstr_ref_del(sa);
if (tm_o)
{
timer_q_cntl_node(tm_o, TIMER_Q_CNTL_NODE_SET_DATA, NULL);
timer_q_quick_del_node(tm_o);
}
}
static void evnt__free(struct Evnt *evnt)
{
if (evnt)
{
Vstr_ref *sa = evnt->sa_ref;
Timer_q_node *tm_o = evnt->tm_o;
evnt__free1(evnt);
ASSERT(evnt__num >= 1);
--evnt__num;
ASSERT(evnt__num == evnt__debug_num_all());
evnt->cbs->cb_func_free(evnt);
evnt__free2(sa, tm_o);
}
}
void evnt_free(struct Evnt *evnt)
{
if (evnt)
{
evnt__del_whatever(evnt);
evnt__free(evnt);
}
}
static void evnt__uninit(struct Evnt *evnt)
{
ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv +
evnt->flag_q_send_recv + evnt->flag_q_none) == 0);
evnt__free1(evnt);
evnt__free2(evnt->sa_ref, evnt->tm_o);
}
static int evnt_fd__set_nodelay(int fd, int val)
{
return (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != -1);
}
static int evnt_fd__set_reuse(int fd, int val)
{
return (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) != -1);
}
static void evnt__fd_close_noerrno(int fd)
{
int saved_errno = errno;
close(fd);
errno = saved_errno;
}
static int evnt__make_end(struct Evnt **que, struct Evnt *evnt, int flags)
{
evnt_add(que, evnt);
++evnt__num;
evnt_wait_cntl_add(evnt, flags);
ASSERT(evnt__valid(evnt));
return (TRUE);
}
int evnt_make_con_ipv4(struct Evnt *evnt, const char *ipv4_string, short port)
{
int fd = -1;
socklen_t alloc_len = sizeof(struct sockaddr_in);
Vstr_ref *ref = NULL;
if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
goto sock_fail;
if (!(ref = vstr_ref_make_malloc(alloc_len)))
goto init_fail;
if (!evnt_init(evnt, fd, ref))
goto init_fail;
evnt->flag_q_pkt_move = TRUE;
if (!evnt->flag_io_nagle)
evnt_fd__set_nodelay(fd, TRUE);
EVNT_SA_IN4(evnt)->sin_family = AF_INET;
EVNT_SA_IN4(evnt)->sin_port = htons(port);
EVNT_SA_IN4(evnt)->sin_addr.s_addr = inet_addr(ipv4_string);
ASSERT(port && (EVNT_SA_IN4(evnt)->sin_addr.s_addr != htonl(INADDR_ANY)));
if (connect(fd, EVNT_SA(evnt), alloc_len) == -1)
{
if (errno == EINPROGRESS)
{
vstr_ref_del(ref);
evnt->flag_q_connect = TRUE;
return (evnt__make_end(&q_connect, evnt, POLLOUT));
}
goto connect_fail;
}
vstr_ref_del(ref);
evnt->flag_q_none = TRUE;
return (evnt__make_end(&q_none, evnt, 0));
connect_fail:
evnt__uninit(evnt);
init_fail:
vstr_ref_del(ref);
evnt__fd_close_noerrno(fd);
sock_fail:
return (FALSE);
}
int evnt_make_con_local(struct Evnt *evnt, const char *fname)
{
int fd = -1;
size_t len = strlen(fname) + 1;
struct sockaddr_un tmp_sun;
socklen_t alloc_len = 0;
Vstr_ref *ref = NULL;
tmp_sun.sun_path[0] = 0;
alloc_len = SUN_LEN(&tmp_sun) + len;
if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1)
goto sock_fail;
if (!(ref = vstr_ref_make_malloc(alloc_len)))
goto init_fail;
if (!evnt_init(evnt, fd, ref))
goto init_fail;
evnt->flag_q_pkt_move = TRUE;
EVNT_SA_UN(evnt)->sun_family = AF_LOCAL;
memcpy(EVNT_SA_UN(evnt)->sun_path, fname, len);
if (connect(fd, EVNT_SA(evnt), alloc_len) == -1)
{
if (errno == EINPROGRESS)
{
vstr_ref_del(ref);
evnt->flag_q_connect = TRUE;
return (evnt__make_end(&q_connect, evnt, POLLOUT));
}
goto connect_fail;
}
vstr_ref_del(ref);
evnt->flag_q_none = TRUE;
return (evnt__make_end(&q_none, evnt, 0));
connect_fail:
evnt__uninit(evnt);
init_fail:
vstr_ref_del(ref);
evnt__fd_close_noerrno(fd);
sock_fail:
return (FALSE);
}
int evnt_make_acpt_ref(struct Evnt *evnt, int fd, Vstr_ref *sa)
{
if (!evnt_init(evnt, fd, sa))
return (FALSE);
if (!evnt->flag_io_nagle)
evnt_fd__set_nodelay(fd, TRUE);
evnt->flag_q_recv = TRUE;
return (evnt__make_end(&q_recv, evnt, POLLIN));
}
int evnt_make_acpt_dup(struct Evnt *evnt, int fd,
struct sockaddr *sa, socklen_t len)
{
Vstr_ref *ref = vstr_ref_make_memdup(sa, len);
int ret = FALSE;
if (!ref)
{
errno = ENOMEM;
return (FALSE);
}
ret = evnt_make_acpt_ref(evnt, fd, ref);
vstr_ref_del(ref);
return (ret);
}
static int evnt__make_bind_end(struct Evnt *evnt)
{
vstr_free_base(evnt->io_r); evnt->io_r = NULL;
vstr_free_base(evnt->io_w); evnt->io_w = NULL;
evnt->flag_q_accept = TRUE;
evnt__make_end(&q_accept, evnt, POLLIN);
SOCKET_POLL_INDICATOR(evnt->ind)->revents = 0;
return (TRUE);
}
int evnt_make_bind_ipv4(struct Evnt *evnt,
const char *acpt_addr, short server_port,
unsigned int listen_len)
{
int fd = -1;
int saved_errno = 0;
socklen_t alloc_len = sizeof(struct sockaddr_in);
Vstr_ref *ref = NULL;
if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
goto sock_fail;
if (!(ref = vstr_ref_make_malloc(alloc_len)))
goto init_fail;
if (!evnt_init(evnt, fd, ref))
goto init_fail;
EVNT_SA_IN4(evnt)->sin_family = AF_INET;
EVNT_SA_IN4(evnt)->sin_addr.s_addr = htonl(INADDR_ANY);
if (acpt_addr && *acpt_addr)
EVNT__RESOLVE_NAME(evnt, acpt_addr);
if (EVNT_SA_IN4(evnt)->sin_addr.s_addr == htonl(INADDR_ANY))
acpt_addr = "any";
EVNT_SA_IN4(evnt)->sin_port = htons(server_port);
if (!evnt_fd__set_reuse(fd, TRUE))
goto reuse_fail;
if (bind(fd, EVNT_SA(evnt), alloc_len) == -1)
goto bind_fail;
if (!server_port)
if (getsockname(fd, EVNT_SA(evnt), &alloc_len) == -1)
vlg_err(vlg, EXIT_FAILURE, "getsockname: %m\n");
if (listen(fd, listen_len) == -1)
goto listen_fail;
vstr_ref_del(ref);
return (evnt__make_bind_end(evnt));
bind_fail:
saved_errno = errno;
vlg_warn(vlg, "bind(%s:%hd): %m\n", acpt_addr, server_port);
errno = saved_errno;
listen_fail:
reuse_fail:
evnt__uninit(evnt);
init_fail:
vstr_ref_del(ref);
evnt__fd_close_noerrno(fd);
sock_fail:
return (FALSE);
}
int evnt_make_bind_local(struct Evnt *evnt, const char *fname,
unsigned int listen_len)
{
int fd = -1;
int saved_errno = 0;
size_t len = strlen(fname) + 1;
struct sockaddr_un tmp_sun;
socklen_t alloc_len = 0;
Vstr_ref *ref = NULL;
tmp_sun.sun_path[0] = 0;
alloc_len = SUN_LEN(&tmp_sun) + len;
if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1)
goto sock_fail;
if (!(ref = vstr_ref_make_malloc(alloc_len)))
goto init_fail;
if (!evnt_init(evnt, fd, ref))
goto init_fail;
EVNT_SA_UN(evnt)->sun_family = AF_LOCAL;
memcpy(EVNT_SA_UN(evnt)->sun_path, fname, len);
unlink(fname);
if (bind(fd, EVNT_SA(evnt), alloc_len) == -1)
goto bind_fail;
if (fchmod(fd, 0600) == -1)
goto fchmod_fail;
if (listen(fd, listen_len) == -1)
goto listen_fail;
vstr_ref_del(ref);
return (evnt__make_bind_end(evnt));
bind_fail:
saved_errno = errno;
vlg_warn(vlg, "bind(%s): %m\n", fname);
errno = saved_errno;
fchmod_fail:
listen_fail:
evnt__uninit(evnt);
init_fail:
vstr_ref_del(ref);
evnt__fd_close_noerrno(fd);
sock_fail:
return (FALSE);
}
int evnt_make_custom(struct Evnt *evnt, int fd, Vstr_ref *sa, int flags)
{
static Vstr_ref dummy_sa = {vstr_ref_cb_free_nothing, NULL, 1};
if (!sa)
sa = &dummy_sa;
if (!evnt_init(evnt, fd, sa))
{
evnt__fd_close_noerrno(fd);
return (FALSE);
}
if (flags & POLLIN)
{
evnt->flag_q_recv = TRUE;
return (evnt__make_end(&q_recv, evnt, POLLIN));
}
evnt->flag_q_none = TRUE;
return (evnt__make_end(&q_none, evnt, 0));
}
void evnt_close(struct Evnt *evnt)
{
if (!evnt)
return;
ASSERT(evnt__valid(evnt));
if (evnt->flag_q_closed)
return;
evnt->flag_q_closed = TRUE;
evnt->c_next = q_closed;
q_closed = evnt;
ASSERT(evnt__valid(evnt));
}
void evnt_put_pkt(struct Evnt *evnt)
{
ASSERT(evnt__valid(evnt));
if (evnt->flag_q_pkt_move && evnt->flag_q_none)
{
ASSERT(evnt->acct.req_put >= evnt->acct.req_got);
evnt_del(&q_none, evnt); evnt->flag_q_none = FALSE;
evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;
evnt_wait_cntl_add(evnt, POLLIN);
}
++evnt->acct.req_put;
ASSERT(evnt__valid(evnt));
}
void evnt_got_pkt(struct Evnt *evnt)
{
ASSERT(evnt__valid(evnt));
++evnt->acct.req_got;
if (evnt->flag_q_pkt_move &&
!evnt->flag_q_send_recv && (evnt->acct.req_put == evnt->acct.req_got))
{
ASSERT(evnt->acct.req_put >= evnt->acct.req_got);
evnt_del(&q_recv, evnt), evnt->flag_q_recv = FALSE;
evnt_add(&q_none, evnt), evnt->flag_q_none = TRUE;
evnt_wait_cntl_del(evnt, POLLIN);
}
ASSERT(evnt__valid(evnt));
}
static int evnt__call_send(struct Evnt *evnt, unsigned int *ern)
{
size_t tmp = evnt->io_w->len;
int fd = evnt_fd(evnt);
if (!vstr_sc_write_fd(evnt->io_w, 1, tmp, fd, ern) && (errno != EAGAIN))
return (FALSE);
tmp -= evnt->io_w->len;
vlg_dbg3(vlg, "write(%p) = %zu\n", evnt, tmp);
evnt->acct.bytes_w += tmp;
return (TRUE);
}
int evnt_send_add(struct Evnt *evnt, int force_q, size_t max_sz)
{
ASSERT(evnt__valid(evnt));
vlg_dbg3(vlg, "q now = %u, q send recv = %u, force = %u\n",
evnt->flag_q_send_now, evnt->flag_q_send_recv, force_q);
if (!evnt->flag_q_send_recv && (evnt->io_w->len > max_sz))
{
if (!evnt__call_send(evnt, NULL))
{
ASSERT(evnt__valid(evnt));
return (FALSE);
}
if (!evnt->io_w->len && !force_q)
{
ASSERT(evnt__valid(evnt));
return (TRUE);
}
}
if (evnt->flag_q_send_now || (evnt->flag_q_send_recv && !force_q))
{
ASSERT(evnt__valid(evnt));
return (TRUE);
}
evnt->s_next = q_send_now;
q_send_now = evnt;
evnt->flag_q_send_now = TRUE;
ASSERT(evnt__valid(evnt));
return (TRUE);
}
void evnt_send_del(struct Evnt *evnt)
{
struct Evnt **scan = &q_send_now;
if (!evnt->flag_q_send_now)
return;
while (*scan && (*scan != evnt))
scan = &(*scan)->s_next;
ASSERT(*scan);
*scan = evnt->s_next;
evnt->flag_q_send_now = FALSE;
}
int evnt_shutdown_r(struct Evnt *evnt, int got_eof)
{
ASSERT(evnt__valid(evnt));
if (evnt->io_r_shutdown || evnt->io_w_shutdown)
return (FALSE);
evnt_wait_cntl_del(evnt, POLLIN);
vlg_dbg2(vlg, "shutdown(SHUT_RD, %d) from[$<sa:%p>]\n",
got_eof, EVNT_SA(evnt));
if (!got_eof && (shutdown(evnt_fd(evnt), SHUT_RD) == -1))
{
if (errno != ENOTCONN)
vlg_warn(vlg, "shutdown(SHUT_RD): %m\n");
return (FALSE);
}
evnt->io_r_shutdown = TRUE;
ASSERT(evnt__valid(evnt));
return (TRUE);
}
static void evnt__send_fin(struct Evnt *evnt)
{
if (0)
{ }
else if ( evnt->flag_q_send_recv && !evnt->io_w->len)
{
evnt_del(&q_send_recv, evnt); evnt->flag_q_send_recv = FALSE;
if (evnt->flag_q_pkt_move && (evnt->acct.req_put == evnt->acct.req_got))
{
evnt_add(&q_none, evnt); evnt->flag_q_none = TRUE;
evnt_wait_cntl_del(evnt, POLLIN | POLLOUT);
}
else
{
evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;
evnt_wait_cntl_del(evnt, POLLOUT);
}
}
else if (!evnt->flag_q_send_recv && evnt->io_w->len)
{
int pflags = POLLOUT;
ASSERT(evnt->flag_q_none || evnt->flag_q_recv);
if (evnt->flag_q_none)
evnt_del(&q_none, evnt), evnt->flag_q_none = FALSE;
else
evnt_del(&q_recv, evnt), evnt->flag_q_recv = FALSE;
evnt_add(&q_send_recv, evnt); evnt->flag_q_send_recv = TRUE;
if (!evnt->io_r_shutdown) pflags |= POLLIN;
evnt_wait_cntl_add(evnt, pflags);
}
}
int evnt_shutdown_w(struct Evnt *evnt)
{
Vstr_base *out = evnt->io_w;
ASSERT(evnt__valid(evnt));
vlg_dbg2(vlg, "shutdown(SHUT_WR) from[$<sa:%p>]\n", EVNT_SA(evnt));
if (evnt->io_r_shutdown || evnt->io_w_shutdown)
return (FALSE);
if (shutdown(evnt_fd(evnt), SHUT_WR) == -1)
{
if (errno != ENOTCONN)
vlg_warn(vlg, "shutdown(SHUT_WR): %m\n");
return (FALSE);
}
evnt->io_w_shutdown = TRUE;
vstr_del(out, 1, out->len);
evnt__send_fin(evnt);
ASSERT(evnt__valid(evnt));
return (TRUE);
}
int evnt_recv(struct Evnt *evnt, unsigned int *ern)
{
Vstr_base *data = evnt->io_r;
size_t tmp = evnt->io_r->len;
unsigned int num_min = 2;
unsigned int num_max = 6;
ASSERT(evnt__valid(evnt) && ern);
if (evnt->prev_bytes_r > (120 * 3))
num_max = 8;
if (evnt->prev_bytes_r > (120 * 6))
num_max = 32;
vstr_sc_read_iov_fd(data, data->len, evnt_fd(evnt), num_min, num_max, ern);
evnt->prev_bytes_r = (evnt->io_r->len - tmp);
vlg_dbg3(vlg, "read(%p) = %ju\n", evnt, evnt->prev_bytes_r);
evnt->acct.bytes_r += evnt->prev_bytes_r;
switch (*ern)
{
case VSTR_TYPE_SC_READ_FD_ERR_NONE:
if (evnt->io_w_shutdown)
vstr_del(data, 1, data->len);
else
EVNT__COPY_TV(&evnt->mtime);
return (TRUE);
case VSTR_TYPE_SC_READ_FD_ERR_MEM:
vlg_warn(vlg, "%s\n", __func__);
break;
case VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO:
if (errno == EAGAIN)
return (TRUE);
break;
case VSTR_TYPE_SC_READ_FD_ERR_EOF:
break;
default:
vlg_warn(vlg, "read_iov() = %d: %m\n", *ern);
}
return (FALSE);
}
int evnt_send(struct Evnt *evnt)
{
unsigned int ern = 0;
ASSERT(evnt__valid(evnt));
if (!evnt__call_send(evnt, &ern))
return (FALSE);
EVNT__COPY_TV(&evnt->mtime);
evnt__send_fin(evnt);
ASSERT(evnt__valid(evnt));
return (TRUE);
}
#ifdef VSTR_AUTOCONF_lseek64
# define sendfile64 sendfile
#endif
int evnt_sendfile(struct Evnt *evnt, int ffd, VSTR_AUTOCONF_uintmax_t *f_off,
VSTR_AUTOCONF_uintmax_t *f_len, unsigned int *ern)
{
ssize_t ret = 0;
off64_t tmp_off = *f_off;
size_t tmp_len = *f_len;
*ern = 0;
ASSERT(evnt__valid(evnt));
ASSERT(!evnt->io_w->len);
if (*f_len > SSIZE_MAX)
tmp_len = SSIZE_MAX;
if ((ret = sendfile64(evnt_fd(evnt), ffd, &tmp_off, tmp_len)) == -1)
{
if (errno == EAGAIN)
return (TRUE);
*ern = VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO;
return (FALSE);
}
if (!ret)
{
*ern = VSTR_TYPE_SC_READ_FD_ERR_EOF;
return (FALSE);
}
*f_off = tmp_off;
evnt->acct.bytes_w += ret;
EVNT__COPY_TV(&evnt->mtime);
*f_len -= ret;
return (TRUE);
}
int evnt_sc_read_send(struct Evnt *evnt, int fd, VSTR_AUTOCONF_uintmax_t *len)
{
Vstr_base *out = evnt->io_w;
size_t orig_len = out->len;
size_t tmp = 0;
int ret = IO_OK;
ASSERT(len && *len);
if ((ret = io_get(out, fd)) == IO_FAIL)
return (EVNT_IO_READ_ERR);
if (ret == IO_EOF)
return (EVNT_IO_READ_EOF);
tmp = out->len - orig_len;
if (tmp >= *len)
{
vstr_sc_reduce(out, 1, out->len, tmp - *len);
ASSERT((out->len - orig_len) == *len);
*len = 0;
return (EVNT_IO_READ_FIN);
}
*len -= tmp;
if (!evnt_send(evnt))
return (EVNT_IO_SEND_ERR);
return (EVNT_IO_OK);
}
static int evnt__get_timeout(void)
{
const struct timeval *tv = NULL;
int msecs = -1;
if (q_send_now)
msecs = 0;
else if ((tv = timer_q_first_timeval()))
{
long diff = 0;
struct timeval now_timeval;
EVNT__COPY_TV(&now_timeval);
diff = timer_q_timeval_diff_msecs(tv, &now_timeval);
if (diff > 0)
{
if (diff >= INT_MAX)
msecs = INT_MAX - 1;
else
msecs = diff;
}
else
msecs = 0;
}
vlg_dbg2(vlg, "get_timeout = %d\n", msecs);
return (msecs);
}
void evnt_scan_q_close(void)
{
struct Evnt *scan = NULL;
while ((scan = q_closed))
{
scan->flag_q_closed = FALSE;
q_closed = scan->c_next;
evnt_free(scan);
}
ASSERT(!q_closed);
}
static void evnt__close_now(struct Evnt *evnt)
{
if (evnt->flag_q_closed)
return;
evnt_free(evnt);
}
void evnt_acpt_close_all(void)
{
struct Evnt *evnt = q_accept;
while (evnt)
{
evnt_close(evnt);
evnt = evnt->next;
}
}
void evnt_scan_fds(unsigned int ready, size_t max_sz)
{
const int bad_poll_flags = (POLLERR | POLLHUP | POLLNVAL);
struct Evnt *scan = NULL;
EVNT__UPDATE_TV();
scan = q_connect;
while (scan && ready)
{
struct Evnt *scan_next = scan->next;
int done = FALSE;
ASSERT(evnt__valid(scan));
if (scan->flag_q_closed)
goto next_connect;
assert(!(SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN));
if (SOCKET_POLL_INDICATOR(scan->ind)->revents & (POLLOUT|bad_poll_flags))
{
int ern = 0;
socklen_t len = sizeof(int);
int ret = 0;
done = TRUE;
ret = getsockopt(evnt_fd(scan), SOL_SOCKET, SO_ERROR, &ern, &len);
if (ret == -1)
vlg_err(vlg, EXIT_FAILURE, "getsockopt(SO_ERROR): %m\n");
else if (ern)
{
errno = ern;
vlg_warn(vlg, "connect(): %m\n");
evnt__close_now(scan);
}
else
{
SOCKET_POLL_INDICATOR(scan->ind)->events = 0;
SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;
evnt_del(&q_connect, scan); scan->flag_q_connect = FALSE;
evnt_add(&q_none, scan); scan->flag_q_none = TRUE;
evnt_wait_cntl_del(scan, POLLOUT);
if (!scan->cbs->cb_func_connect(scan))
evnt__close_now(scan);
}
goto next_connect;
}
ASSERT(!done);
if (evnt_poll_direct_enabled()) break;
next_connect:
SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;
if (done)
--ready;
scan = scan_next;
}
scan = q_accept;
while (scan && ready)
{
struct Evnt *scan_next = scan->next;
int done = FALSE;
ASSERT(evnt__valid(scan));
if (scan->flag_q_closed)
goto next_accept;
assert(!(SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLOUT));
if (!done && (SOCKET_POLL_INDICATOR(scan->ind)->revents & bad_poll_flags))
{
done = TRUE;
evnt__close_now(scan);
goto next_accept;
}
if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN)
{
struct sockaddr_in sa;
socklen_t len = sizeof(struct sockaddr_in);
int fd = -1;
struct Evnt *tmp = NULL;
unsigned int acpt_num = 0;
done = TRUE;
while ((SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN) &&
(fd = accept(evnt_fd(scan), (struct sockaddr *) &sa, &len)) != -1)
{
if (!(tmp = scan->cbs->cb_func_accept(scan, fd,
(struct sockaddr *) &sa, len)))
{
close(fd);
goto next_accept;
}
if (!tmp->flag_q_closed)
{
++ready;
tmp->flag_fully_acpt = TRUE;
}
assert(SOCKET_POLL_INDICATOR(tmp->ind)->events == POLLIN);
assert(SOCKET_POLL_INDICATOR(tmp->ind)->revents == POLLIN);
assert(tmp == q_recv);
if (++acpt_num >= evnt__accept_limit)
break;
}
goto next_accept;
}
ASSERT(!done);
if (evnt_poll_direct_enabled()) break;
next_accept:
SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;
if (done)
--ready;
scan = scan_next;
}
scan = q_recv;
while (scan && ready)
{
struct Evnt *scan_next = scan->next;
int done = FALSE;
ASSERT(evnt__valid(scan));
if (scan->flag_q_closed)
goto next_recv;
if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN)
{
done = TRUE;
if (!scan->cbs->cb_func_recv(scan))
evnt__close_now(scan);
}
if (!done && (SOCKET_POLL_INDICATOR(scan->ind)->revents & bad_poll_flags))
{
done = TRUE;
if (scan->io_r_shutdown || scan->io_w_shutdown ||
!scan->cbs->cb_func_shutdown_r(scan))
evnt__close_now(scan);
}
if (!done && evnt_poll_direct_enabled()) break;
next_recv:
if (done)
--ready;
scan = scan_next;
}
scan = q_send_recv;
while (scan && ready)
{
struct Evnt *scan_next = scan->next;
int done = FALSE;
ASSERT(evnt__valid(scan));
if (scan->flag_q_closed)
goto next_send;
if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN)
{
done = TRUE;
if (!scan->cbs->cb_func_recv(scan))
{
evnt__close_now(scan);
goto next_send;
}
}
if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLOUT)
{
done = TRUE;
if (!evnt_send_add(scan, TRUE, max_sz))
evnt__close_now(scan);
}
if (!done && (SOCKET_POLL_INDICATOR(scan->ind)->revents & bad_poll_flags))
{
done = TRUE;
if (scan->io_r_shutdown || !scan->cbs->cb_func_shutdown_r(scan))
evnt__close_now(scan);
}
if (!done && evnt_poll_direct_enabled()) break;
next_send:
if (done)
--ready;
scan = scan_next;
}
scan = q_none;
while (scan && ready)
{
struct Evnt *scan_next = scan->next;
int done = FALSE;
ASSERT(evnt__valid(scan));
if (scan->flag_q_closed)
goto next_none;
if (SOCKET_POLL_INDICATOR(scan->ind)->revents)
{
done = TRUE;
evnt__close_now(scan);
goto next_none;
}
else
assert(!SOCKET_POLL_INDICATOR(scan->ind)->revents);
ASSERT(!done);
if (evnt_poll_direct_enabled()) break;
next_none:
if (done)
--ready;
scan = scan_next;
}
if (q_closed)
evnt_scan_q_close();
else if (ready)
vlg_abort(vlg, "ready = %d\n", ready);
}
void evnt_scan_send_fds(void)
{
struct Evnt **scan = NULL;
evnt_scan_q_close();
scan = &q_send_now;
while (*scan)
{
struct Evnt *tmp = *scan;
tmp->flag_q_send_now = FALSE;
*scan = tmp->s_next;
if (!tmp->cbs->cb_func_send(tmp))
{
evnt__close_now(tmp);
continue;
}
if (tmp == *scan)
{
ASSERT(tmp->flag_q_send_now == TRUE);
scan = &tmp->s_next;
}
}
evnt_scan_q_close();
}
static void evnt__close_1(struct Evnt **root)
{
struct Evnt *scan = *root;
*root = NULL;
while (scan)
{
struct Evnt *scan_next = scan->next;
Vstr_ref *sa = scan->sa_ref;
Timer_q_node *tm_o = scan->tm_o;
vstr_free_base(scan->io_w); scan->io_w = NULL;
vstr_free_base(scan->io_r); scan->io_r = NULL;
evnt_poll_del(scan);
--evnt__num;
evnt__free2(sa, tm_o);
scan->cbs->cb_func_free(scan);
scan = scan_next;
}
}
void evnt_close_all(void)
{
q_send_now = NULL;
q_closed = NULL;
evnt__close_1(&q_connect);
evnt__close_1(&q_accept);
evnt__close_1(&q_recv);
evnt__close_1(&q_send_recv);
evnt__close_1(&q_none);
ASSERT(evnt__num == evnt__debug_num_all());
}
void evnt_out_dbg3(const char *prefix)
{
if (vlg->out_dbg < 3)
return;
vlg_dbg3(vlg, "%s T=%u c=%u a=%u r=%u s=%u n=%u [SN=%u]\n",
prefix, evnt_num_all(),
evnt__debug_num_1(q_connect),
evnt__debug_num_1(q_accept),
evnt__debug_num_1(q_recv),
evnt__debug_num_1(q_send_recv),
evnt__debug_num_1(q_none),
evnt__debug_num_1(q_send_now));
}
void evnt_stats_add(struct Evnt *dst, const struct Evnt *src)
{
dst->acct.req_put += src->acct.req_put;
dst->acct.req_put += src->acct.req_got;
dst->acct.bytes_r += src->acct.bytes_r;
dst->acct.bytes_w += src->acct.bytes_w;
}
unsigned int evnt_num_all(void)
{
ASSERT(evnt__num == evnt__debug_num_all());
return (evnt__num);
}
int evnt_waiting(void)
{
return (q_connect || q_accept || q_recv || q_send_recv || q_send_now);
}
struct Evnt *evnt_find_least_used(void)
{
struct Evnt *con = NULL;
struct Evnt *con_min = NULL;
if (!(con = q_none) &&
!(con = q_recv) &&
!(con = q_send_recv))
return (NULL);
con_min = con;
while (con)
{
if (con_min->io_w->len > con->io_w->len)
con_min = con;
con = con->next;
}
return (con_min);
}
#define MATCH_Q_NAME(x) \
if (CSTREQ(qname, #x )) \
return ( q_ ## x ) \
struct Evnt *evnt_queue(const char *qname)
{
MATCH_Q_NAME(connect);
MATCH_Q_NAME(accept);
MATCH_Q_NAME(send_recv);
MATCH_Q_NAME(recv);
MATCH_Q_NAME(none);
MATCH_Q_NAME(send_now);
return (NULL);
}
#ifdef VSTR_AUTOCONF_HAVE_TCP_CORK
# define USE_TCP_CORK 1
#else
# define USE_TCP_CORK 0
# define TCP_CORK 0
#endif
void evnt_fd_set_cork(struct Evnt *evnt, int val)
{
ASSERT(evnt__valid(evnt));
if (!USE_TCP_CORK)
return;
if (!evnt->flag_io_cork == !val)
return;
if (!evnt->flag_io_nagle)
{
evnt_fd__set_nodelay(evnt_fd(evnt), FALSE);
evnt->flag_io_nagle = TRUE;
}
if (setsockopt(evnt_fd(evnt), IPPROTO_TCP, TCP_CORK, &val, sizeof(val)) == -1)
return;
evnt->flag_io_cork = !!val;
}
static void evnt__free_base_noerrno(Vstr_base *s1)
{
int saved_errno = errno;
vstr_free_base(s1);
errno = saved_errno;
}
int evnt_fd_set_filter(struct Evnt *evnt, const char *fname)
{
int fd = evnt_fd(evnt);
Vstr_base *s1 = NULL;
unsigned int ern = 0;
if (!CONF_USE_SOCKET_FILTERS)
return (TRUE);
if (!(s1 = vstr_make_base(NULL)))
VLG_WARNNOMEM_RET(FALSE, (vlg, "filter_attach0: %m\n"));
vstr_sc_read_len_file(s1, 0, fname, 0, 0, &ern);
if (ern &&
((ern != VSTR_TYPE_SC_READ_FILE_ERR_OPEN_ERRNO) || (errno != ENOENT)))
{
evnt__free_base_noerrno(s1);
VLG_WARN_RET(FALSE, (vlg, "filter_attach1(%s): %m\n", fname));
}
else if ((s1->len / sizeof(struct sock_filter)) > USHRT_MAX)
{
vstr_free_base(s1);
errno = E2BIG;
VLG_WARN_RET(FALSE, (vlg, "filter_attach2(%s): %m\n", fname));
}
else if (!s1->len)
{
vstr_free_base(s1);
if (!evnt->flag_io_filter)
vlg_warn(vlg, "filter_attach3(%s): Empty file\n", fname);
else if (setsockopt(fd, SOL_SOCKET, SO_DETACH_FILTER, NULL, 0) == -1)
{
evnt__free_base_noerrno(s1);
VLG_WARN_RET(FALSE, (vlg, "setsockopt(SOCKET, DETACH_FILTER, %s): %m\n",
fname));
}
evnt->flag_io_filter = FALSE;
return (TRUE);
}
else
{
struct sock_fprog filter[1];
socklen_t len = sizeof(filter);
filter->len = s1->len / sizeof(struct sock_filter);
filter->filter = (void *)vstr_export_cstr_ptr(s1, 1, s1->len);
if (!filter->filter)
VLG_WARNNOMEM_RET(FALSE, (vlg, "filter_attach4: %m\n"));
if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_FILTER, filter, len) == -1)
{
evnt__free_base_noerrno(s1);
VLG_WARN_RET(FALSE, (vlg, "setsockopt(SOCKET, ATTACH_FILTER, %s): %m\n",
fname));
}
}
evnt->flag_io_filter = TRUE;
vstr_free_base(s1);
return (TRUE);
}
static Timer_q_base *evnt__timeout_1 = NULL;
static Timer_q_base *evnt__timeout_10 = NULL;
static Timer_q_base *evnt__timeout_100 = NULL;
static Timer_q_node *evnt__timeout_mtime_make(struct Evnt *evnt,
struct timeval *tv,
unsigned long msecs)
{
Timer_q_node *tm_o = NULL;
vlg_dbg2(vlg, "mtime_make(%p, %lu)\n", evnt, msecs);
if (0) { }
else if (msecs >= ( 99 * 1000))
{
TIMER_Q_TIMEVAL_ADD_SECS(tv, 100, 0);
tm_o = timer_q_add_node(evnt__timeout_100, evnt, tv,
TIMER_Q_FLAG_NODE_DEFAULT);
}
else if (msecs >= ( 9 * 1000))
{
TIMER_Q_TIMEVAL_ADD_SECS(tv, 10, 0);
tm_o = timer_q_add_node(evnt__timeout_10, evnt, tv,
TIMER_Q_FLAG_NODE_DEFAULT);
}
else
{
TIMER_Q_TIMEVAL_ADD_SECS(tv, 1, 0);
tm_o = timer_q_add_node(evnt__timeout_1, evnt, tv,
TIMER_Q_FLAG_NODE_DEFAULT);
}
return (tm_o);
}
static void evnt__timer_cb_mtime(int type, void *data)
{
struct Evnt *evnt = data;
struct timeval tv[1];
unsigned long diff = 0;
if (!evnt)
return;
ASSERT(evnt__valid(evnt));
if (type == TIMER_Q_TYPE_CALL_RUN_ALL)
return;
evnt->tm_o = NULL;
if (type == TIMER_Q_TYPE_CALL_DEL)
return;
EVNT__COPY_TV(tv);
diff = timer_q_timeval_udiff_msecs(tv, &evnt->mtime);
if (diff < evnt->msecs_tm_mtime)
diff = evnt->msecs_tm_mtime - diff;
else
{
vlg_dbg2(vlg, "timeout = %p[$<sa:%p>] (%lu, %lu)\n",
evnt, EVNT_SA(evnt), diff, evnt->msecs_tm_mtime);
if (!evnt_shutdown_w(evnt))
{
evnt_close(evnt);
return;
}
EVNT__COPY_TV(&evnt->mtime);
diff = evnt->msecs_tm_mtime;
}
if (!(evnt->tm_o = evnt__timeout_mtime_make(evnt, tv, diff)))
{
errno = ENOMEM;
vlg_warn(vlg, "%s: %m\n", "timer reinit");
evnt_close(evnt);
}
}
void evnt_timeout_init(void)
{
ASSERT(!evnt__timeout_1);
EVNT__UPDATE_TV();
evnt__timeout_1 = timer_q_add_base(evnt__timer_cb_mtime,
TIMER_Q_FLAG_BASE_INSERT_FROM_END);
evnt__timeout_10 = timer_q_add_base(evnt__timer_cb_mtime,
TIMER_Q_FLAG_BASE_INSERT_FROM_END);
evnt__timeout_100 = timer_q_add_base(evnt__timer_cb_mtime,
TIMER_Q_FLAG_BASE_INSERT_FROM_END);
if (!evnt__timeout_1 || !evnt__timeout_10 || !evnt__timeout_100)
VLG_ERRNOMEM((vlg, EXIT_FAILURE, "timer init"));
timer_q_cntl_base(evnt__timeout_1,
TIMER_Q_CNTL_BASE_SET_FLAG_INSERT_FROM_END, FALSE);
timer_q_cntl_base(evnt__timeout_10,
TIMER_Q_CNTL_BASE_SET_FLAG_INSERT_FROM_END, FALSE);
timer_q_cntl_base(evnt__timeout_100,
TIMER_Q_CNTL_BASE_SET_FLAG_INSERT_FROM_END, FALSE);
}
void evnt_timeout_exit(void)
{
ASSERT(evnt__timeout_1);
timer_q_del_base(evnt__timeout_1); evnt__timeout_1 = NULL;
timer_q_del_base(evnt__timeout_10); evnt__timeout_10 = NULL;
timer_q_del_base(evnt__timeout_100); evnt__timeout_100 = NULL;
}
int evnt_sc_timeout_via_mtime(struct Evnt *evnt, unsigned long msecs)
{
struct timeval tv[1];
evnt->msecs_tm_mtime = msecs;
EVNT__COPY_TV(tv);
if (!(evnt->tm_o = evnt__timeout_mtime_make(evnt, tv, msecs)))
return (FALSE);
return (TRUE);
}
void evnt_sc_main_loop(size_t max_sz)
{
int ready = 0;
struct timeval tv[1];
ready = evnt_poll();
if ((ready == -1) && (errno != EINTR))
vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", "poll");
if (ready == -1)
return;
evnt_out_dbg3("1");
evnt_scan_fds(ready, max_sz);
evnt_out_dbg3("2");
evnt_scan_send_fds();
evnt_out_dbg3("3");
EVNT__COPY_TV(tv);
timer_q_run_norm(tv);
evnt_out_dbg3("4");
evnt_scan_send_fds();
evnt_out_dbg3("5");
}
time_t evnt_sc_time(void)
{
time_t ret = -1;
if (!CONF_GETTIMEOFDAY_TIME)
ret = time(NULL);
else
{
struct timeval tv[1];
EVNT__COPY_TV(tv);
ret = tv->tv_sec;
}
return (ret);
}
void evnt_sc_serv_cb_func_acpt_free(struct Evnt *evnt)
{
struct Acpt_listener *acpt_listener = (struct Acpt_listener *)evnt;
struct Acpt_data *acpt_data = acpt_listener->ref->ptr;
evnt_vlg_stats_info(acpt_listener->evnt, "ACCEPT FREE");
acpt_data->evnt = NULL;
vstr_ref_del(acpt_listener->ref);
F(acpt_listener);
}
static void evnt__sc_serv_make_acpt_data_cb(Vstr_ref *ref)
{
struct Acpt_data *ptr = NULL;
if (!ref)
return;
ptr = ref->ptr;
vstr_ref_del(ptr->sa);
F(ptr);
free(ref);
}
struct Evnt *evnt_sc_serv_make_bind(const char *acpt_addr,
unsigned short acpt_port,
unsigned int q_listen_len,
unsigned int max_connections,
unsigned int defer_accept,
const char *acpt_filter_file)
{
struct sockaddr_in *sinv4 = NULL;
Acpt_listener *acpt_listener = NULL;
Acpt_data *acpt_data = NULL;
Vstr_ref *ref = NULL;
if (!(acpt_listener = MK(sizeof(Acpt_listener))))
VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
acpt_addr, acpt_port));
acpt_listener->max_connections = max_connections;
if (!(acpt_data = MK(sizeof(Acpt_data))))
VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
acpt_addr, acpt_port));
acpt_data->evnt = NULL;
acpt_data->sa = NULL;
if (!(ref = vstr_ref_make_ptr(acpt_data, evnt__sc_serv_make_acpt_data_cb)))
VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
acpt_addr, acpt_port));
acpt_listener->ref = ref;
if (!evnt_make_bind_ipv4(acpt_listener->evnt, acpt_addr, acpt_port,
q_listen_len))
vlg_err(vlg, 2, "%s: %m\n", __func__);
acpt_data->evnt = acpt_listener->evnt;
acpt_data->sa = vstr_ref_add(acpt_data->evnt->sa_ref);
sinv4 = EVNT_SA_IN4(acpt_listener->evnt);
ASSERT(!acpt_port || (acpt_port == ntohs(sinv4->sin_port)));
if (defer_accept)
evnt_fd_set_defer_accept(acpt_listener->evnt, defer_accept);
if (acpt_filter_file &&
!evnt_fd_set_filter(acpt_listener->evnt, acpt_filter_file))
vlg_err(vlg, 3, "set_filter(%s): %m\n", acpt_filter_file);
acpt_listener->evnt->cbs->cb_func_free = evnt_sc_serv_cb_func_acpt_free;
return (acpt_listener->evnt);
}
void evnt_vlg_stats_info(struct Evnt *evnt, const char *prefix)
{
vlg_info(vlg, "%s from[$<sa:%p>] req_got[%'u:%u] req_put[%'u:%u]"
" recv[${BKMG.ju:%ju}:%ju] send[${BKMG.ju:%ju}:%ju]\n",
prefix, EVNT_SA(evnt),
evnt->acct.req_got, evnt->acct.req_got,
evnt->acct.req_put, evnt->acct.req_put,
evnt->acct.bytes_r, evnt->acct.bytes_r,
evnt->acct.bytes_w, evnt->acct.bytes_w);
}
#ifdef TCP_DEFER_ACCEPT
# define USE_TCP_DEFER_ACCEPT 1
#else
# define USE_TCP_DEFER_ACCEPT 0
# define TCP_DEFER_ACCEPT 0
#endif
void evnt_fd_set_defer_accept(struct Evnt *evnt, int val)
{
socklen_t len = sizeof(int);
ASSERT(evnt__valid(evnt));
if (!USE_TCP_DEFER_ACCEPT)
return;
setsockopt(evnt_fd(evnt), IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, len);
}
pid_t evnt_make_child(void)
{
pid_t ret = fork();
if (!ret)
{
evnt__is_child = TRUE;
evnt_poll_child_init();
}
return (ret);
}
int evnt_is_child(void)
{
return (evnt__is_child);
}
int evnt_child_block_beg(void)
{
if (!evnt_is_child())
return (TRUE);
if (PROC_CNTL_PDEATHSIG(SIGTERM) == -1)
vlg_err(vlg, EXIT_FAILURE, "prctl(): %m\n");
if (evnt_child_exited)
return (FALSE);
return (TRUE);
}
int evnt_child_block_end(void)
{
if (!evnt_is_child())
return (TRUE);
if (PROC_CNTL_PDEATHSIG(SIGCHLD) == -1)
vlg_err(vlg, EXIT_FAILURE, "prctl(): %m\n");
return (TRUE);
}
static int evnt__poll_tst_accept(int msecs)
{
return ((msecs == -1) && (evnt_num_all() == 1) && q_accept);
}
static int evnt__poll_accept(void)
{
int fd = -1;
struct Evnt *scan = q_accept;
struct sockaddr_in sa;
socklen_t len = sizeof(struct sockaddr_in);
struct Evnt *tmp = NULL;
evnt_fd__set_nonblock(evnt_fd(scan), FALSE);
if (!evnt_child_block_beg())
goto block_beg_fail;
fd = accept(evnt_fd(scan), (struct sockaddr *) &sa, &len);
evnt_child_block_end();
evnt_fd__set_nonblock(evnt_fd(scan), TRUE);
if (fd == -1)
goto accept_fail;
if (!(tmp = scan->cbs->cb_func_accept(scan,
fd, (struct sockaddr *) &sa, len)))
goto cb_accept_fail;
if (!tmp->flag_q_closed)
tmp->flag_fully_acpt = TRUE;
assert(SOCKET_POLL_INDICATOR(tmp->ind)->events == POLLIN);
assert(SOCKET_POLL_INDICATOR(tmp->ind)->revents == POLLIN);
assert(tmp == q_recv);
return (1);
block_beg_fail:
evnt_fd__set_nonblock(evnt_fd(scan), TRUE);
goto accept_fail;
cb_accept_fail:
close(fd);
accept_fail:
errno = EINTR;
return (-1);
}
#ifndef EVNT_USE_EPOLL
# ifdef VSTR_AUTOCONF_HAVE_SYS_EPOLL_H
# define EVNT_USE_EPOLL 1
# else
# define EVNT_USE_EPOLL 0
# endif
#endif
#if !EVNT_USE_EPOLL
int evnt_poll_init(void)
{
return (TRUE);
}
int evnt_poll_direct_enabled(void)
{
return (FALSE);
}
int evnt_poll_child_init(void)
{
return (TRUE);
}
void evnt_wait_cntl_add(struct Evnt *evnt, int flags)
{
SOCKET_POLL_INDICATOR(evnt->ind)->events |= flags;
SOCKET_POLL_INDICATOR(evnt->ind)->revents |= flags;
}
void evnt_wait_cntl_del(struct Evnt *evnt, int flags)
{
SOCKET_POLL_INDICATOR(evnt->ind)->events &= ~flags;
SOCKET_POLL_INDICATOR(evnt->ind)->revents &= ~flags;
}
unsigned int evnt_poll_add(struct Evnt *EVNT__ATTR_UNUSED(evnt), int fd)
{
return (socket_poll_add(fd));
}
void evnt_poll_del(struct Evnt *evnt)
{
if (SOCKET_POLL_INDICATOR(evnt->ind)->fd != -1)
close(SOCKET_POLL_INDICATOR(evnt->ind)->fd);
socket_poll_del(evnt->ind);
}
int evnt_poll_swap_accept_read(struct Evnt *evnt, int fd)
{
unsigned int old_ind = evnt->ind;
assert(SOCKET_POLL_INDICATOR(evnt->ind)->fd != fd);
ASSERT(evnt__valid(evnt));
if (!(evnt->ind = socket_poll_add(fd)))
goto poll_add_fail;
if (!(evnt->io_r = vstr_make_base(NULL)) ||
!(evnt->io_w = vstr_make_base(NULL)))
goto malloc_base_fail;
SOCKET_POLL_INDICATOR(evnt->ind)->events |= POLLIN;
SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLIN;
socket_poll_del(old_ind);
evnt_del(&q_accept, evnt); evnt->flag_q_accept = FALSE;
evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;
evnt_fd__set_nonblock(fd, TRUE);
ASSERT(evnt__valid(evnt));
return (TRUE);
malloc_base_fail:
socket_poll_del(evnt->ind);
evnt->ind = old_ind;
vstr_free_base(evnt->io_r); evnt->io_r = NULL;
ASSERT(!evnt->io_r && !evnt->io_w);
poll_add_fail:
return (FALSE);
}
int evnt_poll(void)
{
int msecs = evnt__get_timeout();
if (evnt_child_exited)
return (errno = EINTR, -1);
if (evnt__poll_tst_accept(msecs))
return (evnt__poll_accept());
return (socket_poll_update_all(msecs));
}
#else
#include <sys/epoll.h>
static int evnt__epoll_fd = -1;
int evnt_poll_init(void)
{
assert(POLLIN == EPOLLIN);
assert(POLLOUT == EPOLLOUT);
assert(POLLHUP == EPOLLHUP);
assert(POLLERR == EPOLLERR);
if (!CONF_EVNT_NO_EPOLL)
{
evnt__epoll_fd = epoll_create(CONF_EVNT_EPOLL_SZ);
vlg_dbg2(vlg, "epoll_create(%d): %m\n", evnt__epoll_fd);
}
return (evnt__epoll_fd != -1);
}
int evnt_poll_direct_enabled(void)
{
return (evnt__epoll_fd != -1);
}
static int evnt__epoll_readd(struct Evnt *evnt)
{
struct epoll_event epevent[1];
while (evnt)
{
int flags = SOCKET_POLL_INDICATOR(evnt->ind)->events;
vlg_dbg2(vlg, "epoll_mod_add(%p,%d)\n", evnt, flags);
epevent->events = flags;
epevent->data.ptr = evnt;
if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, evnt_fd(evnt), epevent) == -1)
vlg_err(vlg, EXIT_FAILURE, "epoll_readd: %m\n");
evnt = evnt->next;
}
return (TRUE);
}
int evnt_poll_child_init(void)
{
if (CONF_EVNT_DUP_EPOLL && evnt_poll_direct_enabled())
{
close(evnt__epoll_fd);
evnt__epoll_fd = epoll_create(CONF_EVNT_EPOLL_SZ);
if (evnt__epoll_fd == -1)
VLG_WARN_RET(FALSE, (vlg, "epoll_recreate(): %m\n"));
evnt__epoll_readd(q_connect);
evnt__epoll_readd(q_accept);
evnt__epoll_readd(q_recv);
evnt__epoll_readd(q_send_recv);
evnt__epoll_readd(q_none);
}
return (TRUE);
}
void evnt_wait_cntl_add(struct Evnt *evnt, int flags)
{
if ((SOCKET_POLL_INDICATOR(evnt->ind)->events & flags) == flags)
return;
SOCKET_POLL_INDICATOR(evnt->ind)->events |= flags;
SOCKET_POLL_INDICATOR(evnt->ind)->revents |= (flags & POLLIN);
if (evnt_poll_direct_enabled())
{
struct epoll_event epevent[1];
flags = SOCKET_POLL_INDICATOR(evnt->ind)->events;
vlg_dbg2(vlg, "epoll_mod_add(%p,%d)\n", evnt, flags);
epevent->events = flags;
epevent->data.ptr = evnt;
if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_MOD, evnt_fd(evnt), epevent) == -1)
vlg_err(vlg, EXIT_FAILURE, "epoll: %m\n");
}
}
void evnt_wait_cntl_del(struct Evnt *evnt, int flags)
{
if (!(SOCKET_POLL_INDICATOR(evnt->ind)->events & flags))
return;
SOCKET_POLL_INDICATOR(evnt->ind)->events &= ~flags;
SOCKET_POLL_INDICATOR(evnt->ind)->revents &= ~flags;
if (flags && evnt_poll_direct_enabled())
{
struct epoll_event epevent[1];
flags = SOCKET_POLL_INDICATOR(evnt->ind)->events;
vlg_dbg2(vlg, "epoll_mod_del(%p,%d)\n", evnt, flags);
epevent->events = flags;
epevent->data.ptr = evnt;
if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_MOD, evnt_fd(evnt), epevent) == -1)
vlg_err(vlg, EXIT_FAILURE, "epoll: %m\n");
}
}
unsigned int evnt_poll_add(struct Evnt *evnt, int fd)
{
unsigned int ind = socket_poll_add(fd);
if (ind && evnt_poll_direct_enabled())
{
struct epoll_event epevent[1];
int flags = 0;
vlg_dbg2(vlg, "epoll_add(%p,%d)\n", evnt, flags);
epevent->events = flags;
epevent->data.ptr = evnt;
if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, fd, epevent) == -1)
{
vlg_warn(vlg, "epoll: %m\n");
socket_poll_del(fd);
}
}
return (ind);
}
void evnt_poll_del(struct Evnt *evnt)
{
if (SOCKET_POLL_INDICATOR(evnt->ind)->fd != -1)
close(SOCKET_POLL_INDICATOR(evnt->ind)->fd);
socket_poll_del(evnt->ind);
if (FALSE && evnt_poll_direct_enabled())
{
int fd = SOCKET_POLL_INDICATOR(evnt->ind)->fd;
struct epoll_event epevent[1];
vlg_dbg2(vlg, "epoll_del(%p,%d)\n", evnt, 0);
epevent->events = 0;
epevent->data.ptr = evnt;
if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_DEL, fd, epevent) == -1)
vlg_abort(vlg, "epoll: %m\n");
}
}
int evnt_poll_swap_accept_read(struct Evnt *evnt, int fd)
{
unsigned int old_ind = evnt->ind;
int old_fd = SOCKET_POLL_INDICATOR(old_ind)->fd;
assert(SOCKET_POLL_INDICATOR(evnt->ind)->fd != fd);
ASSERT(evnt__valid(evnt));
if (!(evnt->ind = socket_poll_add(fd)))
goto poll_add_fail;
if (!(evnt->io_r = vstr_make_base(NULL)) ||
!(evnt->io_w = vstr_make_base(NULL)))
goto malloc_base_fail;
SOCKET_POLL_INDICATOR(evnt->ind)->events |= POLLIN;
SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLIN;
socket_poll_del(old_ind);
evnt_del(&q_accept, evnt); evnt->flag_q_accept = FALSE;
evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;
evnt_fd__set_nonblock(fd, TRUE);
if (evnt_poll_direct_enabled())
{
struct epoll_event epevent[1];
vlg_dbg2(vlg, "epoll_swap(%p,%d,%d)\n", evnt, old_fd, fd);
epevent->events = POLLIN;
epevent->data.ptr = evnt;
if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_DEL, old_fd, epevent) == -1)
vlg_abort(vlg, "epoll: %m\n");
epevent->events = SOCKET_POLL_INDICATOR(evnt->ind)->events;
epevent->data.ptr = evnt;
if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, fd, epevent) == -1)
vlg_abort(vlg, "epoll: %m\n");
}
ASSERT(evnt__valid(evnt));
return (TRUE);
malloc_base_fail:
socket_poll_del(evnt->ind);
evnt->ind = old_ind;
vstr_free_base(evnt->io_r); evnt->io_r = NULL;
ASSERT(!evnt->io_r && !evnt->io_w);
poll_add_fail:
return (FALSE);
}
#define EVNT__EPOLL_EVENTS 128
int evnt_poll(void)
{
struct epoll_event events[EVNT__EPOLL_EVENTS];
int msecs = evnt__get_timeout();
int ret = -1;
unsigned int scan = 0;
if (evnt_child_exited)
return (errno = EINTR, -1);
if (evnt__poll_tst_accept(msecs))
return (evnt__poll_accept());
if (!evnt_poll_direct_enabled())
return (socket_poll_update_all(msecs));
ret = epoll_wait(evnt__epoll_fd, events, EVNT__EPOLL_EVENTS, msecs);
if (ret == -1)
return (ret);
scan = ret;
ASSERT(scan <= EVNT__EPOLL_EVENTS);
while (scan-- > 0)
{
struct Evnt *evnt = NULL;
unsigned int flags = 0;
if (!(flags = events[scan].events))
{
vlg_dbg1(vlg, "epoll_wait_zero(%p)\n", events[scan].data.ptr);
continue;
}
evnt = events[scan].data.ptr;
ASSERT(evnt__valid(evnt));
vlg_dbg2(vlg, "epoll_wait(%p,%d)\n", evnt, flags);
vlg_dbg2(vlg, "epoll_wait[flags]=a=%d|r=%d\n",
evnt->flag_q_accept, evnt->flag_q_recv);
assert(((SOCKET_POLL_INDICATOR(evnt->ind)->events & flags) == flags) ||
((POLLHUP|POLLERR) & flags));
SOCKET_POLL_INDICATOR(evnt->ind)->revents = flags;
evnt__del_whatever(evnt);
evnt__add_whatever(evnt);
}
return (ret);
}
#endif