/* * Copyright (C) 2002, 2003, 2004, 2005 James Antill * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * email: james@and.org */ /* IO events, and some help with timed events */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define CONF_EVNT_NO_EPOLL FALSE #define CONF_EVNT_EPOLL_SZ (10 * 1000) /* size is just a speed hint */ #define CONF_EVNT_DUP_EPOLL TRUE /* doesn't work if FALSE and multi proc */ #define CONF_GETTIMEOFDAY_TIME TRUE /* does tv_sec contain time(NULL) */ #ifndef CONF_FULL_STATIC # include # define EVNT__RESOLVE_NAME(evnt, x) do { \ struct hostent *h = gethostbyname(x); \ \ if (h) \ memcpy(&EVNT_SA_IN4(evnt)->sin_addr.s_addr, \ h->h_addr_list[0], \ sizeof(EVNT_SA_IN4(evnt)->sin_addr.s_addr)); \ \ } while (FALSE) #else # define EVNT__RESOLVE_NAME(evnt, x) do { \ if ((EVNT_SA_IN4(evnt)->sin_addr.s_addr = inet_addr(x)) == INADDR_NONE) \ EVNT_SA_IN4(evnt)->sin_addr.s_addr = htonl(INADDR_ANY); \ } while (FALSE) #endif #if !defined(SO_DETACH_FILTER) || !defined(SO_ATTACH_FILTER) # define CONF_USE_SOCKET_FILTERS FALSE struct sock_fprog { int dummy; }; # define SO_DETACH_FILTER 0 # define SO_ATTACH_FILTER 0 #else # define CONF_USE_SOCKET_FILTERS TRUE /* not in glibc... hope it's not in *BSD etc. */ struct sock_filter { uint16_t code; /* Actual filter code */ uint8_t jt; /* Jump true */ uint8_t jf; /* Jump false */ uint32_t k; /* Generic multiuse field */ }; struct sock_fprog { unsigned short len; /* Number of filter blocks */ struct sock_filter *filter; }; #endif #include "vlg.h" #define EX_UTILS_NO_USE_INIT 1 #define EX_UTILS_NO_USE_EXIT 1 #define EX_UTILS_NO_USE_LIMIT 1 #define EX_UTILS_NO_USE_BLOCK 1 #define EX_UTILS_NO_USE_PUT 1 #define EX_UTILS_NO_USE_OPEN 1 #define EX_UTILS_NO_USE_IO_FD 1 #include "ex_utils.h" #include "evnt.h" #include "mk.h" #define CSTREQ(x,y) (!strcmp(x,y)) #ifdef __GNUC__ # define EVNT__ATTR_UNUSED(x) vstr__UNUSED_ ## x __attribute__((unused)) #elif defined(__LCLINT__) # define EVNT__ATTR_UNUSED(x) /*@unused@*/ vstr__UNUSED_ ## x #else # define EVNT__ATTR_UNUSED(x) vstr__UNUSED_ ## x #endif #ifdef __GNUC__ # define EVNT__ATTR_USED() __attribute__((used)) #else # define EVNT__ATTR_USED() #endif volatile sig_atomic_t evnt_child_exited = FALSE; int evnt_opt_nagle = EVNT_CONF_NAGLE; static struct Evnt *q_send_now = NULL; /* Try a send "now" */ static struct Evnt *q_closed = NULL; /* Close when fin. */ static struct Evnt *q_none = NULL; /* nothing */ static struct Evnt *q_accept = NULL; /* connections - recv */ static struct Evnt *q_connect = NULL; /* connections - send */ static struct Evnt *q_recv = NULL; /* recv */ static struct Evnt *q_send_recv = NULL; /* recv + send */ static Vlg *vlg = NULL; static unsigned int evnt__num = 0; /* this should be more configurable... */ static unsigned int evnt__accept_limit = 4; static struct timeval evnt__tv[1]; static int evnt__is_child = FALSE; #define EVNT__UPDATE_TV() gettimeofday(evnt__tv, NULL) #define EVNT__COPY_TV(x) memcpy(x, evnt__tv, sizeof(struct timeval)) void evnt_logger(Vlg *passed_vlg) { vlg = passed_vlg; } void evnt_fd__set_nonblock(int fd, int val) { int flags = 0; ASSERT(val == !!val); if ((flags = fcntl(fd, F_GETFL)) == -1) vlg_err(vlg, EXIT_FAILURE, "%s\n", __func__); if (!!(flags & O_NONBLOCK) == val) return; if (val) flags |= O_NONBLOCK; else flags &= ~O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) == -1) vlg_err(vlg, EXIT_FAILURE, "%s\n", __func__); } void evnt_add(struct Evnt **que, struct Evnt *node) { assert(node != *que); if ((node->next = *que)) node->next->prev = node; node->prev = NULL; *que = node; } void evnt_del(struct Evnt **que, struct Evnt *node) { if (node->prev) node->prev->next = node->next; else { assert(*que == node); *que = node->next; } if (node->next) node->next->prev = node->prev; } static void evnt__del_whatever(struct Evnt *evnt) { if (0) { } else if (evnt->flag_q_accept) evnt_del(&q_accept, evnt); else if (evnt->flag_q_connect) evnt_del(&q_connect, evnt); else if (evnt->flag_q_recv) evnt_del(&q_recv, evnt); else if (evnt->flag_q_send_recv) evnt_del(&q_send_recv, evnt); else if (evnt->flag_q_none) evnt_del(&q_none, evnt); else ASSERT_NOT_REACHED(); } static void EVNT__ATTR_USED() evnt__add_whatever(struct Evnt *evnt) { if (0) { } else if (evnt->flag_q_accept) evnt_add(&q_accept, evnt); else if (evnt->flag_q_connect) evnt_add(&q_connect, evnt); else if (evnt->flag_q_recv) evnt_add(&q_recv, evnt); else if (evnt->flag_q_send_recv) evnt_add(&q_send_recv, evnt); else if (evnt->flag_q_none) evnt_add(&q_none, evnt); else ASSERT_NOT_REACHED(); } static unsigned int evnt__debug_num_1(struct Evnt *scan) { unsigned int num = 0; while (scan) { struct Evnt *scan_next = scan->next; ++num; scan = scan_next; } return (num); } #ifndef VSTR_AUTOCONF_NDEBUG static struct Evnt **evnt__srch(struct Evnt **que, struct Evnt *evnt) { struct Evnt **ret = que; while (*ret) { if (*ret == evnt) return (ret); ret = &(*ret)->next; } return (NULL); } static int evnt__valid(struct Evnt *evnt) { int ret = 0; ASSERT(evnt_num_all()); ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv + evnt->flag_q_send_recv + evnt->flag_q_none) == 1); if (evnt->flag_q_send_now) { struct Evnt **scan = &q_send_now; while (*scan && (*scan != evnt)) scan = &(*scan)->s_next; ASSERT(*scan); } else { struct Evnt **scan = &q_send_now; while (*scan && (*scan != evnt)) scan = &(*scan)->s_next; ASSERT(!*scan); } if (evnt->flag_q_closed) { struct Evnt **scan = &q_closed; while (*scan && (*scan != evnt)) scan = &(*scan)->c_next; ASSERT(*scan); } else { struct Evnt **scan = &q_closed; while (*scan && (*scan != evnt)) scan = &(*scan)->c_next; ASSERT(!*scan); } if (0) { } else if (evnt->flag_q_accept) { ret = !!evnt__srch(&q_accept, evnt); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT)); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT)); ASSERT(!evnt->io_r && !evnt->io_w); } else if (evnt->flag_q_connect) { ret = !!evnt__srch(&q_connect, evnt); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLIN)); assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT)); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN)); } else if (evnt->flag_q_send_recv) { ret = !!evnt__srch(&q_send_recv, evnt); assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT)); } else if (evnt->flag_q_recv) { ret = !!evnt__srch(&q_recv, evnt); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT)); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT)); } else if (evnt->flag_q_none) { ret = !!evnt__srch(&q_none, evnt); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLIN)); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT)); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN)); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT)); } else ASSERT_NOT_REACHED(); return (ret); } static unsigned int evnt__debug_num_all(void) { unsigned int num = 0; num += evnt__debug_num_1(q_connect); num += evnt__debug_num_1(q_accept); num += evnt__debug_num_1(q_recv); num += evnt__debug_num_1(q_send_recv); num += evnt__debug_num_1(q_none); return (num); } #endif int evnt_fd(struct Evnt *evnt) { ASSERT(evnt__valid(evnt)); return (SOCKET_POLL_INDICATOR(evnt->ind)->fd); } int evnt_cb_func_connect(struct Evnt *EVNT__ATTR_UNUSED(evnt)) { return (TRUE); } struct Evnt *evnt_cb_func_accept(struct Evnt *EVNT__ATTR_UNUSED(evnt), int EVNT__ATTR_UNUSED(fd), struct sockaddr *EVNT__ATTR_UNUSED(sa), socklen_t EVNT__ATTR_UNUSED(len)) { return (NULL); } int evnt_cb_func_recv(struct Evnt *evnt) { unsigned int ern = 0; int ret = evnt_recv(evnt, &ern); if (ret) return (TRUE); if ((ern == VSTR_TYPE_SC_READ_FD_ERR_EOF) && evnt->io_w->len) return (evnt_shutdown_r(evnt, TRUE)); return (FALSE); } int evnt_cb_func_send(struct Evnt *evnt) { int ret = -1; evnt_fd_set_cork(evnt, TRUE); ret = evnt_send(evnt); if (!evnt->io_w->len) evnt_fd_set_cork(evnt, FALSE); return (ret); } void evnt_cb_func_free(struct Evnt *evnt) { MALLOC_CHECK_SCRUB_PTR(evnt, sizeof(struct Evnt)); free(evnt); } void evnt_cb_func_F(struct Evnt *evnt) { F(evnt); } int evnt_cb_func_shutdown_r(struct Evnt *evnt) { vlg_dbg2(vlg, "SHUTDOWN CB from[$]\n", EVNT_SA(evnt)); if (!evnt_shutdown_r(evnt, FALSE)) return (FALSE); /* called from outside read, and read'll never get called again ... * so quit if we have nothing to send */ return (!!evnt->io_w->len); } static int evnt_init(struct Evnt *evnt, int fd, Vstr_ref *ref) { ASSERT(ref); evnt->flag_q_accept = FALSE; evnt->flag_q_connect = FALSE; evnt->flag_q_recv = FALSE; evnt->flag_q_send_recv = FALSE; evnt->flag_q_none = FALSE; evnt->flag_q_send_now = FALSE; evnt->flag_q_closed = FALSE; evnt->flag_q_pkt_move = FALSE; /* FIXME: need group settings, default no nagle but cork */ evnt->flag_io_nagle = evnt_opt_nagle; evnt->flag_io_cork = FALSE; evnt->flag_io_filter = FALSE; evnt->flag_fully_acpt = FALSE; evnt->io_r_shutdown = FALSE; evnt->io_w_shutdown = FALSE; evnt->prev_bytes_r = 0; evnt->acct.req_put = 0; evnt->acct.req_got = 0; evnt->acct.bytes_r = 0; evnt->acct.bytes_w = 0; evnt->cbs->cb_func_accept = evnt_cb_func_accept; evnt->cbs->cb_func_connect = evnt_cb_func_connect; evnt->cbs->cb_func_recv = evnt_cb_func_recv; evnt->cbs->cb_func_send = evnt_cb_func_send; evnt->cbs->cb_func_free = evnt_cb_func_F; evnt->cbs->cb_func_shutdown_r = evnt_cb_func_shutdown_r; if (!(evnt->io_r = vstr_make_base(NULL))) goto make_vstr_fail; if (!(evnt->io_w = vstr_make_base(NULL))) goto make_vstr_fail; evnt->tm_o = NULL; if (!(evnt->ind = evnt_poll_add(evnt, fd))) goto poll_add_fail; EVNT__UPDATE_TV(); EVNT__COPY_TV(&evnt->ctime); EVNT__COPY_TV(&evnt->mtime); evnt->msecs_tm_mtime = 0; if (fcntl(fd, F_SETFD, TRUE) == -1) goto fcntl_fail; evnt->sa_ref = vstr_ref_add(ref); evnt_fd__set_nonblock(fd, TRUE); return (TRUE); fcntl_fail: evnt_poll_del(evnt); poll_add_fail: vstr_free_base(evnt->io_w); make_vstr_fail: vstr_free_base(evnt->io_r); errno = ENOMEM; return (FALSE); } static void evnt__free1(struct Evnt *evnt) { evnt_send_del(evnt); if (evnt->io_r && evnt->io_r->len) vlg_dbg2(vlg, "evnt__free1(%p) io_r len = %zu\n", evnt, evnt->io_r->len); if (evnt->io_w && evnt->io_w->len) vlg_dbg2(vlg, "evnt__free1(%p) io_w len = %zu\n", evnt, evnt->io_w->len); vstr_free_base(evnt->io_w); evnt->io_w = NULL; vstr_free_base(evnt->io_r); evnt->io_r = NULL; evnt_poll_del(evnt); } static void evnt__free2(Vstr_ref *sa, Timer_q_node *tm_o) { /* post callbacks, evnt no longer exists */ vstr_ref_del(sa); if (tm_o) { timer_q_cntl_node(tm_o, TIMER_Q_CNTL_NODE_SET_DATA, NULL); timer_q_quick_del_node(tm_o); } } static void evnt__free(struct Evnt *evnt) { if (evnt) { Vstr_ref *sa = evnt->sa_ref; Timer_q_node *tm_o = evnt->tm_o; evnt__free1(evnt); ASSERT(evnt__num >= 1); /* in case they come back in via. the cb */ --evnt__num; ASSERT(evnt__num == evnt__debug_num_all()); evnt->cbs->cb_func_free(evnt); evnt__free2(sa, tm_o); } } void evnt_free(struct Evnt *evnt) { if (evnt) { evnt__del_whatever(evnt); evnt__free(evnt); } } static void evnt__uninit(struct Evnt *evnt) { ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv + evnt->flag_q_send_recv + evnt->flag_q_none) == 0); evnt__free1(evnt); evnt__free2(evnt->sa_ref, evnt->tm_o); } static int evnt_fd__set_nodelay(int fd, int val) { return (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != -1); } static int evnt_fd__set_reuse(int fd, int val) { return (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) != -1); } static void evnt__fd_close_noerrno(int fd) { int saved_errno = errno; close(fd); errno = saved_errno; } static int evnt__make_end(struct Evnt **que, struct Evnt *evnt, int flags) { evnt_add(que, evnt); ++evnt__num; evnt_wait_cntl_add(evnt, flags); ASSERT(evnt__valid(evnt)); return (TRUE); } int evnt_make_con_ipv4(struct Evnt *evnt, const char *ipv4_string, short port) { int fd = -1; socklen_t alloc_len = sizeof(struct sockaddr_in); Vstr_ref *ref = NULL; if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1) goto sock_fail; if (!(ref = vstr_ref_make_malloc(alloc_len))) goto init_fail; if (!evnt_init(evnt, fd, ref)) goto init_fail; evnt->flag_q_pkt_move = TRUE; if (!evnt->flag_io_nagle) evnt_fd__set_nodelay(fd, TRUE); EVNT_SA_IN4(evnt)->sin_family = AF_INET; EVNT_SA_IN4(evnt)->sin_port = htons(port); EVNT_SA_IN4(evnt)->sin_addr.s_addr = inet_addr(ipv4_string); ASSERT(port && (EVNT_SA_IN4(evnt)->sin_addr.s_addr != htonl(INADDR_ANY))); if (connect(fd, EVNT_SA(evnt), alloc_len) == -1) { if (errno == EINPROGRESS) { /* The connection needs more time....*/ vstr_ref_del(ref); evnt->flag_q_connect = TRUE; return (evnt__make_end(&q_connect, evnt, POLLOUT)); } goto connect_fail; } vstr_ref_del(ref); evnt->flag_q_none = TRUE; return (evnt__make_end(&q_none, evnt, 0)); connect_fail: evnt__uninit(evnt); init_fail: vstr_ref_del(ref); evnt__fd_close_noerrno(fd); sock_fail: return (FALSE); } int evnt_make_con_local(struct Evnt *evnt, const char *fname) { int fd = -1; size_t len = strlen(fname) + 1; struct sockaddr_un tmp_sun; socklen_t alloc_len = 0; Vstr_ref *ref = NULL; tmp_sun.sun_path[0] = 0; alloc_len = SUN_LEN(&tmp_sun) + len; if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1) goto sock_fail; if (!(ref = vstr_ref_make_malloc(alloc_len))) goto init_fail; if (!evnt_init(evnt, fd, ref)) goto init_fail; evnt->flag_q_pkt_move = TRUE; EVNT_SA_UN(evnt)->sun_family = AF_LOCAL; memcpy(EVNT_SA_UN(evnt)->sun_path, fname, len); if (connect(fd, EVNT_SA(evnt), alloc_len) == -1) { if (errno == EINPROGRESS) { /* The connection needs more time....*/ vstr_ref_del(ref); evnt->flag_q_connect = TRUE; return (evnt__make_end(&q_connect, evnt, POLLOUT)); } goto connect_fail; } vstr_ref_del(ref); evnt->flag_q_none = TRUE; return (evnt__make_end(&q_none, evnt, 0)); connect_fail: evnt__uninit(evnt); init_fail: vstr_ref_del(ref); evnt__fd_close_noerrno(fd); sock_fail: return (FALSE); } int evnt_make_acpt_ref(struct Evnt *evnt, int fd, Vstr_ref *sa) { if (!evnt_init(evnt, fd, sa)) return (FALSE); if (!evnt->flag_io_nagle) evnt_fd__set_nodelay(fd, TRUE); evnt->flag_q_recv = TRUE; return (evnt__make_end(&q_recv, evnt, POLLIN)); } int evnt_make_acpt_dup(struct Evnt *evnt, int fd, struct sockaddr *sa, socklen_t len) { Vstr_ref *ref = vstr_ref_make_memdup(sa, len); int ret = FALSE; if (!ref) { errno = ENOMEM; return (FALSE); } ret = evnt_make_acpt_ref(evnt, fd, ref); vstr_ref_del(ref); return (ret); } static int evnt__make_bind_end(struct Evnt *evnt) { vstr_free_base(evnt->io_r); evnt->io_r = NULL; vstr_free_base(evnt->io_w); evnt->io_w = NULL; evnt->flag_q_accept = TRUE; evnt__make_end(&q_accept, evnt, POLLIN); SOCKET_POLL_INDICATOR(evnt->ind)->revents = 0; return (TRUE); } int evnt_make_bind_ipv4(struct Evnt *evnt, const char *acpt_addr, short server_port, unsigned int listen_len) { int fd = -1; int saved_errno = 0; socklen_t alloc_len = sizeof(struct sockaddr_in); Vstr_ref *ref = NULL; if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1) goto sock_fail; if (!(ref = vstr_ref_make_malloc(alloc_len))) goto init_fail; if (!evnt_init(evnt, fd, ref)) goto init_fail; EVNT_SA_IN4(evnt)->sin_family = AF_INET; EVNT_SA_IN4(evnt)->sin_addr.s_addr = htonl(INADDR_ANY); if (acpt_addr && *acpt_addr) /* silent error becomes */ EVNT__RESOLVE_NAME(evnt, acpt_addr); if (EVNT_SA_IN4(evnt)->sin_addr.s_addr == htonl(INADDR_ANY)) acpt_addr = "any"; EVNT_SA_IN4(evnt)->sin_port = htons(server_port); if (!evnt_fd__set_reuse(fd, TRUE)) goto reuse_fail; if (bind(fd, EVNT_SA(evnt), alloc_len) == -1) goto bind_fail; if (!server_port) if (getsockname(fd, EVNT_SA(evnt), &alloc_len) == -1) vlg_err(vlg, EXIT_FAILURE, "getsockname: %m\n"); if (listen(fd, listen_len) == -1) goto listen_fail; vstr_ref_del(ref); return (evnt__make_bind_end(evnt)); bind_fail: saved_errno = errno; vlg_warn(vlg, "bind(%s:%hd): %m\n", acpt_addr, server_port); errno = saved_errno; listen_fail: reuse_fail: evnt__uninit(evnt); init_fail: vstr_ref_del(ref); evnt__fd_close_noerrno(fd); sock_fail: return (FALSE); } int evnt_make_bind_local(struct Evnt *evnt, const char *fname, unsigned int listen_len) { int fd = -1; int saved_errno = 0; size_t len = strlen(fname) + 1; struct sockaddr_un tmp_sun; socklen_t alloc_len = 0; Vstr_ref *ref = NULL; tmp_sun.sun_path[0] = 0; alloc_len = SUN_LEN(&tmp_sun) + len; if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1) goto sock_fail; if (!(ref = vstr_ref_make_malloc(alloc_len))) goto init_fail; if (!evnt_init(evnt, fd, ref)) goto init_fail; EVNT_SA_UN(evnt)->sun_family = AF_LOCAL; memcpy(EVNT_SA_UN(evnt)->sun_path, fname, len); unlink(fname); if (bind(fd, EVNT_SA(evnt), alloc_len) == -1) goto bind_fail; if (fchmod(fd, 0600) == -1) goto fchmod_fail; if (listen(fd, listen_len) == -1) goto listen_fail; vstr_ref_del(ref); return (evnt__make_bind_end(evnt)); bind_fail: saved_errno = errno; vlg_warn(vlg, "bind(%s): %m\n", fname); errno = saved_errno; fchmod_fail: listen_fail: evnt__uninit(evnt); init_fail: vstr_ref_del(ref); evnt__fd_close_noerrno(fd); sock_fail: return (FALSE); } int evnt_make_custom(struct Evnt *evnt, int fd, Vstr_ref *sa, int flags) { static Vstr_ref dummy_sa = {vstr_ref_cb_free_nothing, NULL, 1}; if (!sa) sa = &dummy_sa; if (!evnt_init(evnt, fd, sa)) { evnt__fd_close_noerrno(fd); return (FALSE); } if (flags & POLLIN) { evnt->flag_q_recv = TRUE; return (evnt__make_end(&q_recv, evnt, POLLIN)); } evnt->flag_q_none = TRUE; return (evnt__make_end(&q_none, evnt, 0)); } void evnt_close(struct Evnt *evnt) { if (!evnt) return; ASSERT(evnt__valid(evnt)); if (evnt->flag_q_closed) return; /* can't close at this point or we'll race with: * socket_poll_add()/socket_poll_del() when using _DIRECT mapping */ /* queue for deletion ... as the bt might still be using the ptr */ evnt->flag_q_closed = TRUE; evnt->c_next = q_closed; q_closed = evnt; ASSERT(evnt__valid(evnt)); } void evnt_put_pkt(struct Evnt *evnt) { ASSERT(evnt__valid(evnt)); if (evnt->flag_q_pkt_move && evnt->flag_q_none) { ASSERT(evnt->acct.req_put >= evnt->acct.req_got); evnt_del(&q_none, evnt); evnt->flag_q_none = FALSE; evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE; evnt_wait_cntl_add(evnt, POLLIN); } ++evnt->acct.req_put; ASSERT(evnt__valid(evnt)); } void evnt_got_pkt(struct Evnt *evnt) { ASSERT(evnt__valid(evnt)); ++evnt->acct.req_got; if (evnt->flag_q_pkt_move && !evnt->flag_q_send_recv && (evnt->acct.req_put == evnt->acct.req_got)) { ASSERT(evnt->acct.req_put >= evnt->acct.req_got); evnt_del(&q_recv, evnt), evnt->flag_q_recv = FALSE; evnt_add(&q_none, evnt), evnt->flag_q_none = TRUE; evnt_wait_cntl_del(evnt, POLLIN); } ASSERT(evnt__valid(evnt)); } static int evnt__call_send(struct Evnt *evnt, unsigned int *ern) { size_t tmp = evnt->io_w->len; int fd = evnt_fd(evnt); if (!vstr_sc_write_fd(evnt->io_w, 1, tmp, fd, ern) && (errno != EAGAIN)) return (FALSE); tmp -= evnt->io_w->len; vlg_dbg3(vlg, "write(%p) = %zu\n", evnt, tmp); evnt->acct.bytes_w += tmp; return (TRUE); } int evnt_send_add(struct Evnt *evnt, int force_q, size_t max_sz) { ASSERT(evnt__valid(evnt)); vlg_dbg3(vlg, "q now = %u, q send recv = %u, force = %u\n", evnt->flag_q_send_now, evnt->flag_q_send_recv, force_q); if (!evnt->flag_q_send_recv && (evnt->io_w->len > max_sz)) { if (!evnt__call_send(evnt, NULL)) { ASSERT(evnt__valid(evnt)); return (FALSE); } if (!evnt->io_w->len && !force_q) { ASSERT(evnt__valid(evnt)); return (TRUE); } } /* already on send_q -- or already polling (and not forcing) */ if (evnt->flag_q_send_now || (evnt->flag_q_send_recv && !force_q)) { ASSERT(evnt__valid(evnt)); return (TRUE); } evnt->s_next = q_send_now; q_send_now = evnt; evnt->flag_q_send_now = TRUE; ASSERT(evnt__valid(evnt)); return (TRUE); } /* if a connection is on the send now q, then remove them ... this is only * done when the client gets killed, so it doesn't matter if it's slow */ void evnt_send_del(struct Evnt *evnt) { struct Evnt **scan = &q_send_now; if (!evnt->flag_q_send_now) return; while (*scan && (*scan != evnt)) scan = &(*scan)->s_next; ASSERT(*scan); *scan = evnt->s_next; evnt->flag_q_send_now = FALSE; } int evnt_shutdown_r(struct Evnt *evnt, int got_eof) { ASSERT(evnt__valid(evnt)); if (evnt->io_r_shutdown || evnt->io_w_shutdown) return (FALSE); evnt_wait_cntl_del(evnt, POLLIN); vlg_dbg2(vlg, "shutdown(SHUT_RD, %d) from[$]\n", got_eof, EVNT_SA(evnt)); if (!got_eof && (shutdown(evnt_fd(evnt), SHUT_RD) == -1)) { if (errno != ENOTCONN) vlg_warn(vlg, "shutdown(SHUT_RD): %m\n"); return (FALSE); } evnt->io_r_shutdown = TRUE; ASSERT(evnt__valid(evnt)); return (TRUE); } static void evnt__send_fin(struct Evnt *evnt) { if (0) { /* nothing */ } else if ( evnt->flag_q_send_recv && !evnt->io_w->len) { evnt_del(&q_send_recv, evnt); evnt->flag_q_send_recv = FALSE; if (evnt->flag_q_pkt_move && (evnt->acct.req_put == evnt->acct.req_got)) { evnt_add(&q_none, evnt); evnt->flag_q_none = TRUE; evnt_wait_cntl_del(evnt, POLLIN | POLLOUT); } else { evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE; evnt_wait_cntl_del(evnt, POLLOUT); } } else if (!evnt->flag_q_send_recv && evnt->io_w->len) { int pflags = POLLOUT; ASSERT(evnt->flag_q_none || evnt->flag_q_recv); if (evnt->flag_q_none) evnt_del(&q_none, evnt), evnt->flag_q_none = FALSE; else evnt_del(&q_recv, evnt), evnt->flag_q_recv = FALSE; evnt_add(&q_send_recv, evnt); evnt->flag_q_send_recv = TRUE; if (!evnt->io_r_shutdown) pflags |= POLLIN; evnt_wait_cntl_add(evnt, pflags); } } int evnt_shutdown_w(struct Evnt *evnt) { Vstr_base *out = evnt->io_w; ASSERT(evnt__valid(evnt)); vlg_dbg2(vlg, "shutdown(SHUT_WR) from[$]\n", EVNT_SA(evnt)); /* evnt_fd_set_cork(evnt, FALSE); eats data in 2.4.22-1.2199.4.legacy.npt */ if (evnt->io_r_shutdown || evnt->io_w_shutdown) return (FALSE); if (shutdown(evnt_fd(evnt), SHUT_WR) == -1) { if (errno != ENOTCONN) vlg_warn(vlg, "shutdown(SHUT_WR): %m\n"); return (FALSE); } evnt->io_w_shutdown = TRUE; vstr_del(out, 1, out->len); evnt__send_fin(evnt); ASSERT(evnt__valid(evnt)); return (TRUE); } int evnt_recv(struct Evnt *evnt, unsigned int *ern) { Vstr_base *data = evnt->io_r; size_t tmp = evnt->io_r->len; unsigned int num_min = 2; unsigned int num_max = 6; /* ave. browser reqs are 500ish, ab is much less */ ASSERT(evnt__valid(evnt) && ern); /* FIXME: this is set for HTTPD's default buf sizes of 120 (128 - 8) */ if (evnt->prev_bytes_r > (120 * 3)) num_max = 8; if (evnt->prev_bytes_r > (120 * 6)) num_max = 32; vstr_sc_read_iov_fd(data, data->len, evnt_fd(evnt), num_min, num_max, ern); evnt->prev_bytes_r = (evnt->io_r->len - tmp); vlg_dbg3(vlg, "read(%p) = %ju\n", evnt, evnt->prev_bytes_r); evnt->acct.bytes_r += evnt->prev_bytes_r; switch (*ern) { case VSTR_TYPE_SC_READ_FD_ERR_NONE: if (evnt->io_w_shutdown) /* doesn't count if we can't respond */ vstr_del(data, 1, data->len); else EVNT__COPY_TV(&evnt->mtime); return (TRUE); case VSTR_TYPE_SC_READ_FD_ERR_MEM: vlg_warn(vlg, "%s\n", __func__); break; case VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO: if (errno == EAGAIN) return (TRUE); break; case VSTR_TYPE_SC_READ_FD_ERR_EOF: break; default: /* unknown */ vlg_warn(vlg, "read_iov() = %d: %m\n", *ern); } return (FALSE); } int evnt_send(struct Evnt *evnt) { unsigned int ern = 0; ASSERT(evnt__valid(evnt)); if (!evnt__call_send(evnt, &ern)) return (FALSE); EVNT__COPY_TV(&evnt->mtime); evnt__send_fin(evnt); ASSERT(evnt__valid(evnt)); return (TRUE); } #ifdef VSTR_AUTOCONF_lseek64 /* lseek64 doesn't exist */ # define sendfile64 sendfile #endif int evnt_sendfile(struct Evnt *evnt, int ffd, VSTR_AUTOCONF_uintmax_t *f_off, VSTR_AUTOCONF_uintmax_t *f_len, unsigned int *ern) { ssize_t ret = 0; off64_t tmp_off = *f_off; size_t tmp_len = *f_len; *ern = 0; ASSERT(evnt__valid(evnt)); ASSERT(!evnt->io_w->len); if (*f_len > SSIZE_MAX) tmp_len = SSIZE_MAX; if ((ret = sendfile64(evnt_fd(evnt), ffd, &tmp_off, tmp_len)) == -1) { if (errno == EAGAIN) return (TRUE); *ern = VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO; return (FALSE); } if (!ret) { *ern = VSTR_TYPE_SC_READ_FD_ERR_EOF; return (FALSE); } *f_off = tmp_off; evnt->acct.bytes_w += ret; EVNT__COPY_TV(&evnt->mtime); *f_len -= ret; return (TRUE); } int evnt_sc_read_send(struct Evnt *evnt, int fd, VSTR_AUTOCONF_uintmax_t *len) { Vstr_base *out = evnt->io_w; size_t orig_len = out->len; size_t tmp = 0; int ret = IO_OK; ASSERT(len && *len); if ((ret = io_get(out, fd)) == IO_FAIL) return (EVNT_IO_READ_ERR); if (ret == IO_EOF) return (EVNT_IO_READ_EOF); tmp = out->len - orig_len; if (tmp >= *len) { /* we might not be transfering to EOF, so reduce if needed */ vstr_sc_reduce(out, 1, out->len, tmp - *len); ASSERT((out->len - orig_len) == *len); *len = 0; return (EVNT_IO_READ_FIN); } *len -= tmp; if (!evnt_send(evnt)) return (EVNT_IO_SEND_ERR); return (EVNT_IO_OK); } static int evnt__get_timeout(void) { const struct timeval *tv = NULL; int msecs = -1; if (q_send_now) msecs = 0; else if ((tv = timer_q_first_timeval())) { long diff = 0; struct timeval now_timeval; EVNT__COPY_TV(&now_timeval); diff = timer_q_timeval_diff_msecs(tv, &now_timeval); if (diff > 0) { if (diff >= INT_MAX) msecs = INT_MAX - 1; else msecs = diff; } else msecs = 0; } vlg_dbg2(vlg, "get_timeout = %d\n", msecs); return (msecs); } void evnt_scan_q_close(void) { struct Evnt *scan = NULL; while ((scan = q_closed)) { scan->flag_q_closed = FALSE; q_closed = scan->c_next; evnt_free(scan); } ASSERT(!q_closed); } static void evnt__close_now(struct Evnt *evnt) { if (evnt->flag_q_closed) return; evnt_free(evnt); } /* if something goes wrong drop all accept'ing events */ void evnt_acpt_close_all(void) { struct Evnt *evnt = q_accept; /* struct Evnt *evnt = evnt_queue("accept"); */ while (evnt) { evnt_close(evnt); evnt = evnt->next; } } void evnt_scan_fds(unsigned int ready, size_t max_sz) { const int bad_poll_flags = (POLLERR | POLLHUP | POLLNVAL); struct Evnt *scan = NULL; EVNT__UPDATE_TV(); scan = q_connect; while (scan && ready) { struct Evnt *scan_next = scan->next; int done = FALSE; ASSERT(evnt__valid(scan)); if (scan->flag_q_closed) goto next_connect; assert(!(SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN)); /* done as one so we get error code */ if (SOCKET_POLL_INDICATOR(scan->ind)->revents & (POLLOUT|bad_poll_flags)) { int ern = 0; socklen_t len = sizeof(int); int ret = 0; done = TRUE; ret = getsockopt(evnt_fd(scan), SOL_SOCKET, SO_ERROR, &ern, &len); if (ret == -1) vlg_err(vlg, EXIT_FAILURE, "getsockopt(SO_ERROR): %m\n"); else if (ern) { errno = ern; vlg_warn(vlg, "connect(): %m\n"); evnt__close_now(scan); } else { SOCKET_POLL_INDICATOR(scan->ind)->events = 0; SOCKET_POLL_INDICATOR(scan->ind)->revents = 0; evnt_del(&q_connect, scan); scan->flag_q_connect = FALSE; evnt_add(&q_none, scan); scan->flag_q_none = TRUE; evnt_wait_cntl_del(scan, POLLOUT); if (!scan->cbs->cb_func_connect(scan)) evnt__close_now(scan); } goto next_connect; } ASSERT(!done); if (evnt_poll_direct_enabled()) break; next_connect: SOCKET_POLL_INDICATOR(scan->ind)->revents = 0; if (done) --ready; scan = scan_next; } scan = q_accept; while (scan && ready) { /* Papers have suggested that preferring read over accept is better * -- edge triggering needs to requeue on non failure */ struct Evnt *scan_next = scan->next; int done = FALSE; ASSERT(evnt__valid(scan)); if (scan->flag_q_closed) goto next_accept; assert(!(SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLOUT)); if (!done && (SOCKET_POLL_INDICATOR(scan->ind)->revents & bad_poll_flags)) { /* done first as it's an error with the accept fd, whereas accept * generates new fds */ done = TRUE; evnt__close_now(scan); goto next_accept; } if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN) { struct sockaddr_in sa; socklen_t len = sizeof(struct sockaddr_in); int fd = -1; struct Evnt *tmp = NULL; unsigned int acpt_num = 0; done = TRUE; /* ignore all accept() errors -- bad_poll_flags fixes here */ /* FIXME: apache seems to assume certain errors are really bad and we * should just kill the listen socket and wait to die. But for instance. * we can't just kill the socket on EMFILE, as we might have hit our * resource limit */ while ((SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN) && (fd = accept(evnt_fd(scan), (struct sockaddr *) &sa, &len)) != -1) { if (!(tmp = scan->cbs->cb_func_accept(scan, fd, (struct sockaddr *) &sa, len))) { close(fd); goto next_accept; } if (!tmp->flag_q_closed) { ++ready; /* give a read event to this new event */ tmp->flag_fully_acpt = TRUE; } assert(SOCKET_POLL_INDICATOR(tmp->ind)->events == POLLIN); assert(SOCKET_POLL_INDICATOR(tmp->ind)->revents == POLLIN); assert(tmp == q_recv); if (++acpt_num >= evnt__accept_limit) break; } goto next_accept; } ASSERT(!done); if (evnt_poll_direct_enabled()) break; next_accept: SOCKET_POLL_INDICATOR(scan->ind)->revents = 0; if (done) --ready; scan = scan_next; } scan = q_recv; while (scan && ready) { struct Evnt *scan_next = scan->next; int done = FALSE; ASSERT(evnt__valid(scan)); if (scan->flag_q_closed) goto next_recv; if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN) { done = TRUE; if (!scan->cbs->cb_func_recv(scan)) evnt__close_now(scan); } if (!done && (SOCKET_POLL_INDICATOR(scan->ind)->revents & bad_poll_flags)) { done = TRUE; if (scan->io_r_shutdown || scan->io_w_shutdown || !scan->cbs->cb_func_shutdown_r(scan)) evnt__close_now(scan); } if (!done && evnt_poll_direct_enabled()) break; next_recv: if (done) --ready; scan = scan_next; } scan = q_send_recv; while (scan && ready) { struct Evnt *scan_next = scan->next; int done = FALSE; ASSERT(evnt__valid(scan)); if (scan->flag_q_closed) goto next_send; if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLIN) { done = TRUE; if (!scan->cbs->cb_func_recv(scan)) { evnt__close_now(scan); goto next_send; } } if (SOCKET_POLL_INDICATOR(scan->ind)->revents & POLLOUT) { done = TRUE; /* need groups so we can do direct send here */ if (!evnt_send_add(scan, TRUE, max_sz)) evnt__close_now(scan); } if (!done && (SOCKET_POLL_INDICATOR(scan->ind)->revents & bad_poll_flags)) { done = TRUE; if (scan->io_r_shutdown || !scan->cbs->cb_func_shutdown_r(scan)) evnt__close_now(scan); } if (!done && evnt_poll_direct_enabled()) break; next_send: if (done) --ready; scan = scan_next; } scan = q_none; while (scan && ready) { struct Evnt *scan_next = scan->next; int done = FALSE; ASSERT(evnt__valid(scan)); if (scan->flag_q_closed) goto next_none; if (SOCKET_POLL_INDICATOR(scan->ind)->revents) { /* POLLIN == EOF ? */ /* FIXME: failure cb */ done = TRUE; evnt__close_now(scan); goto next_none; } else assert(!SOCKET_POLL_INDICATOR(scan->ind)->revents); ASSERT(!done); if (evnt_poll_direct_enabled()) break; next_none: if (done) --ready; scan = scan_next; } if (q_closed) evnt_scan_q_close(); else if (ready) vlg_abort(vlg, "ready = %d\n", ready); } void evnt_scan_send_fds(void) { struct Evnt **scan = NULL; evnt_scan_q_close(); scan = &q_send_now; while (*scan) { struct Evnt *tmp = *scan; tmp->flag_q_send_now = FALSE; *scan = tmp->s_next; if (!tmp->cbs->cb_func_send(tmp)) { evnt__close_now(tmp); continue; } if (tmp == *scan) /* added back to q */ { ASSERT(tmp->flag_q_send_now == TRUE); scan = &tmp->s_next; } } evnt_scan_q_close(); } static void evnt__close_1(struct Evnt **root) { struct Evnt *scan = *root; *root = NULL; while (scan) { struct Evnt *scan_next = scan->next; Vstr_ref *sa = scan->sa_ref; Timer_q_node *tm_o = scan->tm_o; vstr_free_base(scan->io_w); scan->io_w = NULL; vstr_free_base(scan->io_r); scan->io_r = NULL; evnt_poll_del(scan); --evnt__num; evnt__free2(sa, tm_o); scan->cbs->cb_func_free(scan); scan = scan_next; } } void evnt_close_all(void) { q_send_now = NULL; q_closed = NULL; evnt__close_1(&q_connect); evnt__close_1(&q_accept); evnt__close_1(&q_recv); evnt__close_1(&q_send_recv); evnt__close_1(&q_none); ASSERT(evnt__num == evnt__debug_num_all()); } void evnt_out_dbg3(const char *prefix) { if (vlg->out_dbg < 3) return; vlg_dbg3(vlg, "%s T=%u c=%u a=%u r=%u s=%u n=%u [SN=%u]\n", prefix, evnt_num_all(), evnt__debug_num_1(q_connect), evnt__debug_num_1(q_accept), evnt__debug_num_1(q_recv), evnt__debug_num_1(q_send_recv), evnt__debug_num_1(q_none), evnt__debug_num_1(q_send_now)); } void evnt_stats_add(struct Evnt *dst, const struct Evnt *src) { dst->acct.req_put += src->acct.req_put; dst->acct.req_put += src->acct.req_got; dst->acct.bytes_r += src->acct.bytes_r; dst->acct.bytes_w += src->acct.bytes_w; } unsigned int evnt_num_all(void) { ASSERT(evnt__num == evnt__debug_num_all()); return (evnt__num); } int evnt_waiting(void) { return (q_connect || q_accept || q_recv || q_send_recv || q_send_now); } struct Evnt *evnt_find_least_used(void) { struct Evnt *con = NULL; struct Evnt *con_min = NULL; /* Find a usable connection, tries to find the least used connection * preferring ones not blocking on send IO */ if (!(con = q_none) && !(con = q_recv) && !(con = q_send_recv)) return (NULL); /* FIXME: not optimal, only want to change after a certain level */ con_min = con; while (con) { if (con_min->io_w->len > con->io_w->len) con_min = con; con = con->next; } return (con_min); } #define MATCH_Q_NAME(x) \ if (CSTREQ(qname, #x )) \ return ( q_ ## x ) \ struct Evnt *evnt_queue(const char *qname) { MATCH_Q_NAME(connect); MATCH_Q_NAME(accept); MATCH_Q_NAME(send_recv); MATCH_Q_NAME(recv); MATCH_Q_NAME(none); MATCH_Q_NAME(send_now); return (NULL); } #ifdef VSTR_AUTOCONF_HAVE_TCP_CORK # define USE_TCP_CORK 1 #else # define USE_TCP_CORK 0 # define TCP_CORK 0 #endif void evnt_fd_set_cork(struct Evnt *evnt, int val) { /* assume it can't work for set and fail for unset */ ASSERT(evnt__valid(evnt)); if (!USE_TCP_CORK) return; if (!evnt->flag_io_cork == !val) return; if (!evnt->flag_io_nagle) /* flags can't be combined ... stupid */ { evnt_fd__set_nodelay(evnt_fd(evnt), FALSE); evnt->flag_io_nagle = TRUE; } if (setsockopt(evnt_fd(evnt), IPPROTO_TCP, TCP_CORK, &val, sizeof(val)) == -1) return; evnt->flag_io_cork = !!val; } static void evnt__free_base_noerrno(Vstr_base *s1) { int saved_errno = errno; vstr_free_base(s1); errno = saved_errno; } int evnt_fd_set_filter(struct Evnt *evnt, const char *fname) { int fd = evnt_fd(evnt); Vstr_base *s1 = NULL; unsigned int ern = 0; if (!CONF_USE_SOCKET_FILTERS) return (TRUE); if (!(s1 = vstr_make_base(NULL))) VLG_WARNNOMEM_RET(FALSE, (vlg, "filter_attach0: %m\n")); vstr_sc_read_len_file(s1, 0, fname, 0, 0, &ern); if (ern && ((ern != VSTR_TYPE_SC_READ_FILE_ERR_OPEN_ERRNO) || (errno != ENOENT))) { evnt__free_base_noerrno(s1); VLG_WARN_RET(FALSE, (vlg, "filter_attach1(%s): %m\n", fname)); } else if ((s1->len / sizeof(struct sock_filter)) > USHRT_MAX) { vstr_free_base(s1); errno = E2BIG; VLG_WARN_RET(FALSE, (vlg, "filter_attach2(%s): %m\n", fname)); } else if (!s1->len) { vstr_free_base(s1); if (!evnt->flag_io_filter) vlg_warn(vlg, "filter_attach3(%s): Empty file\n", fname); else if (setsockopt(fd, SOL_SOCKET, SO_DETACH_FILTER, NULL, 0) == -1) { evnt__free_base_noerrno(s1); VLG_WARN_RET(FALSE, (vlg, "setsockopt(SOCKET, DETACH_FILTER, %s): %m\n", fname)); } evnt->flag_io_filter = FALSE; return (TRUE); } else { struct sock_fprog filter[1]; socklen_t len = sizeof(filter); filter->len = s1->len / sizeof(struct sock_filter); filter->filter = (void *)vstr_export_cstr_ptr(s1, 1, s1->len); if (!filter->filter) VLG_WARNNOMEM_RET(FALSE, (vlg, "filter_attach4: %m\n")); if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_FILTER, filter, len) == -1) { evnt__free_base_noerrno(s1); VLG_WARN_RET(FALSE, (vlg, "setsockopt(SOCKET, ATTACH_FILTER, %s): %m\n", fname)); } } evnt->flag_io_filter = TRUE; vstr_free_base(s1); return (TRUE); } static Timer_q_base *evnt__timeout_1 = NULL; static Timer_q_base *evnt__timeout_10 = NULL; static Timer_q_base *evnt__timeout_100 = NULL; static Timer_q_node *evnt__timeout_mtime_make(struct Evnt *evnt, struct timeval *tv, unsigned long msecs) { Timer_q_node *tm_o = NULL; vlg_dbg2(vlg, "mtime_make(%p, %lu)\n", evnt, msecs); if (0) { } else if (msecs >= ( 99 * 1000)) { TIMER_Q_TIMEVAL_ADD_SECS(tv, 100, 0); tm_o = timer_q_add_node(evnt__timeout_100, evnt, tv, TIMER_Q_FLAG_NODE_DEFAULT); } else if (msecs >= ( 9 * 1000)) { TIMER_Q_TIMEVAL_ADD_SECS(tv, 10, 0); tm_o = timer_q_add_node(evnt__timeout_10, evnt, tv, TIMER_Q_FLAG_NODE_DEFAULT); } else { TIMER_Q_TIMEVAL_ADD_SECS(tv, 1, 0); tm_o = timer_q_add_node(evnt__timeout_1, evnt, tv, TIMER_Q_FLAG_NODE_DEFAULT); } return (tm_o); } static void evnt__timer_cb_mtime(int type, void *data) { struct Evnt *evnt = data; struct timeval tv[1]; unsigned long diff = 0; if (!evnt) /* deleted */ return; ASSERT(evnt__valid(evnt)); if (type == TIMER_Q_TYPE_CALL_RUN_ALL) return; evnt->tm_o = NULL; if (type == TIMER_Q_TYPE_CALL_DEL) return; EVNT__COPY_TV(tv); /* find out time elapsed */ diff = timer_q_timeval_udiff_msecs(tv, &evnt->mtime); if (diff < evnt->msecs_tm_mtime) diff = evnt->msecs_tm_mtime - diff; /* seconds left until timeout */ else { vlg_dbg2(vlg, "timeout = %p[$] (%lu, %lu)\n", evnt, EVNT_SA(evnt), diff, evnt->msecs_tm_mtime); if (!evnt_shutdown_w(evnt)) { evnt_close(evnt); return; } /* FIXME: linger close time configurable? */ EVNT__COPY_TV(&evnt->mtime); diff = evnt->msecs_tm_mtime; } if (!(evnt->tm_o = evnt__timeout_mtime_make(evnt, tv, diff))) { errno = ENOMEM; vlg_warn(vlg, "%s: %m\n", "timer reinit"); evnt_close(evnt); } } void evnt_timeout_init(void) { ASSERT(!evnt__timeout_1); EVNT__UPDATE_TV(); /* move when empty is buggy, *sigh* */ evnt__timeout_1 = timer_q_add_base(evnt__timer_cb_mtime, TIMER_Q_FLAG_BASE_INSERT_FROM_END); evnt__timeout_10 = timer_q_add_base(evnt__timer_cb_mtime, TIMER_Q_FLAG_BASE_INSERT_FROM_END); evnt__timeout_100 = timer_q_add_base(evnt__timer_cb_mtime, TIMER_Q_FLAG_BASE_INSERT_FROM_END); if (!evnt__timeout_1 || !evnt__timeout_10 || !evnt__timeout_100) VLG_ERRNOMEM((vlg, EXIT_FAILURE, "timer init")); /* FIXME: massive hack 1.0.5 is broken */ timer_q_cntl_base(evnt__timeout_1, TIMER_Q_CNTL_BASE_SET_FLAG_INSERT_FROM_END, FALSE); timer_q_cntl_base(evnt__timeout_10, TIMER_Q_CNTL_BASE_SET_FLAG_INSERT_FROM_END, FALSE); timer_q_cntl_base(evnt__timeout_100, TIMER_Q_CNTL_BASE_SET_FLAG_INSERT_FROM_END, FALSE); } void evnt_timeout_exit(void) { ASSERT(evnt__timeout_1); timer_q_del_base(evnt__timeout_1); evnt__timeout_1 = NULL; timer_q_del_base(evnt__timeout_10); evnt__timeout_10 = NULL; timer_q_del_base(evnt__timeout_100); evnt__timeout_100 = NULL; } int evnt_sc_timeout_via_mtime(struct Evnt *evnt, unsigned long msecs) { struct timeval tv[1]; evnt->msecs_tm_mtime = msecs; EVNT__COPY_TV(tv); if (!(evnt->tm_o = evnt__timeout_mtime_make(evnt, tv, msecs))) return (FALSE); return (TRUE); } void evnt_sc_main_loop(size_t max_sz) { int ready = 0; struct timeval tv[1]; ready = evnt_poll(); if ((ready == -1) && (errno != EINTR)) vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", "poll"); if (ready == -1) return; evnt_out_dbg3("1"); evnt_scan_fds(ready, max_sz); evnt_out_dbg3("2"); evnt_scan_send_fds(); evnt_out_dbg3("3"); EVNT__COPY_TV(tv); timer_q_run_norm(tv); evnt_out_dbg3("4"); evnt_scan_send_fds(); evnt_out_dbg3("5"); } time_t evnt_sc_time(void) { time_t ret = -1; if (!CONF_GETTIMEOFDAY_TIME) ret = time(NULL); else { struct timeval tv[1]; EVNT__COPY_TV(tv); ret = tv->tv_sec; } return (ret); } void evnt_sc_serv_cb_func_acpt_free(struct Evnt *evnt) { struct Acpt_listener *acpt_listener = (struct Acpt_listener *)evnt; struct Acpt_data *acpt_data = acpt_listener->ref->ptr; evnt_vlg_stats_info(acpt_listener->evnt, "ACCEPT FREE"); acpt_data->evnt = NULL; vstr_ref_del(acpt_listener->ref); F(acpt_listener); } static void evnt__sc_serv_make_acpt_data_cb(Vstr_ref *ref) { struct Acpt_data *ptr = NULL; if (!ref) return; ptr = ref->ptr; vstr_ref_del(ptr->sa); F(ptr); free(ref); } struct Evnt *evnt_sc_serv_make_bind(const char *acpt_addr, unsigned short acpt_port, unsigned int q_listen_len, unsigned int max_connections, unsigned int defer_accept, const char *acpt_filter_file) { struct sockaddr_in *sinv4 = NULL; Acpt_listener *acpt_listener = NULL; Acpt_data *acpt_data = NULL; Vstr_ref *ref = NULL; if (!(acpt_listener = MK(sizeof(Acpt_listener)))) VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n", acpt_addr, acpt_port)); acpt_listener->max_connections = max_connections; if (!(acpt_data = MK(sizeof(Acpt_data)))) VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n", acpt_addr, acpt_port)); acpt_data->evnt = NULL; acpt_data->sa = NULL; if (!(ref = vstr_ref_make_ptr(acpt_data, evnt__sc_serv_make_acpt_data_cb))) VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n", acpt_addr, acpt_port)); acpt_listener->ref = ref; if (!evnt_make_bind_ipv4(acpt_listener->evnt, acpt_addr, acpt_port, q_listen_len)) vlg_err(vlg, 2, "%s: %m\n", __func__); acpt_data->evnt = acpt_listener->evnt; acpt_data->sa = vstr_ref_add(acpt_data->evnt->sa_ref); sinv4 = EVNT_SA_IN4(acpt_listener->evnt); ASSERT(!acpt_port || (acpt_port == ntohs(sinv4->sin_port))); if (defer_accept) evnt_fd_set_defer_accept(acpt_listener->evnt, defer_accept); if (acpt_filter_file && !evnt_fd_set_filter(acpt_listener->evnt, acpt_filter_file)) vlg_err(vlg, 3, "set_filter(%s): %m\n", acpt_filter_file); acpt_listener->evnt->cbs->cb_func_free = evnt_sc_serv_cb_func_acpt_free; return (acpt_listener->evnt); } void evnt_vlg_stats_info(struct Evnt *evnt, const char *prefix) { vlg_info(vlg, "%s from[$] req_got[%'u:%u] req_put[%'u:%u]" " recv[${BKMG.ju:%ju}:%ju] send[${BKMG.ju:%ju}:%ju]\n", prefix, EVNT_SA(evnt), evnt->acct.req_got, evnt->acct.req_got, evnt->acct.req_put, evnt->acct.req_put, evnt->acct.bytes_r, evnt->acct.bytes_r, evnt->acct.bytes_w, evnt->acct.bytes_w); } #ifdef TCP_DEFER_ACCEPT # define USE_TCP_DEFER_ACCEPT 1 #else # define USE_TCP_DEFER_ACCEPT 0 # define TCP_DEFER_ACCEPT 0 #endif void evnt_fd_set_defer_accept(struct Evnt *evnt, int val) { socklen_t len = sizeof(int); ASSERT(evnt__valid(evnt)); if (!USE_TCP_DEFER_ACCEPT) return; /* ignore return val */ setsockopt(evnt_fd(evnt), IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, len); } pid_t evnt_make_child(void) { pid_t ret = fork(); if (!ret) { evnt__is_child = TRUE; evnt_poll_child_init(); } return (ret); } int evnt_is_child(void) { return (evnt__is_child); } int evnt_child_block_beg(void) { if (!evnt_is_child()) return (TRUE); if (PROC_CNTL_PDEATHSIG(SIGTERM) == -1) vlg_err(vlg, EXIT_FAILURE, "prctl(): %m\n"); if (evnt_child_exited) return (FALSE); return (TRUE); } int evnt_child_block_end(void) { if (!evnt_is_child()) return (TRUE); if (PROC_CNTL_PDEATHSIG(SIGCHLD) == -1) vlg_err(vlg, EXIT_FAILURE, "prctl(): %m\n"); return (TRUE); } /* if we are blocking forever, and the only thing we are waiting for is * a single accept() fd, just call accept() */ static int evnt__poll_tst_accept(int msecs) { return ((msecs == -1) && (evnt_num_all() == 1) && q_accept); } static int evnt__poll_accept(void) { int fd = -1; struct Evnt *scan = q_accept; struct sockaddr_in sa; socklen_t len = sizeof(struct sockaddr_in); struct Evnt *tmp = NULL; /* need to make sure we die if the parent does */ evnt_fd__set_nonblock(evnt_fd(scan), FALSE); if (!evnt_child_block_beg()) goto block_beg_fail; fd = accept(evnt_fd(scan), (struct sockaddr *) &sa, &len); evnt_child_block_end(); evnt_fd__set_nonblock(evnt_fd(scan), TRUE); if (fd == -1) goto accept_fail; if (!(tmp = scan->cbs->cb_func_accept(scan, fd, (struct sockaddr *) &sa, len))) goto cb_accept_fail; if (!tmp->flag_q_closed) tmp->flag_fully_acpt = TRUE; assert(SOCKET_POLL_INDICATOR(tmp->ind)->events == POLLIN); assert(SOCKET_POLL_INDICATOR(tmp->ind)->revents == POLLIN); assert(tmp == q_recv); return (1); block_beg_fail: evnt_fd__set_nonblock(evnt_fd(scan), TRUE); goto accept_fail; cb_accept_fail: close(fd); accept_fail: errno = EINTR; return (-1); } #ifndef EVNT_USE_EPOLL # ifdef VSTR_AUTOCONF_HAVE_SYS_EPOLL_H # define EVNT_USE_EPOLL 1 # else # define EVNT_USE_EPOLL 0 # endif #endif #if !EVNT_USE_EPOLL int evnt_poll_init(void) { return (TRUE); } int evnt_poll_direct_enabled(void) { return (FALSE); } int evnt_poll_child_init(void) { return (TRUE); } void evnt_wait_cntl_add(struct Evnt *evnt, int flags) { SOCKET_POLL_INDICATOR(evnt->ind)->events |= flags; SOCKET_POLL_INDICATOR(evnt->ind)->revents |= flags; } void evnt_wait_cntl_del(struct Evnt *evnt, int flags) { SOCKET_POLL_INDICATOR(evnt->ind)->events &= ~flags; SOCKET_POLL_INDICATOR(evnt->ind)->revents &= ~flags; } unsigned int evnt_poll_add(struct Evnt *EVNT__ATTR_UNUSED(evnt), int fd) { return (socket_poll_add(fd)); } void evnt_poll_del(struct Evnt *evnt) { if (SOCKET_POLL_INDICATOR(evnt->ind)->fd != -1) close(SOCKET_POLL_INDICATOR(evnt->ind)->fd); socket_poll_del(evnt->ind); } /* NOTE: that because of socket_poll direct mapping etc. we can't be "clever" */ int evnt_poll_swap_accept_read(struct Evnt *evnt, int fd) { unsigned int old_ind = evnt->ind; assert(SOCKET_POLL_INDICATOR(evnt->ind)->fd != fd); ASSERT(evnt__valid(evnt)); if (!(evnt->ind = socket_poll_add(fd))) goto poll_add_fail; if (!(evnt->io_r = vstr_make_base(NULL)) || !(evnt->io_w = vstr_make_base(NULL))) goto malloc_base_fail; SOCKET_POLL_INDICATOR(evnt->ind)->events |= POLLIN; SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLIN; socket_poll_del(old_ind); evnt_del(&q_accept, evnt); evnt->flag_q_accept = FALSE; evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE; evnt_fd__set_nonblock(fd, TRUE); ASSERT(evnt__valid(evnt)); return (TRUE); malloc_base_fail: socket_poll_del(evnt->ind); evnt->ind = old_ind; vstr_free_base(evnt->io_r); evnt->io_r = NULL; ASSERT(!evnt->io_r && !evnt->io_w); poll_add_fail: return (FALSE); } int evnt_poll(void) { int msecs = evnt__get_timeout(); if (evnt_child_exited) return (errno = EINTR, -1); if (evnt__poll_tst_accept(msecs)) return (evnt__poll_accept()); return (socket_poll_update_all(msecs)); } #else #include static int evnt__epoll_fd = -1; int evnt_poll_init(void) { assert(POLLIN == EPOLLIN); assert(POLLOUT == EPOLLOUT); assert(POLLHUP == EPOLLHUP); assert(POLLERR == EPOLLERR); if (!CONF_EVNT_NO_EPOLL) { evnt__epoll_fd = epoll_create(CONF_EVNT_EPOLL_SZ); vlg_dbg2(vlg, "epoll_create(%d): %m\n", evnt__epoll_fd); } return (evnt__epoll_fd != -1); } int evnt_poll_direct_enabled(void) { return (evnt__epoll_fd != -1); } static int evnt__epoll_readd(struct Evnt *evnt) { struct epoll_event epevent[1]; while (evnt) { int flags = SOCKET_POLL_INDICATOR(evnt->ind)->events; vlg_dbg2(vlg, "epoll_mod_add(%p,%d)\n", evnt, flags); epevent->events = flags; epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, evnt_fd(evnt), epevent) == -1) vlg_err(vlg, EXIT_FAILURE, "epoll_readd: %m\n"); evnt = evnt->next; } return (TRUE); } int evnt_poll_child_init(void) { /* Spurious 0 events happen when you share epoll() fd's between tasks ... */ if (CONF_EVNT_DUP_EPOLL && evnt_poll_direct_enabled()) { close(evnt__epoll_fd); evnt__epoll_fd = epoll_create(CONF_EVNT_EPOLL_SZ); /* size does nothing */ if (evnt__epoll_fd == -1) VLG_WARN_RET(FALSE, (vlg, "epoll_recreate(): %m\n")); evnt__epoll_readd(q_connect); evnt__epoll_readd(q_accept); evnt__epoll_readd(q_recv); evnt__epoll_readd(q_send_recv); evnt__epoll_readd(q_none); } return (TRUE); } void evnt_wait_cntl_add(struct Evnt *evnt, int flags) { if ((SOCKET_POLL_INDICATOR(evnt->ind)->events & flags) == flags) return; SOCKET_POLL_INDICATOR(evnt->ind)->events |= flags; SOCKET_POLL_INDICATOR(evnt->ind)->revents |= (flags & POLLIN); if (evnt_poll_direct_enabled()) { struct epoll_event epevent[1]; flags = SOCKET_POLL_INDICATOR(evnt->ind)->events; vlg_dbg2(vlg, "epoll_mod_add(%p,%d)\n", evnt, flags); epevent->events = flags; epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_MOD, evnt_fd(evnt), epevent) == -1) vlg_err(vlg, EXIT_FAILURE, "epoll: %m\n"); } } void evnt_wait_cntl_del(struct Evnt *evnt, int flags) { if (!(SOCKET_POLL_INDICATOR(evnt->ind)->events & flags)) return; SOCKET_POLL_INDICATOR(evnt->ind)->events &= ~flags; SOCKET_POLL_INDICATOR(evnt->ind)->revents &= ~flags; if (flags && evnt_poll_direct_enabled()) { struct epoll_event epevent[1]; flags = SOCKET_POLL_INDICATOR(evnt->ind)->events; vlg_dbg2(vlg, "epoll_mod_del(%p,%d)\n", evnt, flags); epevent->events = flags; epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_MOD, evnt_fd(evnt), epevent) == -1) vlg_err(vlg, EXIT_FAILURE, "epoll: %m\n"); } } unsigned int evnt_poll_add(struct Evnt *evnt, int fd) { unsigned int ind = socket_poll_add(fd); if (ind && evnt_poll_direct_enabled()) { struct epoll_event epevent[1]; int flags = 0; vlg_dbg2(vlg, "epoll_add(%p,%d)\n", evnt, flags); epevent->events = flags; epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, fd, epevent) == -1) { vlg_warn(vlg, "epoll: %m\n"); socket_poll_del(fd); } } return (ind); } void evnt_poll_del(struct Evnt *evnt) { if (SOCKET_POLL_INDICATOR(evnt->ind)->fd != -1) close(SOCKET_POLL_INDICATOR(evnt->ind)->fd); socket_poll_del(evnt->ind); /* done via. the close() */ if (FALSE && evnt_poll_direct_enabled()) { int fd = SOCKET_POLL_INDICATOR(evnt->ind)->fd; struct epoll_event epevent[1]; vlg_dbg2(vlg, "epoll_del(%p,%d)\n", evnt, 0); epevent->events = 0; epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_DEL, fd, epevent) == -1) vlg_abort(vlg, "epoll: %m\n"); } } int evnt_poll_swap_accept_read(struct Evnt *evnt, int fd) { unsigned int old_ind = evnt->ind; int old_fd = SOCKET_POLL_INDICATOR(old_ind)->fd; assert(SOCKET_POLL_INDICATOR(evnt->ind)->fd != fd); ASSERT(evnt__valid(evnt)); if (!(evnt->ind = socket_poll_add(fd))) goto poll_add_fail; if (!(evnt->io_r = vstr_make_base(NULL)) || !(evnt->io_w = vstr_make_base(NULL))) goto malloc_base_fail; SOCKET_POLL_INDICATOR(evnt->ind)->events |= POLLIN; SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLIN; socket_poll_del(old_ind); evnt_del(&q_accept, evnt); evnt->flag_q_accept = FALSE; evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE; evnt_fd__set_nonblock(fd, TRUE); if (evnt_poll_direct_enabled()) { struct epoll_event epevent[1]; vlg_dbg2(vlg, "epoll_swap(%p,%d,%d)\n", evnt, old_fd, fd); epevent->events = POLLIN; epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_DEL, old_fd, epevent) == -1) vlg_abort(vlg, "epoll: %m\n"); epevent->events = SOCKET_POLL_INDICATOR(evnt->ind)->events; epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, fd, epevent) == -1) vlg_abort(vlg, "epoll: %m\n"); } ASSERT(evnt__valid(evnt)); return (TRUE); malloc_base_fail: socket_poll_del(evnt->ind); evnt->ind = old_ind; vstr_free_base(evnt->io_r); evnt->io_r = NULL; ASSERT(!evnt->io_r && !evnt->io_w); poll_add_fail: return (FALSE); } #define EVNT__EPOLL_EVENTS 128 int evnt_poll(void) { struct epoll_event events[EVNT__EPOLL_EVENTS]; int msecs = evnt__get_timeout(); int ret = -1; unsigned int scan = 0; if (evnt_child_exited) return (errno = EINTR, -1); if (evnt__poll_tst_accept(msecs)) return (evnt__poll_accept()); if (!evnt_poll_direct_enabled()) return (socket_poll_update_all(msecs)); ret = epoll_wait(evnt__epoll_fd, events, EVNT__EPOLL_EVENTS, msecs); if (ret == -1) return (ret); scan = ret; ASSERT(scan <= EVNT__EPOLL_EVENTS); while (scan-- > 0) { struct Evnt *evnt = NULL; unsigned int flags = 0; if (!(flags = events[scan].events)) { /* for spurious events when sharing epoll() fd's */ vlg_dbg1(vlg, "epoll_wait_zero(%p)\n", events[scan].data.ptr); continue; } evnt = events[scan].data.ptr; ASSERT(evnt__valid(evnt)); vlg_dbg2(vlg, "epoll_wait(%p,%d)\n", evnt, flags); vlg_dbg2(vlg, "epoll_wait[flags]=a=%d|r=%d\n", evnt->flag_q_accept, evnt->flag_q_recv); assert(((SOCKET_POLL_INDICATOR(evnt->ind)->events & flags) == flags) || ((POLLHUP|POLLERR) & flags)); SOCKET_POLL_INDICATOR(evnt->ind)->revents = flags; evnt__del_whatever(evnt); /* move to front of queue */ evnt__add_whatever(evnt); } return (ret); } #endif