/* * Copyright (C) 2002, 2003, 2004, 2005, 2006 James Antill * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * email: james@and.org */ /* IO events, and some help with timed events */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifndef TCP_CONGESTION # ifdef __linux__ # define TCP_CONGESTION 13 # else # define TCP_CONGESTION 0 # endif #endif /* FIXME: Should do better autoconf checks... */ #if defined(__linux__) /* Linux doesn't let TCP_NODELAY be config., like Solaris ... and maybe *BSD? * and doesn't inherit socket flags from accept() like *BSD */ # define HAVE_TCP_NODELAY_CONFIG FALSE #else # define HAVE_TCP_NODELAY_CONFIG TRUE #endif #if defined(__BSD__) /* Linux doesn't inherit socket flags from accept() like *BSD ... * and maybe Solaris? */ # define HAVE_SOCK_FLAGS_INHERIT TRUE #else # define HAVE_SOCK_FLAGS_INHERIT FALSE #endif #define EVNT__POLL_FLGS(x) \ ((((x) & (POLLIN | POLLOUT)) == (POLLIN | POLLOUT)) ? "(POLLIN | POLLOUT)" : \ (((x) & POLLIN) ? "(POLLIN)" : \ (((x) & POLLOUT) ? 
"(POLLOUT)" : \ "()"))) #define CONF_EVNT_NO_EPOLL FALSE #define CONF_EVNT_EPOLL_SZ (10 * 1000) /* size is just a speed hint */ #define CONF_EVNT_DUP_EPOLL TRUE /* doesn't work if FALSE and multi proc */ #define CONF_GETTIMEOFDAY_TIME TRUE /* does tv_sec contain time(NULL) */ #ifdef CONF_FULL_STATIC # define EVNT__RESOLVE_NAME(saddr, x) do { \ if ((saddr->sin_addr.s_addr = inet_addr(x)) == INADDR_NONE) \ saddr->sin_addr.s_addr = htonl(INADDR_ANY); \ } while (FALSE) #else # include # define EVNT__RESOLVE_NAME(saddr, x) do { \ if ((saddr->sin_addr.s_addr = inet_addr(x)) == INADDR_NONE) \ { \ struct hostent *h = gethostbyname(x); \ \ saddr->sin_addr.s_addr = htonl(INADDR_ANY); \ if (h) \ memcpy(&saddr->sin_addr.s_addr, \ h->h_addr_list[0], \ sizeof(saddr->sin_addr.s_addr)); \ } \ } while (FALSE) #endif #if !defined(SO_DETACH_FILTER) || !defined(SO_ATTACH_FILTER) # define CONF_USE_SOCKET_FILTERS FALSE struct sock_fprog { int dummy; }; # define SO_DETACH_FILTER 0 # define SO_ATTACH_FILTER 0 #else # define CONF_USE_SOCKET_FILTERS TRUE /* not in glibc... hope it's not in *BSD etc. */ struct sock_filter { uint16_t code; /* Actual filter code */ uint8_t jt; /* Jump true */ uint8_t jf; /* Jump false */ uint32_t k; /* Generic multiuse field */ }; struct sock_fprog { unsigned short len; /* Number of filter blocks */ struct sock_filter *filter; }; #endif #include "vlg.h" #define EX_UTILS_NO_USE_INIT 1 #define EX_UTILS_NO_USE_EXIT 1 #define EX_UTILS_NO_USE_LIMIT 1 #define EX_UTILS_NO_USE_BLOCK 1 #define EX_UTILS_NO_USE_PUT 1 #define EX_UTILS_NO_USE_OPEN 1 #define EX_UTILS_NO_USE_IO_FD 1 #include "ex_utils.h" #ifdef HAVE_TCP_CORK # define USE_TCP_CORK TRUE #else # define USE_TCP_CORK FALSE # define TCP_CORK 0 #endif #include "evnt.h" #include "mk.h" volatile sig_atomic_t evnt_child_exited = FALSE; int evnt_opt_nagle = EVNT_CONF_NAGLE; static struct Evnt *q_send_now = NULL; /* Try a send "now" */ static struct Evnt *q_closed = NULL; /* Close when fin. 
*/
/* --- module state: event queues + fd-option helpers ---------------------- */
/* Each live struct Evnt sits on exactly one of the five queues below,
 * selected by its flag_q_* bits (see evnt__add_whatever() /
 * evnt__del_whatever() later in this file). */
static struct Evnt *q_none      = NULL; /* nothing */
static struct Evnt *q_accept    = NULL; /* connections - recv */
static struct Evnt *q_connect   = NULL; /* connections - send */
static struct Evnt *q_recv      = NULL; /* recv */
static struct Evnt *q_send_recv = NULL; /* recv + send */

static Vlg *vlg = NULL; /* module-wide logger, installed via evnt_logger() */

static unsigned int evnt__num = 0; /* count of live events, all queues together */

/* things can move from recv -> send_recv with a timeout,
 * so they look like the end of events NULL */
static unsigned int evnt__scan_ready_moved_send_zero_fds = 0;

/* this should be more configurable... */
static unsigned int evnt__accept_limit = 4;

static struct timeval evnt__tv[1]; /* cached "now", see EVNT__UPDATE_TV() */
static int evnt__is_child = FALSE;

/* refresh / copy the cached timestamp (one gettimeofday() per event-loop
 * pass instead of one per use) */
#define EVNT__UPDATE_TV() gettimeofday(evnt__tv, NULL)
#define EVNT__COPY_TV(x) memcpy(x, evnt__tv, sizeof(struct timeval))

/* Install the logger used by every function in this module. */
void evnt_logger(Vlg *passed_vlg)
{
  vlg = passed_vlg;
}

/* Set or clear O_NONBLOCK on fd; val must be 0 or 1.
 * Skips the F_SETFL call when the flag already matches; aborts via
 * vlg_err() if either fcntl() fails. */
void evnt_fd__set_nonblock(int fd, int val)
{
  int flags = 0;

  ASSERT(val == !!val);

  if ((flags = fcntl(fd, F_GETFL)) == -1)
    vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", __func__);

  if (!!(flags & O_NONBLOCK) == val)
    return;

  if (val)
    flags |=  O_NONBLOCK;
  else
    flags &= ~O_NONBLOCK;

  if (fcntl(fd, F_SETFL, flags) == -1)
    vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", __func__);
}

/* Set the fd flags to val via F_SETFD (i.e. the FD_CLOEXEC close-on-exec
 * bit); val must be 0 or 1.  Aborts via vlg_err() on failure. */
static void evnt_fd__set_coe(int fd, int val)
{
  ASSERT(val == !!val);

  if (fcntl(fd, F_SETFD, val) == -1)
    vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", __func__);
}

/* Toggle TCP_NODELAY; returns TRUE on success, FALSE (with errno set by
 * setsockopt()) otherwise. */
static int evnt_fd__set_nodelay(int fd, int val)
{
  return (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != -1);
}

/* Toggle TCP_CORK (when !HAVE_TCP_CORK the option number is stubbed to 0
 * near the top of the file). */
static int evnt_fd__set_cork(int fd, int val)
{
  return (setsockopt(fd, IPPROTO_TCP, TCP_CORK, &val, sizeof(val)) != -1);
}

/* Toggle SO_REUSEADDR, used before bind() on listening sockets. */
static int evnt_fd__set_reuse(int fd, int val)
{
  return (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) != -1);
}

/* length of a C string including its NIL terminator */
#define CLEN_SZ(x) (strlen(x) + 1)

/* Select the TCP congestion-control algorithm by name.  When
 * TCP_CONGESTION was stubbed to 0 (non-Linux, see top of file), fail with
 * ENOPROTOOPT instead of calling setsockopt(). */
static int evnt_fd__set_congestion(int fd, const char *val)
{
  if (!TCP_CONGESTION)
    return (errno = ENOPROTOOPT, FALSE);

  return (setsockopt(fd, IPPROTO_TCP, TCP_CONGESTION, val, CLEN_SZ(val)) != -1);
}

void evnt_add(struct
Evnt **que, struct Evnt *node) { assert(node != *que); if ((node->next = *que)) node->next->prev = node; node->prev = NULL; *que = node; } void evnt_del(struct Evnt **que, struct Evnt *node) { if (node->prev) node->prev->next = node->next; else { assert(*que == node); *que = node->next; } if (node->next) node->next->prev = node->prev; } static void evnt__del_whatever(struct Evnt *evnt) { if (0) { } else if (evnt->flag_q_accept) evnt_del(&q_accept, evnt); else if (evnt->flag_q_connect) evnt_del(&q_connect, evnt); else if (evnt->flag_q_recv) evnt_del(&q_recv, evnt); else if (evnt->flag_q_send_recv) evnt_del(&q_send_recv, evnt); else if (evnt->flag_q_none) evnt_del(&q_none, evnt); else ASSERT_NOT_REACHED(); } static void COMPILE_ATTR_USED() evnt__add_whatever(struct Evnt *evnt) { if (0) { } else if (evnt->flag_q_accept) evnt_add(&q_accept, evnt); else if (evnt->flag_q_connect) evnt_add(&q_connect, evnt); else if (evnt->flag_q_recv) evnt_add(&q_recv, evnt); else if (evnt->flag_q_send_recv) evnt_add(&q_send_recv, evnt); else if (evnt->flag_q_none) evnt_add(&q_none, evnt); else ASSERT_NOT_REACHED(); } static unsigned int evnt__debug_num_1(struct Evnt *scan) { unsigned int num = 0; while (scan) { struct Evnt *scan_next = scan->next; ++num; scan = scan_next; } return (num); } #if COMPILE_DEBUG static struct Evnt **evnt__srch(struct Evnt **que, struct Evnt *evnt) { struct Evnt **ret = que; while (*ret) { if (*ret == evnt) return (ret); ret = &(*ret)->next; } return (NULL); } static int evnt__valid(struct Evnt *evnt) { int ret = 0; ASSERT(evnt_num_all()); ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv + evnt->flag_q_send_recv + evnt->flag_q_none) == 1); if (evnt->flag_q_send_now) { struct Evnt **scan = &q_send_now; while (*scan && (*scan != evnt)) scan = &(*scan)->s_next; ASSERT(*scan); } else { struct Evnt **scan = &q_send_now; while (*scan && (*scan != evnt)) scan = &(*scan)->s_next; ASSERT(!*scan); } if (evnt->flag_q_closed) { struct Evnt **scan = 
&q_closed; while (*scan && (*scan != evnt)) scan = &(*scan)->c_next; ASSERT(*scan); } else { struct Evnt **scan = &q_closed; while (*scan && (*scan != evnt)) scan = &(*scan)->c_next; ASSERT(!*scan); } if (0) { } else if (evnt->flag_q_accept) { ret = !!evnt__srch(&q_accept, evnt); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT)); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT)); ASSERT(!evnt->io_r && !evnt->io_w); } else if (evnt->flag_q_connect) { ret = !!evnt__srch(&q_connect, evnt); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLIN)); assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT) || evnt->tm_l_w); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN)); } else if (evnt->flag_q_send_recv) { ret = !!evnt__srch(&q_send_recv, evnt); assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT) || evnt->tm_l_w); } else if (evnt->flag_q_recv) { ret = !!evnt__srch(&q_recv, evnt); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT)); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT)); } else if (evnt->flag_q_none) { ret = !!evnt__srch(&q_none, evnt); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLIN)); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT)); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN)); assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT)); } else ASSERT_NOT_REACHED(); return (ret); } static unsigned int evnt__debug_num_all(void) { unsigned int num = 0; num += evnt__debug_num_1(q_connect); num += evnt__debug_num_1(q_accept); num += evnt__debug_num_1(q_recv); num += evnt__debug_num_1(q_send_recv); num += evnt__debug_num_1(q_none); return (num); } #endif int evnt_fd(struct Evnt *evnt) { ASSERT(evnt__valid(evnt)); return (SOCKET_POLL_INDICATOR(evnt->ind)->fd); } int evnt_cb_func_connect(struct Evnt *COMPILE_ATTR_UNUSED(evnt)) { return (TRUE); } struct Evnt *evnt_cb_func_accept(struct Evnt *COMPILE_ATTR_UNUSED(evnt), int 
COMPILE_ATTR_UNUSED(fd), struct sockaddr *COMPILE_ATTR_UNUSED(sa), socklen_t COMPILE_ATTR_UNUSED(len)) { return (NULL); } int evnt_cb_func_recv(struct Evnt *evnt) { unsigned int ern = 0; int ret = evnt_recv(evnt, &ern); if (ret) return (TRUE); if ((ern == VSTR_TYPE_SC_READ_FD_ERR_EOF) && evnt->io_w->len) return (evnt_shutdown_r(evnt, TRUE)); return (FALSE); } int evnt_cb_func_send(struct Evnt *evnt) { int ret = -1; evnt_fd_set_cork(evnt, TRUE); ret = evnt_send(evnt); if (!evnt->io_w->len) evnt_fd_set_cork(evnt, FALSE); return (ret); } void evnt_cb_func_free(struct Evnt *evnt) { MALLOC_CHECK_SCRUB_PTR(evnt, sizeof(struct Evnt)); free(evnt); } void evnt_cb_func_F(struct Evnt *evnt) { F(evnt); } int evnt_cb_func_shutdown_r(struct Evnt *evnt) { vlg_dbg2(vlg, "SHUTDOWN CB from[$]\n", EVNT_SA(evnt)); if (!evnt_shutdown_r(evnt, FALSE)) return (FALSE); /* called from outside read, and read'll never get called again ... * so quit if we have nothing to send */ return (!!evnt->io_w->len); } static int evnt_init(struct Evnt *evnt, int fd, Vstr_ref *ref, struct Evnt *from_evnt) { ASSERT(ref); evnt->flag_q_accept = FALSE; evnt->flag_q_connect = FALSE; evnt->flag_q_recv = FALSE; evnt->flag_q_send_recv = FALSE; evnt->flag_q_none = FALSE; evnt->flag_q_send_now = FALSE; evnt->flag_q_closed = FALSE; evnt->flag_q_pkt_move = FALSE; evnt->flag_io_nagle = FALSE; evnt->flag_io_cork = FALSE; evnt->flag_io_filter = FALSE; evnt->flag_fully_acpt = FALSE; evnt->flag_insta_close = FALSE; evnt->io_r_shutdown = FALSE; evnt->io_w_shutdown = FALSE; evnt->io_r_limited = FALSE; evnt->io_w_limited = FALSE; evnt->prev_bytes_r = 0; evnt->acct.req_put = 0; evnt->acct.req_got = 0; evnt->acct.bytes_r = 0; evnt->acct.bytes_w = 0; evnt->cbs->cb_func_accept = evnt_cb_func_accept; evnt->cbs->cb_func_connect = evnt_cb_func_connect; evnt->cbs->cb_func_recv = evnt_cb_func_recv; evnt->cbs->cb_func_send = evnt_cb_func_send; evnt->cbs->cb_func_free = evnt_cb_func_F; evnt->cbs->cb_func_shutdown_r = 
evnt_cb_func_shutdown_r; if (!(evnt->io_r = vstr_make_base(NULL))) goto make_vstr_fail; if (!(evnt->io_w = vstr_make_base(NULL))) goto make_vstr_fail; evnt->tm_o = NULL; evnt->tm_l_r = NULL; evnt->tm_l_w = NULL; evnt->lims = NULL; evnt->lim_num = 0; EVNT__COPY_TV(&evnt->ctime); EVNT__COPY_TV(&evnt->mtime); evnt->msecs_tm_mtime = 0; evnt_fd__set_coe(fd, TRUE); evnt->sa_ref = vstr_ref_add(ref); evnt->acpt_sa_ref = NULL; if (!(evnt->ind = evnt_poll_add(evnt, fd))) goto poll_add_fail; /* FIXME: need group settings */ if (HAVE_SOCK_FLAGS_INHERIT && from_evnt && (from_evnt->flag_io_nagle == evnt_opt_nagle)) evnt->flag_io_nagle = evnt_opt_nagle; else if (HAVE_TCP_NODELAY_CONFIG || !evnt_opt_nagle) { evnt_fd__set_nodelay(fd, !evnt->flag_io_nagle); evnt->flag_io_nagle = evnt_opt_nagle; } if (!HAVE_SOCK_FLAGS_INHERIT || !from_evnt) evnt_fd__set_nonblock(fd, TRUE); return (TRUE); poll_add_fail: vstr_ref_del(evnt->sa_ref); evnt->sa_ref = NULL; vstr_free_base(evnt->io_w); make_vstr_fail: vstr_free_base(evnt->io_r); errno = ENOMEM; return (FALSE); } static void evnt__free1(struct Evnt *evnt) { evnt_send_del(evnt); if (evnt->io_r && evnt->io_r->len) vlg_dbg2(vlg, "evnt__free1($) io_r len = %zu\n", EVNT_SA(evnt), evnt->io_r->len); if (evnt->io_w && evnt->io_w->len) vlg_dbg2(vlg, "evnt__free1($) io_w len = %zu\n", EVNT_SA(evnt), evnt->io_w->len); vstr_free_base(evnt->io_w); evnt->io_w = NULL; vstr_free_base(evnt->io_r); evnt->io_r = NULL; evnt_poll_del(evnt); evnt_limit_free(evnt); } static void evnt__free_tq(Timer_q_node *tm) { if (tm) { timer_q_cntl_node(tm, TIMER_Q_CNTL_NODE_SET_DATA, NULL); timer_q_quick_del_node(tm); } } static void evnt__free2(Vstr_ref *sa, Vstr_ref *acpt_sa, Timer_q_node *tm_o, Timer_q_node *tm_l_r, Timer_q_node *tm_l_w) { /* post callbacks, evnt no longer exists */ vstr_ref_del(sa); vstr_ref_del(acpt_sa); evnt__free_tq(tm_o); evnt__free_tq(tm_l_r); evnt__free_tq(tm_l_w); } static void evnt__free(struct Evnt *evnt) { if (evnt) { Vstr_ref *sa = evnt->sa_ref; 
Vstr_ref *acpt_sa = evnt->acpt_sa_ref; Timer_q_node *tm_o = evnt->tm_o; Timer_q_node *tm_l_r = evnt->tm_l_r; Timer_q_node *tm_l_w = evnt->tm_l_w; evnt__free1(evnt); ASSERT(evnt__num >= 1); /* in case they come back in via. the cb */ --evnt__num; ASSERT(evnt__num == evnt__debug_num_all()); evnt->cbs->cb_func_free(evnt); evnt__free2(sa, acpt_sa, tm_o, tm_l_r, tm_l_w); } } void evnt_free(struct Evnt *evnt) { if (evnt) { evnt__del_whatever(evnt); evnt__free(evnt); } } static void evnt__close_now(struct Evnt *evnt) { if (evnt->flag_q_closed) return; evnt_free(evnt); } static void evnt__uninit(struct Evnt *evnt) { ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv + evnt->flag_q_send_recv + evnt->flag_q_none) == 0); evnt__free1(evnt); evnt__free2(evnt->sa_ref, evnt->acpt_sa_ref, evnt->tm_o, evnt->tm_l_r, evnt->tm_l_w); } static void evnt__fd_close_noerrno(int fd) { int saved_errno = errno; close(fd); errno = saved_errno; } static int evnt__make_end(struct Evnt **que, struct Evnt *evnt, int flags) { evnt_add(que, evnt); ++evnt__num; evnt_wait_cntl_add(evnt, flags); ASSERT(evnt__valid(evnt)); return (TRUE); } int evnt_make_con_ipv4(struct Evnt *evnt, const char *ipv4_string, short port) { int fd = -1; socklen_t alloc_len = sizeof(struct sockaddr_in); Vstr_ref *ref = NULL; struct sockaddr_in *saddr = NULL; if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1) goto sock_fail; EVNT__UPDATE_TV(); if (!(ref = vstr_ref_make_malloc(alloc_len))) goto init_fail; saddr = ref->ptr; saddr->sin_family = AF_INET; saddr->sin_port = htons(port); saddr->sin_addr.s_addr = inet_addr(ipv4_string); if (!evnt_init(evnt, fd, ref, NULL)) goto init_fail; evnt->flag_q_pkt_move = TRUE; ASSERT(port && (saddr->sin_addr.s_addr != htonl(INADDR_ANY))); if (connect(fd, EVNT_SA(evnt), alloc_len) == -1) { if (errno == EINPROGRESS) { /* The connection needs more time....*/ vstr_ref_del(ref); evnt->flag_q_connect = TRUE; return (evnt__make_end(&q_connect, evnt, POLLOUT)); } goto connect_fail; 
} vstr_ref_del(ref); evnt->flag_q_none = TRUE; return (evnt__make_end(&q_none, evnt, 0)); connect_fail: evnt__uninit(evnt); init_fail: vstr_ref_del(ref); evnt__fd_close_noerrno(fd); sock_fail: return (FALSE); } int evnt_make_con_local(struct Evnt *evnt, const char *fname) { int fd = -1; size_t len = strlen(fname) + 1; struct sockaddr_un tmp_sun; struct sockaddr_un *saddr = NULL; socklen_t alloc_len = 0; Vstr_ref *ref = NULL; tmp_sun.sun_path[0] = 0; alloc_len = SUN_LEN(&tmp_sun) + len; if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1) goto sock_fail; EVNT__UPDATE_TV(); if (!(ref = vstr_ref_make_malloc(alloc_len))) goto init_fail; saddr = ref->ptr; saddr->sun_family = AF_LOCAL; memcpy(saddr->sun_path, fname, len); if (!evnt_init(evnt, fd, ref, NULL)) goto init_fail; evnt->flag_q_pkt_move = TRUE; if (connect(fd, EVNT_SA(evnt), alloc_len) == -1) { if (errno == EINPROGRESS) { /* The connection needs more time....*/ vstr_ref_del(ref); evnt->flag_q_connect = TRUE; return (evnt__make_end(&q_connect, evnt, POLLOUT)); } goto connect_fail; } vstr_ref_del(ref); evnt->flag_q_none = TRUE; return (evnt__make_end(&q_none, evnt, 0)); connect_fail: evnt__uninit(evnt); init_fail: vstr_ref_del(ref); evnt__fd_close_noerrno(fd); sock_fail: return (FALSE); } int evnt_make_acpt_ref(struct Evnt *evnt, int fd, Vstr_ref *sa) { if (!evnt_init(evnt, fd, sa, NULL)) return (FALSE); evnt->flag_q_recv = TRUE; return (evnt__make_end(&q_recv, evnt, POLLIN)); } int evnt_make_acpt_dup(struct Evnt *evnt, int fd, struct sockaddr *sa, socklen_t len) { Vstr_ref *ref = vstr_ref_make_memdup(sa, len); int ret = FALSE; if (!ref) { errno = ENOMEM; return (FALSE); } ret = evnt_make_acpt_ref(evnt, fd, ref); vstr_ref_del(ref); return (ret); } static int evnt__make_bind_end(struct Evnt *evnt) { vstr_free_base(evnt->io_r); evnt->io_r = NULL; vstr_free_base(evnt->io_w); evnt->io_w = NULL; evnt->flag_q_accept = TRUE; evnt__make_end(&q_accept, evnt, POLLIN); SOCKET_POLL_INDICATOR(evnt->ind)->revents = 0; return 
(TRUE); } int evnt_make_bind_ipv4(struct Evnt *evnt, const char *acpt_addr, short server_port, unsigned int listen_len, const char *cong) { int fd = -1; socklen_t alloc_len = sizeof(struct sockaddr_in); Vstr_ref *ref = NULL; struct sockaddr_in *saddr = NULL; if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1) VLG_WARN_GOTO(sock_fail, (vlg, "socket(): %m\n")); EVNT__UPDATE_TV(); if (!(ref = vstr_ref_make_malloc(alloc_len))) VLG_WARNNOMEM_GOTO(init_fail, (vlg, "%s(): %m\n", __func__)); saddr = ref->ptr; saddr->sin_family = AF_INET; saddr->sin_addr.s_addr = htonl(INADDR_ANY); if (acpt_addr && *acpt_addr) /* silent error becomes */ EVNT__RESOLVE_NAME(saddr, acpt_addr); if (saddr->sin_addr.s_addr == htonl(INADDR_ANY)) acpt_addr = "any"; saddr->sin_port = htons(server_port); if (!evnt_init(evnt, fd, ref, NULL)) VLG_WARNNOMEM_GOTO(init_fail, (vlg, "%s(): %m\n", __func__)); if (!evnt_fd__set_reuse(fd, TRUE)) VLG_WARNNOMEM_GOTO(reuse_fail, (vlg, "%s(): %m\n", __func__)); if (cong && !evnt_fd__set_congestion(fd, cong) && (errno != ENOPROTOOPT)) VLG_WARN_GOTO(cong_fail, (vlg, "setsockopt(TCP_CONGESTION, %s): %m\n", cong)); if (bind(fd, EVNT_SA(evnt), alloc_len) == -1) VLG_WARN_GOTO(bind_fail, (vlg, "bind(%s:%hd): %m\n", acpt_addr, server_port)); if (!server_port) if (getsockname(fd, EVNT_SA(evnt), &alloc_len) == -1) VLG_WARN_GOTO(getsockname_fail, (vlg, "getsockname(): %m\n")); if (listen(fd, listen_len) == -1) VLG_WARN_GOTO(listen_fail, (vlg, "listen(%d): %m\n", listen_len)); vstr_ref_del(ref); return (evnt__make_bind_end(evnt)); listen_fail: getsockname_fail: bind_fail: cong_fail: reuse_fail: evnt__uninit(evnt); init_fail: vstr_ref_del(ref); evnt__fd_close_noerrno(fd); sock_fail: return (FALSE); } int evnt_make_bind_local(struct Evnt *evnt, const char *fname, unsigned int listen_len) { int fd = -1; int saved_errno = 0; size_t len = strlen(fname) + 1; struct sockaddr_un tmp_sun; struct sockaddr_un *saddr = NULL; socklen_t alloc_len = 0; Vstr_ref *ref = NULL; 
tmp_sun.sun_path[0] = 0; alloc_len = SUN_LEN(&tmp_sun) + len; if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1) goto sock_fail; EVNT__UPDATE_TV(); if (!(ref = vstr_ref_make_malloc(alloc_len))) goto init_fail; saddr = ref->ptr; saddr->sun_family = AF_LOCAL; memcpy(saddr->sun_path, fname, len); if (!evnt_init(evnt, fd, ref, NULL)) goto init_fail; if (unlink(fname) != -1) vlg_warn(vlg, "HAD to unlink(%s) for bind\n", fname); if (bind(fd, EVNT_SA(evnt), alloc_len) == -1) goto bind_fail; if (fchmod(fd, 0600) == -1) goto fchmod_fail; if (listen(fd, listen_len) == -1) goto listen_fail; vstr_ref_del(ref); return (evnt__make_bind_end(evnt)); bind_fail: saved_errno = errno; vlg_warn(vlg, "bind(%s): %m\n", fname); errno = saved_errno; fchmod_fail: listen_fail: evnt__uninit(evnt); init_fail: vstr_ref_del(ref); evnt__fd_close_noerrno(fd); sock_fail: return (FALSE); } int evnt_make_custom(struct Evnt *evnt, int fd, Vstr_ref *sa, int flags) { static Vstr_ref dummy_sa = {vstr_ref_cb_free_nothing, NULL, 1}; if (!sa) sa = &dummy_sa; EVNT__UPDATE_TV(); if (!evnt_init(evnt, fd, sa, NULL)) { evnt__fd_close_noerrno(fd); return (FALSE); } if (flags & POLLIN) { evnt->flag_q_recv = TRUE; return (evnt__make_end(&q_recv, evnt, POLLIN)); } evnt->flag_q_none = TRUE; return (evnt__make_end(&q_none, evnt, 0)); } void evnt_close(struct Evnt *evnt) { if (!evnt) return; ASSERT(evnt__valid(evnt)); if (evnt->flag_q_closed) return; /* can't close at this point or we'll race with: * socket_poll_add()/socket_poll_del() when using _DIRECT mapping */ /* queue for deletion ... 
as the bt might still be using the ptr */ evnt->flag_q_closed = TRUE; evnt->c_next = q_closed; q_closed = evnt; ASSERT(evnt__valid(evnt)); } int evnt_limit_add(struct Evnt *evnt, Vstr_ref *ref) { Vstr_ref **refs = evnt->lims; unsigned int lim_num = evnt->lim_num + 1; struct Evnt_limit *lim = ref->ptr; ASSERT((!evnt->lims && (lim_num == 1)) || ( evnt->lims && (lim_num >= 2))); evnt->io_r_limited |= !!lim->io_r_max; evnt->io_w_limited |= !!lim->io_w_max; if (lim_num == 1) { if (!(evnt->lims = MK(sizeof(Vstr_ref *)))) return (FALSE); } else if (!MV(evnt->lims, refs, sizeof(Vstr_ref *) * lim_num)) return (FALSE); evnt->lims[evnt->lim_num] = vstr_ref_add(ref); evnt->lim_num = lim_num; return (TRUE); } int evnt_limit_dup(struct Evnt *evnt, const struct Evnt_limit *lim) { Vstr_ref *ref = vstr_ref_make_memdup(lim, sizeof(struct Evnt_limit)); int ret = FALSE; if (!ref) return (FALSE); ret = evnt_limit_add(evnt, ref); vstr_ref_del(ref); return (ret); } void evnt_limit_chg(struct Evnt *evnt, unsigned int scan, Vstr_ref *ref) { Vstr_ref *tmp = NULL; ASSERT(evnt->lim_num > scan); tmp = evnt->lims[scan]; if (ref) { struct Evnt_limit *lim = ref->ptr; evnt->io_r_limited |= !!lim->io_r_max; evnt->io_w_limited |= !!lim->io_w_max; ref = vstr_ref_add(ref); } evnt->lims[scan] = ref; vstr_ref_del(tmp); } void evnt_limit_alt(struct Evnt *evnt, unsigned int scan, const struct Evnt_limit *nlim) { Vstr_ref *ref = NULL; struct Evnt_limit *lim = NULL; ASSERT(evnt->lim_num > scan); ref = evnt->lims[scan]; ASSERT(ref); ASSERT(ref->ref == 1); /* should be the only owner of this data, * if we are changing it */ lim = ref->ptr; evnt->io_r_limited |= !!lim->io_r_max; evnt->io_w_limited |= !!lim->io_w_max; lim->io_r_max = nlim->io_r_max; lim->io_w_max = nlim->io_w_max; if (lim->io_r_cur > lim->io_r_max) lim->io_r_cur = lim->io_r_max; if (lim->io_w_cur > lim->io_w_max) lim->io_w_cur = lim->io_w_max; } void evnt_limit_free(struct Evnt *evnt) { unsigned int scan = 0; while (scan < evnt->lim_num) 
vstr_ref_del(evnt->lims[scan++]); ASSERT(scan == evnt->lim_num); F(evnt->lims); evnt->lims = NULL; evnt->lim_num = 0; } static void evnt__limit_tm_chk(const struct timeval *tv, struct timeval *ltv, unsigned long *cur, unsigned long max) { struct timeval tv_end[1]; struct timeval tv_reset[1]; *tv_end = *ltv; TIMER_Q_TIMEVAL_ADD_SECS(tv_end, 1, 0); *tv_reset = *ltv; TIMER_Q_TIMEVAL_ADD_SECS(tv_reset, 3, 0); if (!ltv->tv_sec) { *cur = max; return; } /* if it's been more than a second ... reset */ if (TIMER_Q_TIMEVAL_CMP(tv, tv_end) > 0) { if (TIMER_Q_TIMEVAL_CMP(tv, tv_reset) < 0) ++ltv->tv_sec; /* try and be nice */ else { /* if it's been more than 3 seconds, just restart */ ltv->tv_sec = 0; ltv->tv_usec = 0; } *cur = max; } } static void evnt__limit_chk(struct Evnt *evnt) { struct timeval tv[1]; unsigned int scan = 0; EVNT__COPY_TV(tv); evnt->io_r_limited = FALSE; /* blank and reset */ evnt->io_w_limited = FALSE; while (scan < evnt->lim_num) { if (evnt->lims[scan]) { struct Evnt_limit *lim = evnt->lims[scan]->ptr; evnt->io_r_limited |= !!lim->io_r_max; evnt->io_w_limited |= !!lim->io_w_max; if (evnt->io_r_limited) { vlg_dbg2(vlg, "limit chk r beg (%'lu <= %'lu) @%lu:%lu\n", lim->io_r_cur, lim->io_r_max, lim->io_r_tm.tv_sec, lim->io_r_tm.tv_usec); evnt__limit_tm_chk(tv, &lim->io_r_tm, &lim->io_r_cur, lim->io_r_max); vlg_dbg2(vlg, "limit chk r beg (%'lu <= %'lu) @%lu:%lu\n", lim->io_r_cur, lim->io_r_max, lim->io_r_tm.tv_sec, lim->io_r_tm.tv_usec); } if (evnt->io_w_limited) { vlg_dbg2(vlg, "limit chk w beg (%'lu <= %'lu) @%lu:%lu\n", lim->io_w_cur, lim->io_w_max, lim->io_w_tm.tv_sec, lim->io_w_tm.tv_usec); evnt__limit_tm_chk(tv, &lim->io_w_tm, &lim->io_w_cur, lim->io_w_max); vlg_dbg2(vlg, "limit chk w end (%'lu <= %'lu) @%lu:%lu\n", lim->io_w_cur, lim->io_w_max, lim->io_w_tm.tv_sec, lim->io_w_tm.tv_usec); } } ++scan; } ASSERT(scan == evnt->lim_num); } static struct Evnt_limit *evnt_limit_r(struct Evnt *evnt, unsigned long sz) { struct Evnt_limit *ret = NULL; unsigned 
int scan = 0; if (!evnt->io_r_limited) return (NULL); evnt__limit_chk(evnt); while (scan < evnt->lim_num) { if (evnt->lims[scan]) { struct Evnt_limit *lim = evnt->lims[scan]->ptr; if (lim->io_r_max && (!ret || (ret->io_r_cur > lim->io_r_cur))) ret = lim; } ++scan; } ASSERT(scan == evnt->lim_num); if (ret) vlg_dbg2(vlg, "limit io_r(%'lu <= %'lu)\n", ret->io_r_cur, sz); if (ret && (ret->io_r_cur > sz)) ret = NULL; return (ret); } static struct Evnt_limit *evnt_limit_w(struct Evnt *evnt, unsigned long sz) { struct Evnt_limit *ret = NULL; unsigned int scan = 0; if (!evnt->io_w_limited) return (NULL); evnt__limit_chk(evnt); while (scan < evnt->lim_num) { if (evnt->lims[scan]) { struct Evnt_limit *lim = evnt->lims[scan]->ptr; if (lim->io_w_max && (!ret || (ret->io_w_cur > lim->io_w_cur))) ret = lim; } ++scan; } ASSERT(scan == evnt->lim_num); if (ret) vlg_dbg2(vlg, "limit io_w(%'lu <= %'lu)\n", ret->io_w_cur, sz); if (ret && (ret->io_w_cur > sz)) ret = NULL; return (ret); } static Timer_q_base *evnt__timeout_lim_r_1 = NULL; /* read/recv/accept */ static Timer_q_base *evnt__timeout_lim_r_10 = NULL; static Timer_q_base *evnt__timeout_lim_w_1 = NULL; /* write/send/connect */ static Timer_q_base *evnt__timeout_lim_w_10 = NULL; static void evnt__timer_cb_lim_r(int type, void *data) { struct Evnt *evnt = data; if (!evnt) /* deleted */ return; ASSERT(evnt__valid(evnt)); if (type == TIMER_Q_TYPE_CALL_RUN_ALL) return; evnt->tm_l_r = NULL; evnt_wait_cntl_add(evnt, POLLIN); if (type == TIMER_Q_TYPE_CALL_DEL) return; if (evnt->flag_q_accept) { assert(FALSE); } else if (!evnt->cbs->cb_func_recv(evnt)) evnt__close_now(evnt); } static int evnt_limit_timeout_r(struct Evnt *evnt, struct Evnt_limit *lim) { struct timeval tv[1]; unsigned long msecs = 0; if (evnt->tm_l_r || !lim || lim->io_r_cur) return (TRUE); EVNT__COPY_TV(tv); msecs = timer_q_timeval_udiff_msecs(tv, &lim->io_r_tm); if (msecs < 1000) msecs = 1000 - msecs; else msecs = 0; vlg_dbg2(vlg, "timeout_r_make($, %lu)\n", 
EVNT_SA(evnt), msecs); if (msecs <= 100) { TIMER_Q_TIMEVAL_ADD_SECS(tv, 0, 100 * 1000); evnt->tm_l_r = timer_q_add_node(evnt__timeout_lim_r_1, evnt, tv, TIMER_Q_FLAG_NODE_DEFAULT); } else { TIMER_Q_TIMEVAL_ADD_SECS(tv, 1, 0); evnt->tm_l_r = timer_q_add_node(evnt__timeout_lim_r_10, evnt, tv, TIMER_Q_FLAG_NODE_DEFAULT); } evnt_wait_cntl_del(evnt, POLLIN); return (!!evnt->tm_l_r); } static void evnt__timer_cb_lim_w(int type, void *data) { struct Evnt *evnt = data; if (!evnt) /* deleted */ return; ASSERT(evnt__valid(evnt)); if (type == TIMER_Q_TYPE_CALL_RUN_ALL) return; evnt->tm_l_w = NULL; if (evnt->flag_q_send_recv) { evnt_wait_cntl_add(evnt, POLLOUT); SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLOUT; } if (type == TIMER_Q_TYPE_CALL_DEL) return; if (evnt->flag_q_connect) { assert(FALSE); } else if (!evnt->cbs->cb_func_send(evnt)) evnt__close_now(evnt); } static int evnt_limit_timeout_w(struct Evnt *evnt, struct Evnt_limit *lim) { struct timeval tv[1]; unsigned long msecs = 0; if (evnt->tm_l_w || !lim || lim->io_w_cur) return (TRUE); EVNT__COPY_TV(tv); msecs = timer_q_timeval_udiff_msecs(tv, &lim->io_w_tm); if (msecs < 1000) msecs = 1000 - msecs; else msecs = 0; vlg_dbg2(vlg, "timeout_w_make($, %lu)\n", EVNT_SA(evnt), msecs); if (msecs <= 100) { TIMER_Q_TIMEVAL_ADD_SECS(tv, 0, 100 * 1000); evnt->tm_l_w = timer_q_add_node(evnt__timeout_lim_w_1, evnt, tv, TIMER_Q_FLAG_NODE_DEFAULT); } else { TIMER_Q_TIMEVAL_ADD_SECS(tv, 1, 0); evnt->tm_l_w = timer_q_add_node(evnt__timeout_lim_w_10, evnt, tv, TIMER_Q_FLAG_NODE_DEFAULT); } evnt_wait_cntl_del(evnt, POLLOUT); return (!!evnt->tm_l_w); } /* takes limit values into account as well */ static void evnt__acct_r(struct Evnt *evnt, size_t bytes) { unsigned int scan = 0; while (scan < evnt->lim_num) { if (evnt->lims[scan]) { struct Evnt_limit *lim = evnt->lims[scan]->ptr; if (lim->io_r_max) { if (!lim->io_r_tm.tv_sec) { EVNT__COPY_TV(&lim->io_r_tm.tv_sec); ASSERT(lim->io_r_cur == lim->io_r_max); } /* must be at least this small, 
or we screwed up limiting */
        ASSERT(lim->io_r_cur >= bytes);
        lim->io_r_cur -= bytes;
      }
    }
    ++scan;
  }
  ASSERT(scan == evnt->lim_num);

  evnt->acct.bytes_r += bytes;
}

/* Account bytes just written on evnt: charge every attached write limit's
 * current window, and bump the lifetime written-bytes counter. */
static void evnt__acct_w(struct Evnt *evnt, size_t bytes)
{
  unsigned int scan = 0;

  while (scan < evnt->lim_num)
  {
    if (evnt->lims[scan])
    {
      struct Evnt_limit *lim = evnt->lims[scan]->ptr;

      if (lim->io_w_max)
      {
        if (!lim->io_w_tm.tv_sec)
        { /* window not started yet: stamp it with "now".
           * NOTE(review): this passes &...tv_sec, but EVNT__COPY_TV()
           * memcpy()s a whole struct timeval -- it only works because
           * tv_sec is the first member; presumably &lim->io_w_tm was
           * meant (same in evnt__acct_r above) -- confirm */
          EVNT__COPY_TV(&lim->io_w_tm.tv_sec);
          ASSERT(lim->io_w_cur == lim->io_w_max);
        }
        /* must be at least this small, or we screwed up limiting */
        ASSERT(lim->io_w_cur >= bytes);
        lim->io_w_cur -= bytes;
      }
    }
    ++scan;
  }
  ASSERT(scan == evnt->lim_num);

  evnt->acct.bytes_w += bytes;
}

/* Record one more request "put" on evnt (acct.req_put).  In packet-move
 * mode, a connection parked on q_none is moved back to q_recv and POLLIN
 * is re-enabled, so the reply can be read. */
void evnt_put_pkt(struct Evnt *evnt)
{
  ASSERT(evnt__valid(evnt));

  if (evnt->flag_q_pkt_move && evnt->flag_q_none)
  {
    ASSERT(evnt->acct.req_put >= evnt->acct.req_got);

    evnt_del(&q_none, evnt); evnt->flag_q_none = FALSE;
    evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;

    evnt_wait_cntl_add(evnt, POLLIN);
  }

  ++evnt->acct.req_put;

  ASSERT(evnt__valid(evnt));
}

/* Record one more request "got" on evnt (acct.req_got).  In packet-move
 * mode, once every put has been matched by a got (and nothing is queued
 * to send), park the connection on q_none and stop polling for input. */
void evnt_got_pkt(struct Evnt *evnt)
{
  ASSERT(evnt__valid(evnt));

  ++evnt->acct.req_got;

  if (evnt->flag_q_pkt_move && !evnt->flag_q_send_recv &&
      (evnt->acct.req_put == evnt->acct.req_got))
  {
    ASSERT(evnt->acct.req_put >= evnt->acct.req_got);

    evnt_del(&q_recv, evnt), evnt->flag_q_recv = FALSE;
    evnt_add(&q_none, evnt), evnt->flag_q_none = TRUE;

    evnt_wait_cntl_del(evnt, POLLIN);
  }

  ASSERT(evnt__valid(evnt));
}

/* Flush as much of evnt->io_w as the write rate-limits allow, account the
 * bytes actually written, and (re)arm a write-limit timeout when
 * throttled.  Returns FALSE on a real write error (EAGAIN is not an
 * error); *ern receives the vstr write error code. */
static int evnt__call_send(struct Evnt *evnt, unsigned int *ern)
{
  size_t tmp = evnt->io_w->len;
  int fd = evnt_fd(evnt);
  struct Evnt_limit *lim = NULL;

  if (!(lim = evnt_limit_w(evnt, evnt->io_w->len)))
  { /* unlimited: try to write everything queued */
    if (!vstr_sc_write_fd(evnt->io_w, 1, tmp, fd, ern) &&
        (errno != EAGAIN))
      return (FALSE);
  }
  else if (lim->io_w_cur)
  { /* limited: write no more than this window's remaining quota */
    if (!vstr_sc_write_fd(evnt->io_w, 1, lim->io_w_cur, fd, ern) &&
        (errno != EAGAIN))
      return (FALSE);
  }

  tmp -= evnt->io_w->len; /* == number of bytes actually written */

  vlg_dbg3(vlg, "write($) = %zu\n", EVNT_SA(evnt), tmp);

  evnt__acct_w(evnt, tmp);

  if (!evnt_limit_timeout_w(evnt, lim))
  {
errno = ENOMEM, *ern = VSTR_TYPE_SC_WRITE_FD_ERR_MEM; return (FALSE); } return (TRUE); } int evnt_send_add(struct Evnt *evnt, int force_q, size_t max_sz) { ASSERT(evnt__valid(evnt)); vlg_dbg3(vlg, "q now = %u, q send recv = %u, force = %u\n", evnt->flag_q_send_now, evnt->flag_q_send_recv, force_q); if (!evnt->flag_q_send_recv && (evnt->io_w->len > max_sz)) { if (!evnt__call_send(evnt, NULL)) { ASSERT(evnt__valid(evnt)); return (FALSE); } if (!evnt->io_w->len && !force_q) { ASSERT(evnt__valid(evnt)); return (TRUE); } } /* already on send_q -- or already polling (and not forcing) */ if (evnt->flag_q_send_now || evnt->tm_l_w || (evnt->flag_q_send_recv && !force_q)) { ASSERT(evnt__valid(evnt)); return (TRUE); } evnt->s_next = q_send_now; q_send_now = evnt; evnt->flag_q_send_now = TRUE; ASSERT(evnt__valid(evnt)); return (TRUE); } /* if a connection is on the send now q, then remove them ... this is only * done when the client gets killed, so it doesn't matter if it's slow */ void evnt_send_del(struct Evnt *evnt) { struct Evnt **scan = &q_send_now; if (!evnt->flag_q_send_now) return; while (*scan && (*scan != evnt)) scan = &(*scan)->s_next; ASSERT(*scan); *scan = evnt->s_next; evnt->flag_q_send_now = FALSE; } int evnt_shutdown_r(struct Evnt *evnt, int got_eof) { ASSERT(evnt__valid(evnt)); if (evnt->io_r_shutdown || evnt->io_w_shutdown) return (FALSE); evnt_wait_cntl_del(evnt, POLLIN); vlg_dbg2(vlg, "shutdown(SHUT_RD, %d) from[$]\n", got_eof, EVNT_SA(evnt)); if (!got_eof && (shutdown(evnt_fd(evnt), SHUT_RD) == -1)) { if (errno != ENOTCONN) vlg_warn(vlg, "shutdown(SHUT_RD): %m\n"); return (FALSE); } evnt->io_r_shutdown = TRUE; ASSERT(evnt__valid(evnt)); return (TRUE); } static void evnt__send_fin(struct Evnt *evnt) { if (0) { /* nothing */ } else if ( evnt->flag_q_send_recv && !evnt->io_w->len) { evnt_del(&q_send_recv, evnt); evnt->flag_q_send_recv = FALSE; if (evnt->flag_q_pkt_move && (evnt->acct.req_put == evnt->acct.req_got)) { evnt_add(&q_none, evnt); evnt->flag_q_none 
= TRUE; /* (continuation of evnt__send_fin from the previous chunk) */
      evnt_wait_cntl_del(evnt, POLLIN | POLLOUT);
    }
    else
    {
      evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;
      evnt_wait_cntl_del(evnt, POLLOUT);
    }
  }
  else if (!evnt->flag_q_send_recv && evnt->io_w->len)
  { /* data appeared in the write buffer -- start waiting for POLLOUT */
    int pflags = evnt->tm_l_w ? 0 : POLLOUT;

    ASSERT(evnt->flag_q_none || evnt->flag_q_recv);

    if (evnt->flag_q_none)
      evnt_del(&q_none, evnt), evnt->flag_q_none = FALSE;
    else
      evnt_del(&q_recv, evnt), evnt->flag_q_recv = FALSE;

    evnt_add(&q_send_recv, evnt); evnt->flag_q_send_recv = TRUE;

    if (!evnt->io_r_shutdown && !evnt->tm_l_r)
      pflags |= POLLIN;

    if (!pflags) /* moved with no poll flags -- scan loop must skip one */
      ++evnt__scan_ready_moved_send_zero_fds;

    evnt_wait_cntl_add(evnt, pflags);
  }
}

/* Shut the write side down: uncork, shutdown(SHUT_WR), throw away any
 * buffered IO and requeue via evnt__send_fin().  Returns FALSE if either
 * side was already shut down or shutdown() failed. */
int evnt_shutdown_w(struct Evnt *evnt)
{
  ASSERT(evnt__valid(evnt));

  vlg_dbg2(vlg, "shutdown(SHUT_WR) from[$]\n", EVNT_SA(evnt));

  evnt_fd_set_cork(evnt, FALSE); /* eats data in 2.4.22-1.2199.4.legacy.npt */

  if (evnt->io_r_shutdown || evnt->io_w_shutdown)
    return (FALSE);

  if (shutdown(evnt_fd(evnt), SHUT_WR) == -1)
  {
    if (errno != ENOTCONN)
      vlg_warn(vlg, "shutdown(SHUT_WR): %m\n");
    return (FALSE);
  }

  evnt->io_w_shutdown = TRUE;

  /* nothing more will be written or answered -- drop both buffers */
  vstr_del(evnt->io_w, 1, evnt->io_w->len);
  vstr_del(evnt->io_r, 1, evnt->io_r->len);

  evnt__send_fin(evnt);

  if (evnt->flag_q_recv)
    evnt_wait_cntl_add(evnt, POLLIN);

  ASSERT(evnt__valid(evnt));

  return (TRUE);
}

/* Read from the fd into evnt->io_r, honouring any read limit; the iovec
 * count adapts to the previous read size (continues next chunk). */
int evnt_recv(struct Evnt *evnt, unsigned int *ern)
{
  Vstr_base *data = evnt->io_r;
  size_t tmp = evnt->io_r->len;
  unsigned int num_min = 2;
  unsigned int num_max = 6; /* ave.
browser reqs are 500ish, ab is much less */
  struct Evnt_limit *lim = NULL;
  unsigned int buf_sz = 0;

  ASSERT(evnt__valid(evnt) && ern);

  /* default for HTTPD's buf size of 120 (128 - 8) */
  if (!vstr_cntl_conf(data->conf, VSTR_CNTL_CONF_GET_NUM_BUF_SZ, &buf_sz))
    buf_sz = 120;

  /* scale the iovec count up when the peer sent a lot last time */
  if (evnt->prev_bytes_r >= (buf_sz * 2))
  {
    num_min = 8; /* 8 * 120 = 960 */
    num_max = 8;
  }
  if (evnt->prev_bytes_r >= (buf_sz * 8))
    num_max = 64; /* 64 * 120 = 7680 */

  if (!(lim = evnt_limit_r(evnt, num_max * buf_sz)))
    vstr_sc_read_iov_fd(data, data->len, evnt_fd(evnt), num_min, num_max, ern);
  else if (lim->io_r_cur)
    vstr_sc_read_len_fd(data, data->len, evnt_fd(evnt), lim->io_r_cur, ern);
  else /* pretend we read "something" ... but magicly got nothing, not EOF */
    *ern = VSTR_TYPE_SC_READ_FD_ERR_NONE;

  evnt->prev_bytes_r = (evnt->io_r->len - tmp);
  evnt__acct_r(evnt, evnt->prev_bytes_r);

  vlg_dbg3(vlg, "read($) = %ju\n", EVNT_SA(evnt), evnt->prev_bytes_r);

  if (!evnt_limit_timeout_r(evnt, lim))
    errno = ENOMEM, *ern = VSTR_TYPE_SC_READ_FD_ERR_MEM;

  switch (*ern)
  {
    case VSTR_TYPE_SC_READ_FD_ERR_NONE:
      if (evnt->io_w_shutdown) /* doesn't count if we can't respond */
        vstr_del(data, 1, data->len);
      else
        EVNT__COPY_TV(&evnt->mtime);
      return (TRUE);
    case VSTR_TYPE_SC_READ_FD_ERR_MEM:
      vlg_warn(vlg, "%s\n", __func__);
      break;
    case VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO:
      if (errno == EAGAIN) /* not a real error -- just retry later */
        return (TRUE);
      break;
    case VSTR_TYPE_SC_READ_FD_ERR_EOF:
      break;
    default: /* unknown */
      vlg_warn(vlg, "read_iov() = %d: %m\n", *ern);
  }

  return (FALSE);
}

/* Flush the write buffer via the send callback, then update mtime and
 * requeue the event. */
int evnt_send(struct Evnt *evnt)
{
  unsigned int ern = 0;

  ASSERT(evnt__valid(evnt));

  if (!evnt__call_send(evnt, &ern))
    return (FALSE);

  EVNT__COPY_TV(&evnt->mtime);

  evnt__send_fin(evnt);

  ASSERT(evnt__valid(evnt));

  return (TRUE);
}

#ifndef HAVE_OFF64_T
# define sendfile64 sendfile
#endif

/* sendfile() a chunk of ffd (at *f_off, up to *f_len bytes) out of the
 * socket, honouring any write limit; *f_off/*f_len are advanced by the
 * amount written (definition continues into the next chunk). */
int evnt_sendfile(struct Evnt *evnt, int ffd,
                  uintmax_t *f_off, uintmax_t *f_len, unsigned int *ern)
{
  ssize_t ret = 0;
  off64_t tmp_off = *f_off;
  size_t tmp_len = *f_len;
  struct Evnt_limit *lim = NULL;
*ern = 0;

  ASSERT(evnt__valid(evnt));
  ASSERT(!evnt->io_w->len);

  if (*f_len > SSIZE_MAX) /* sendfile() can't do more than this per call */
    tmp_len = SSIZE_MAX;

  if (!(lim = evnt_limit_w(evnt, tmp_len)))
  { /* no write limit -- send as much as we can */
    if ((ret = sendfile64(evnt_fd(evnt), ffd, &tmp_off, tmp_len)) == -1)
    {
      if (errno == EAGAIN)
        return (TRUE);

      *ern = VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO;
      return (FALSE);
    }
  }
  else if (lim->io_w_cur)
  { /* limited -- send no more than the current allowance */
    if ((ret = sendfile64(evnt_fd(evnt), ffd, &tmp_off, lim->io_w_cur)) == -1)
    {
      if (errno == EAGAIN)
        return (TRUE);

      *ern = VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO;
      return (FALSE);
    }
  }

  if (!ret && (!lim || lim->io_w_cur))
  { /* a zero return when we asked for data means EOF on the input file */
    *ern = VSTR_TYPE_SC_READ_FD_ERR_EOF;
    return (FALSE);
  }

  *f_off = tmp_off;

  evnt__acct_w(evnt, ret);

  if (!evnt_limit_timeout_w(evnt, lim))
  {
    errno = ENOMEM, *ern = VSTR_TYPE_SC_WRITE_FD_ERR_MEM;
    return (FALSE);
  }

  EVNT__COPY_TV(&evnt->mtime);

  *f_len -= ret;

  return (TRUE);
}

/* Read up to *len bytes from fd into evnt's write buffer and send them;
 * *len is decremented by the amount transferred. */
int evnt_sc_read_send(struct Evnt *evnt, int fd, uintmax_t *len)
{
  Vstr_base *out = evnt->io_w;
  size_t orig_len = out->len;
  size_t tmp = 0;
  int ret = IO_OK;

  ASSERT(len && *len);

  if ((ret = io_get(out, fd)) == IO_FAIL)
    return (EVNT_IO_READ_ERR);

  if (ret == IO_EOF)
    return (EVNT_IO_READ_EOF);

  tmp = out->len - orig_len;
  if (tmp >= *len)
  { /* we might not be transfering to EOF, so reduce if needed */
    vstr_sc_reduce(out, 1, out->len, tmp - *len);
    ASSERT((out->len - orig_len) == *len);
    *len = 0;
    return (EVNT_IO_READ_FIN);
  }

  *len -= tmp;

  if (!evnt_send(evnt))
    return (EVNT_IO_SEND_ERR);

  return (EVNT_IO_OK);
}

/* Poll timeout in msecs: 0 if sends are pending, else the time until the
 * next timer fires, else -1 (block forever). */
static int evnt__get_timeout(void)
{
  const struct timeval *tv = NULL;
  int msecs = -1;

  if (q_send_now)
    msecs = 0;
  else if ((tv = timer_q_first_timeval()))
  {
    long diff = 0;
    struct timeval now_timeval;

    EVNT__COPY_TV(&now_timeval);
    diff = timer_q_timeval_diff_msecs(tv, &now_timeval);
    if (diff > 0)
    { /* clamp to a valid int for poll() */
      if (diff >= INT_MAX)
        msecs = INT_MAX - 1;
      else
        msecs = diff;
    }
    else
      msecs = 0;
  }

  vlg_dbg2(vlg, "get_timeout = %'d\n", msecs);

  return (msecs);
}

/* Free every event queued for close (continues next chunk). */
void evnt_scan_q_close(void)
{
  struct Evnt *scan = NULL;

  while ((scan = q_closed))
  {
    scan->flag_q_closed = FALSE;
q_closed = scan->c_next; /* (continuation of evnt_scan_q_close) */
    evnt_free(scan);
  }

  ASSERT(!q_closed);
}

/* if something goes wrong drop all accept'ing events */
void evnt_acpt_close_all(void)
{
  struct Evnt *evnt = q_accept;
  /* struct Evnt *evnt = evnt_queue("accept"); */

  while (evnt)
  {
    evnt_close(evnt);
    evnt = evnt->next;
  }
}

/* Dispatch poll results: walk each queue (connect, accept, recv,
 * send_recv, none) handing ready fds to their callbacks.  "ready" counts
 * down as events are consumed (spans the next few chunks). */
void evnt_scan_fds(unsigned int ready, size_t max_sz)
{
  const int bad_poll_flags = (POLLERR | POLLHUP | POLLNVAL);
  struct Evnt *scan = NULL;

  EVNT__UPDATE_TV();

  scan = q_connect;
  while (scan && ready)
  { /* finish any outstanding non-blocking connect()s */
    struct Evnt *scan_next = NULL;
    int done = FALSE;
    int revents = 0;

    ASSERT(evnt__valid(scan));

    if (!scan->flag_q_connect)
      break;

    revents = SOCKET_POLL_INDICATOR(scan->ind)->revents;
    SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;

    if (scan->flag_q_closed)
    {
      done = !!revents;
      scan_next = scan->next;
      goto next_connect;
    }

    assert(!(revents & POLLIN));

    /* done as one so we get error code */
    if (revents & (POLLOUT|bad_poll_flags))
    {
      int ern = 0;
      socklen_t len = sizeof(int);
      int ret = 0;

      done = TRUE;

      ret = getsockopt(evnt_fd(scan), SOL_SOCKET, SO_ERROR, &ern, &len);
      if (ret == -1)
        vlg_err(vlg, EXIT_FAILURE, "getsockopt(SO_ERROR): %m\n");
      else if (ern)
      { /* the async connect() failed */
        errno = ern;
        vlg_warn(vlg, "connect(): %m\n");
        scan_next = scan->next;
        evnt__close_now(scan);
      }
      else
      { /* connected -- move to the "none" queue, tell the callback */
        evnt_del(&q_connect, scan); scan->flag_q_connect = FALSE;
        evnt_add(&q_none, scan); scan->flag_q_none = TRUE;
        evnt_wait_cntl_del(scan, POLLOUT);

        if (!scan->cbs->cb_func_connect(scan))
        {
          scan_next = scan->next;
          evnt__close_now(scan);
        }
      }
      goto next_connect;
    }

    ASSERT(!done);
    if (evnt_poll_direct_enabled()) break;

    scan_next = scan->next;
   next_connect:
    if (done)
      --ready;
    scan = scan_next;
  }

  scan = q_accept;
  while (scan && ready)
  { /* Papers have suggested that preferring read over accept is better
     * -- edge triggering needs to requeue on non failure */
    struct Evnt *scan_next = NULL;
    int done = FALSE;
    int revents = 0;

    ASSERT(evnt__valid(scan));

    if (!scan->flag_q_accept)
      break;

    revents = SOCKET_POLL_INDICATOR(scan->ind)->revents;
SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;

    if (scan->flag_q_closed)
    {
      done = !!revents;
      scan_next = scan->next;
      goto next_accept;
    }

    assert(!(revents & POLLOUT));

    if (!done && (revents & bad_poll_flags))
    { /* done first as it's an error with the accept fd, whereas accept
       * generates new fds */
      done = TRUE;
      scan_next = scan->next;
      evnt__close_now(scan);
      goto next_accept;
    }

    if (revents & POLLIN)
    {
      struct sockaddr_storage sstore[1];
      struct sockaddr *sa = (struct sockaddr *) sstore;
      socklen_t len = sizeof(struct sockaddr_storage);
      int fd = -1;
      struct Evnt *tmp = NULL;
      unsigned int acpt_num = 0;

      done = TRUE;

      /* ignore all accept() errors -- bad_poll_flags fixes here */
      /* FIXME: apache seems to assume certain errors are really bad and we
       * should just kill the listen socket and wait to die. But for instance.
       * we can't just kill the socket on EMFILE, as we might have hit our
       * resource limit */
      while ((revents & POLLIN) &&
             (fd = accept(evnt_fd(scan), sa, &len)) != -1)
      { /* drain up to evnt__accept_limit connections per wakeup */
        if (!(tmp = scan->cbs->cb_func_accept(scan, fd, sa, len)))
        {
          close(fd);
          scan_next = scan->next;
          goto next_accept;
        }

        if (!acpt_num) /* for the first one, update */
          EVNT__COPY_TV(&scan->mtime);

        if (!tmp->flag_q_closed)
        {
          ++ready; /* give a read event to this new event */
          tmp->flag_fully_acpt = TRUE;
        }
        assert(SOCKET_POLL_INDICATOR(tmp->ind)->events == POLLIN);
        assert(SOCKET_POLL_INDICATOR(tmp->ind)->revents == POLLIN);
        assert(tmp == q_recv);

        if (++acpt_num >= evnt__accept_limit)
          break;
      }
      scan_next = scan->next;
      goto next_accept;
    }

    ASSERT(!done);
    if (evnt_poll_direct_enabled()) break;

    scan_next = scan->next;
   next_accept:
    if (done)
      --ready;
    scan = scan_next;
  }

  evnt__scan_ready_moved_send_zero_fds = 0;
  scan = q_recv;
  while (scan && ready)
  { /* read events for established connections */
    struct Evnt *scan_next = NULL;
    int done = FALSE;
    int revents = 0;

    ASSERT(evnt__valid(scan));

    if (!scan->flag_q_recv)
      break;

    revents = SOCKET_POLL_INDICATOR(scan->ind)->revents;
    SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;

    if (scan->flag_q_closed)
    {
      done = !!revents;
scan_next = scan->next;
      goto next_recv;
    }

    if (revents & POLLIN)
    {
      done = TRUE;
      if (!scan->cbs->cb_func_recv(scan))
      {
        scan_next = scan->next;
        evnt__close_now(scan);
        goto next_recv;
      }
    }

    if (!done && (revents & bad_poll_flags))
    { /* error/hangup -- give the shutdown callback a chance first */
      done = TRUE;
      if (scan->io_r_shutdown || scan->io_w_shutdown ||
          !scan->cbs->cb_func_shutdown_r(scan))
      {
        scan_next = scan->next;
        evnt__close_now(scan);
        goto next_recv;
      }
    }

    scan_next = scan->next;
   next_recv:
    if (!done && evnt_poll_direct_enabled()) break;
    if (done)
      --ready;
    scan = scan_next;
  }

  scan = q_send_recv;
  while (scan && ready)
  { /* events for connections that also have buffered output */
    struct Evnt *scan_next = NULL;
    int done = FALSE;
    int revents = 0;

    ASSERT(evnt__valid(scan));

    if (!scan->flag_q_send_recv)
      break;

    revents = SOCKET_POLL_INDICATOR(scan->ind)->revents;
    SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;

    if (scan->flag_q_closed)
    {
      done = !!revents;
      scan_next = scan->next;
      goto next_send;
    }

    if (revents & POLLIN)
    {
      done = TRUE;
      if (!scan->cbs->cb_func_recv(scan))
      {
        scan_next = scan->next;
        evnt__close_now(scan);
        goto next_send;
      }
    }

    if (revents & POLLOUT)
    {
      done = TRUE;
      /* need groups so we can do direct send here */
      if (!evnt_send_add(scan, TRUE, max_sz))
      {
        scan_next = scan->next;
        evnt__close_now(scan);
        goto next_send;
      }
    }

    if (!done && (revents & bad_poll_flags))
    {
      done = TRUE;
      if (scan->io_r_shutdown ||
          !scan->cbs->cb_func_shutdown_r(scan))
      {
        scan_next = scan->next;
        evnt__close_now(scan);
        goto next_send;
      }
    }

    scan_next = scan->next;
   next_send:
    if (!done && evnt__scan_ready_moved_send_zero_fds)
      --evnt__scan_ready_moved_send_zero_fds; /* moved here with no flags */
    else
    {
      if (!done && evnt_poll_direct_enabled()) break;
      if (done)
        --ready;
    }
    scan = scan_next;
  }

  scan = q_none;
  while (scan && ready)
  { /* fds we aren't waiting on -- any event here is unexpected */
    struct Evnt *scan_next = scan->next;
    int done = FALSE;

    ASSERT(evnt__valid(scan));

    if (!scan->flag_q_none)
      break;

    if (scan->flag_q_closed)
      goto next_none;

    if (SOCKET_POLL_INDICATOR(scan->ind)->revents)
    { /* POLLIN == EOF ?
*/
      /* FIXME: failure cb */
      done = TRUE;
      evnt__close_now(scan);
      goto next_none;
    }

    ASSERT(!done);
    if (evnt_poll_direct_enabled()) break;

   next_none:
    if (done)
      --ready;
    scan = scan_next;
  }

  if (q_closed)
    evnt_scan_q_close();
  else if (ready) /* FIXME: needs a different approach */
    vlg_warn(vlg, "ready = %d\n", ready);
}

/* Run the "send now" list: call each event's send callback, closing on
 * failure.  An event may requeue itself while being walked. */
void evnt_scan_send_fds(void)
{
  struct Evnt **scan = NULL;

  evnt_scan_q_close();

  scan = &q_send_now;
  while (*scan)
  {
    struct Evnt *tmp = *scan;

    tmp->flag_q_send_now = FALSE;
    *scan = tmp->s_next;

    if (!tmp->cbs->cb_func_send(tmp))
    {
      evnt__close_now(tmp);
      continue;
    }

    if (tmp == *scan) /* added back to q */
    {
      ASSERT(tmp->flag_q_send_now == TRUE);
      scan = &tmp->s_next;
    }
  }

  evnt_scan_q_close();
}

/* Tear down every event on one queue: free IO buffers, poll slot and
 * timers -- only used from evnt_close_all(). */
static void evnt__close_1(struct Evnt **root)
{
  struct Evnt *scan = *root;

  *root = NULL;

  while (scan)
  {
    struct Evnt *scan_next = scan->next;
    Vstr_ref *acpt_sa = scan->acpt_sa_ref;
    Vstr_ref *sa = scan->sa_ref;
    Timer_q_node *tm_o = scan->tm_o;
    Timer_q_node *tm_l_r = scan->tm_l_r;
    Timer_q_node *tm_l_w = scan->tm_l_w;

    vstr_free_base(scan->io_w); scan->io_w = NULL;
    vstr_free_base(scan->io_r); scan->io_r = NULL;

    evnt_poll_del(scan);
    --evnt__num;

    evnt__free2(sa, acpt_sa, tm_o, tm_l_r, tm_l_w);
    scan->cbs->cb_func_free(scan);

    scan = scan_next;
  }
}

/* Close and free every event on every queue. */
void evnt_close_all(void)
{
  q_send_now = NULL;
  q_closed = NULL;

  evnt__close_1(&q_connect);
  evnt__close_1(&q_accept);
  evnt__close_1(&q_recv);
  evnt__close_1(&q_send_recv);
  evnt__close_1(&q_none);

  ASSERT(evnt__num == evnt__debug_num_all());
}

/* Dump per-queue event counts at debug level 3. */
void evnt_out_dbg3(const char *prefix)
{
  if (vlg->out_dbg < 3)
    return;

  vlg_dbg3(vlg, "%s T=%u c=%u a=%u r=%u s=%u n=%u [SN=%u]\n",
           prefix, evnt_num_all(),
           evnt__debug_num_1(q_connect),
           evnt__debug_num_1(q_accept),
           evnt__debug_num_1(q_recv),
           evnt__debug_num_1(q_send_recv),
           evnt__debug_num_1(q_none),
           evnt__debug_num_1(q_send_now));
}

/* Fold src's request/byte accounting into dst, refreshing dst's mtime
 * (definition continues into the next chunk). */
void evnt_stats_add(struct Evnt *dst, const struct Evnt *src)
{
  EVNT__COPY_TV(&dst->mtime);

  dst->acct.req_put += src->acct.req_put;
  dst->acct.req_got += src->acct.req_got;
dst->acct.bytes_r += src->acct.bytes_r; /* (end of evnt_stats_add) */
  dst->acct.bytes_w += src->acct.bytes_w;
}

/* Total number of live events (cross-checked against the queues). */
unsigned int evnt_num_all(void)
{
  ASSERT(evnt__num == evnt__debug_num_all());
  return (evnt__num);
}

/* TRUE while any queue still has something worth waking up for. */
int evnt_waiting(void)
{
  return (q_connect || q_accept || q_recv || q_send_recv || q_send_now);
}

struct Evnt *evnt_find_least_used(void)
{
  struct Evnt *con = NULL;
  struct Evnt *con_min = NULL;

  /* Find a usable connection, tries to find the least used connection
   * preferring ones not blocking on send IO */
  if (!(con = q_none) &&
      !(con = q_recv) &&
      !(con = q_send_recv))
    return (NULL);

  /* FIXME: not optimal, only want to change after a certain level */
  con_min = con;
  while (con)
  { /* smallest outstanding write buffer wins */
    if (con_min->io_w->len > con->io_w->len)
      con_min = con;
    con = con->next;
  }

  return (con_min);
}

#define MATCH_Q_NAME(x) \
    if (CSTREQ(qname, #x )) \
      return ( q_ ## x ) \

/* Map a queue name ("connect", "accept", ...) to the queue head. */
struct Evnt *evnt_queue(const char *qname)
{
  MATCH_Q_NAME(connect);
  MATCH_Q_NAME(accept);
  MATCH_Q_NAME(send_recv);
  MATCH_Q_NAME(recv);
  MATCH_Q_NAME(none);
  MATCH_Q_NAME(send_now);

  return (NULL);
}

/* Enable/disable Nagle; TCP_NODELAY is the inverse of val. */
void evnt_fd_set_nagle(struct Evnt *evnt, int val)
{
  ASSERT(evnt__valid(evnt));

  val = !!val;

  if (evnt->flag_io_nagle == val)
    return;

  if (!evnt_fd__set_nodelay(evnt_fd(evnt), !val))
    return;

  evnt->flag_io_nagle = val;
}

/* Enable/disable TCP_CORK (no-op unless built with TCP_CORK support;
 * definition continues into the next chunk). */
void evnt_fd_set_cork(struct Evnt *evnt, int val)
{ /* assume it can't work for set and fail for unset */
  ASSERT(evnt__valid(evnt));

  if (!USE_TCP_CORK)
    return;

  val = !!val;

  if (evnt->flag_io_cork == val)
    return;

  if (val) /* flags can't be combined ...
stupid */ evnt_fd_set_nagle(evnt, TRUE); if (!evnt_fd__set_cork(evnt_fd(evnt), val)) return; evnt->flag_io_cork = val; } static void evnt__free_base_noerrno(Vstr_base *s1) { int saved_errno = errno; vstr_free_base(s1); errno = saved_errno; } int evnt_fd_set_filter(struct Evnt *evnt, const char *fname) { int fd = evnt_fd(evnt); Vstr_base *s1 = NULL; unsigned int ern = 0; if (!CONF_USE_SOCKET_FILTERS) return (TRUE); if (!(s1 = vstr_make_base(NULL))) VLG_WARNNOMEM_RET(FALSE, (vlg, "filter_attach0: %m\n")); vstr_sc_read_len_file(s1, 0, fname, 0, 0, &ern); if (ern && !(((ern == VSTR_TYPE_SC_READ_FILE_ERR_OPEN_ERRNO) && (errno == ENOENT)) || ((ern == VSTR_TYPE_SC_READ_FILE_ERR_FSTAT_ERRNO) && (errno == ENOSPC)))) { evnt__free_base_noerrno(s1); VLG_WARN_RET(FALSE, (vlg, "filter_attach1(%s): %m\n", fname)); } else if ((s1->len / sizeof(struct sock_filter)) > USHRT_MAX) { vstr_free_base(s1); errno = E2BIG; VLG_WARN_RET(FALSE, (vlg, "filter_attach2(%s): %m\n", fname)); } else if (!s1->len) { vstr_free_base(s1); if (!evnt->flag_io_filter) vlg_warn(vlg, "filter_attach3(%s): Empty file\n", fname); else if (setsockopt(fd, SOL_SOCKET, SO_DETACH_FILTER, NULL, 0) == -1) { evnt__free_base_noerrno(s1); VLG_WARN_RET(FALSE, (vlg, "setsockopt(SOCKET, DETACH_FILTER, %s): %m\n", fname)); } evnt->flag_io_filter = FALSE; return (TRUE); } else { struct sock_fprog filter[1]; socklen_t len = sizeof(filter); filter->len = s1->len / sizeof(struct sock_filter); filter->filter = (void *)vstr_export_cstr_ptr(s1, 1, s1->len); if (!filter->filter) VLG_WARNNOMEM_RET(FALSE, (vlg, "filter_attach4: %m\n")); if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_FILTER, filter, len) == -1) { evnt__free_base_noerrno(s1); VLG_WARN_RET(FALSE, (vlg, "setsockopt(SOCKET, ATTACH_FILTER, %s): %m\n", fname)); } } evnt->flag_io_filter = TRUE; vstr_free_base(s1); return (TRUE); } static Timer_q_base *evnt__timeout_1 = NULL; static Timer_q_base *evnt__timeout_10 = NULL; static Timer_q_base *evnt__timeout_100 = NULL; static 
Timer_q_node *evnt__timeout_mtime_make(struct Evnt *evnt, struct timeval *tv,
                                       unsigned long msecs)
{ /* Arm an idle timeout on the coarsest base that still fires soon enough. */
  Timer_q_node *tm_o = NULL;

  vlg_dbg2(vlg, "mtime_make($, %'lu)\n", EVNT_SA(evnt), msecs);

  if (0) { }
  else if (msecs >= ( 99 * 1000))
  {
    TIMER_Q_TIMEVAL_ADD_SECS(tv, 100, 0);
    tm_o = timer_q_add_node(evnt__timeout_100, evnt, tv,
                            TIMER_Q_FLAG_NODE_DEFAULT);
  }
  else if (msecs >= ( 9 * 1000))
  {
    TIMER_Q_TIMEVAL_ADD_SECS(tv, 10, 0);
    tm_o = timer_q_add_node(evnt__timeout_10, evnt, tv,
                            TIMER_Q_FLAG_NODE_DEFAULT);
  }
  else
  {
    TIMER_Q_TIMEVAL_ADD_SECS(tv, 1, 0);
    tm_o = timer_q_add_node(evnt__timeout_1, evnt, tv,
                            TIMER_Q_FLAG_NODE_DEFAULT);
  }

  return (tm_o);
}

/* Timer callback for idle timeouts: if the connection has been idle past
 * its limit, half-close (or close outright); otherwise re-arm for the
 * remaining time (definition continues into the next chunk). */
static void evnt__timer_cb_mtime(int type, void *data)
{
  struct Evnt *evnt = data;
  struct timeval tv[1];
  unsigned long diff = 0;

  if (!evnt) /* deleted */
    return;

  ASSERT(evnt__valid(evnt));

  if (type == TIMER_Q_TYPE_CALL_RUN_ALL)
    return;

  evnt->tm_o = NULL;

  if (type == TIMER_Q_TYPE_CALL_DEL)
    return;

  EVNT__COPY_TV(tv);

  /* find out time elapsed */
  diff = timer_q_timeval_udiff_msecs(tv, &evnt->mtime);
  if (diff < evnt->msecs_tm_mtime)
    diff = evnt->msecs_tm_mtime - diff; /* seconds left until timeout */
  else
  { /* idle too long -- try a half close, or just close */
    vlg_dbg2(vlg, "timeout from[$] = (%'lu, %'lu)\n",
             EVNT_SA(evnt), diff, evnt->msecs_tm_mtime);
    if (evnt->flag_insta_close || !evnt_shutdown_w(evnt))
    {
      evnt_close(evnt);
      return;
    }

    /* FIXME: linger close time configurable?
*/
    EVNT__COPY_TV(&evnt->mtime);
    evnt->msecs_tm_mtime /= 2; /* halve the linger timeout each pass */
    diff = evnt->msecs_tm_mtime;
  }

  if (!(evnt->tm_o = evnt__timeout_mtime_make(evnt, tv, diff)))
  { /* couldn't re-arm -- treat like OOM and drop the connection */
    errno = ENOMEM;
    vlg_warn(vlg, "%s: %m\n", "timer reinit");
    evnt_close(evnt);
  }
}

/* Create all timer queue bases: idle timeouts plus read/write limits. */
void evnt_timeout_init(void)
{
  int flags = TIMER_Q_FLAG_BASE_DEFAULT;

  ASSERT(!evnt__timeout_1);

  EVNT__UPDATE_TV();

  evnt__timeout_1 = timer_q_add_base(evnt__timer_cb_mtime, flags);
  evnt__timeout_10 = timer_q_add_base(evnt__timer_cb_mtime, flags);
  evnt__timeout_100 = timer_q_add_base(evnt__timer_cb_mtime, flags);

  if (!evnt__timeout_1 || !evnt__timeout_10 || !evnt__timeout_100)
    VLG_ERRNOMEM((vlg, EXIT_FAILURE, "timer init"));

  /* we always allocate limit timers, just easier that way ... */
  evnt__timeout_lim_r_1 = timer_q_add_base(evnt__timer_cb_lim_r, flags);
  evnt__timeout_lim_r_10 = timer_q_add_base(evnt__timer_cb_lim_r, flags);
  evnt__timeout_lim_w_1 = timer_q_add_base(evnt__timer_cb_lim_w, flags);
  evnt__timeout_lim_w_10 = timer_q_add_base(evnt__timer_cb_lim_w, flags);

  if (!evnt__timeout_lim_r_1 || !evnt__timeout_lim_r_10 ||
      !evnt__timeout_lim_w_1 || !evnt__timeout_lim_w_10)
    VLG_ERRNOMEM((vlg, EXIT_FAILURE, "timer init"));
}

/* Destroy every timer base created by evnt_timeout_init(). */
void evnt_timeout_exit(void)
{
  ASSERT(evnt__timeout_1);

  timer_q_del_base(evnt__timeout_1); evnt__timeout_1 = NULL;
  timer_q_del_base(evnt__timeout_10); evnt__timeout_10 = NULL;
  timer_q_del_base(evnt__timeout_100); evnt__timeout_100 = NULL;

  timer_q_del_base(evnt__timeout_lim_r_1); evnt__timeout_lim_r_1 = NULL;
  timer_q_del_base(evnt__timeout_lim_r_10); evnt__timeout_lim_r_10 = NULL;
  timer_q_del_base(evnt__timeout_lim_w_1); evnt__timeout_lim_w_1 = NULL;
  timer_q_del_base(evnt__timeout_lim_w_10); evnt__timeout_lim_w_10 = NULL;
}

/* Arm (or disable, when msecs == 0) the idle timeout for evnt. */
int evnt_sc_timeout_via_mtime(struct Evnt *evnt, unsigned long msecs)
{
  struct timeval tv[1];

  if (!(evnt->msecs_tm_mtime = msecs))
    return (TRUE);

  EVNT__COPY_TV(tv);

  if (!(evnt->tm_o = evnt__timeout_mtime_make(evnt, tv, msecs)))
    return (FALSE);

  return (TRUE);
}

/* Every ~5 minutes stat /etc/localtime and tzset() so long running
 * processes notice timezone changes (continues next chunk). */
static void evnt__timed_updates(const
struct timeval *tv)
{
  static time_t last_sec = 0;

  if (!last_sec || (difftime(tv->tv_sec, last_sec) > (5 * 60)))
  { /* do updates about once every 5 minutes (don't use timer_q as we don't
     * want to be woken for this). */
    struct stat buf[1];

    last_sec = tv->tv_sec;

    /* Maybe have Libc stat/update /etc/localtime data.
     * Note that if we are in a chroot() without one don't let
     * GLibc fuck it up */
    if (!stat ("/etc/localtime", buf))
      tzset();
  }
}

/* One iteration of the event loop: poll, dispatch ready fds, flush the
 * send-now list, run timers, flush again. */
void evnt_sc_main_loop(size_t max_sz)
{
  int ready = 0;
  struct timeval tv[1];

  EVNT__UPDATE_TV();
  ready = evnt_poll();
  if ((ready == -1) && (errno != EINTR))
    vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", "poll");
  if (ready == -1)
    return;

  evnt_out_dbg3("1");
  evnt_scan_fds(ready, max_sz);
  evnt_out_dbg3("2");
  evnt_scan_send_fds();
  evnt_out_dbg3("3");

  EVNT__UPDATE_TV();
  EVNT__COPY_TV(tv);
  timer_q_run_norm(tv);
  evnt__timed_updates(tv);

  evnt_out_dbg3("4");
  evnt_scan_send_fds();
  evnt_out_dbg3("5");
}

/* Cheap time(): reuse the cached gettimeofday() seconds when possible. */
time_t evnt_sc_time(void)
{
  time_t ret = -1;

  if (!CONF_GETTIMEOFDAY_TIME)
    ret = time(NULL);
  else
  {
    struct timeval tv[1];
    EVNT__COPY_TV(tv);
    ret = tv->tv_sec;
  }

  return (ret);
}

/* Free callback for listener events: log stats and drop the refs. */
void evnt_sc_serv_cb_func_acpt_free(struct Evnt *evnt)
{
  struct Acpt_listener *acpt_listener = (struct Acpt_listener *)evnt;
  struct Acpt_data *acpt_data = acpt_listener->ref->ptr;

  evnt_vlg_stats_info(acpt_listener->evnt, "ACCEPT FREE");

  acpt_data->evnt = NULL;
  vstr_ref_del(acpt_listener->ref);
  vstr_ref_del(acpt_listener->def_policy);

  F(acpt_listener);
}

/* Vstr_ref destructor for Acpt_data. */
static void evnt__sc_serv_make_acpt_data_cb(Vstr_ref *ref)
{
  struct Acpt_data *ptr = NULL;

  if (!ref)
    return;

  ptr = ref->ptr;
  vstr_ref_del(ptr->sa);
  F(ptr);
  free(ref);
}

/* Create a listening IPv4 socket with the trimmings: congestion control,
 * defer-accept, socket filter.  Aborts the process on allocation/bind
 * failure (definition continues into the next chunk). */
struct Evnt *evnt_sc_serv_make_bind_ipv4(const char *acpt_addr,
                                         unsigned short acpt_port,
                                         unsigned int q_listen_len,
                                         unsigned int max_connections,
                                         unsigned int defer_accept,
                                         const char *acpt_filter_file,
                                         const char *acpt_cong)
{
  struct sockaddr_in *sinv4 = NULL;
  Acpt_listener *acpt_listener = NULL;
  Acpt_data *acpt_data = NULL;
  Vstr_ref *ref = NULL;

  if
(!(acpt_listener = MK(sizeof(Acpt_listener))))
    VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
                  acpt_addr, acpt_port));
  acpt_listener->max_connections = max_connections;
  acpt_listener->def_policy = NULL;

  if (!(acpt_data = MK(sizeof(Acpt_data))))
    VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
                  acpt_addr, acpt_port));
  acpt_data->evnt = NULL;
  acpt_data->sa = NULL;

  if (!(ref = vstr_ref_make_ptr(acpt_data, evnt__sc_serv_make_acpt_data_cb)))
    VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
                  acpt_addr, acpt_port));
  acpt_listener->ref = ref;

  if (!evnt_make_bind_ipv4(acpt_listener->evnt, acpt_addr, acpt_port,
                           q_listen_len, acpt_cong))
    vlg_err(vlg, 2, "make_bind(%s, %hd, %d, %s): Failed!\n",
            acpt_addr ? acpt_addr : "any", acpt_port, q_listen_len, acpt_cong);
  acpt_data->evnt = acpt_listener->evnt;
  acpt_data->sa = vstr_ref_add(acpt_data->evnt->sa_ref);

  sinv4 = EVNT_SA_IN4(acpt_listener->evnt);
  ASSERT(!acpt_port || (acpt_port == ntohs(sinv4->sin_port)));

  if (defer_accept)
    evnt_fd_set_defer_accept(acpt_listener->evnt, defer_accept);

  if (acpt_filter_file &&
      !evnt_fd_set_filter(acpt_listener->evnt, acpt_filter_file))
    vlg_err(vlg, 3, "set_filter(%s): %m\n", acpt_filter_file);

  acpt_listener->evnt->cbs->cb_func_free = evnt_sc_serv_cb_func_acpt_free;

  return (acpt_listener->evnt);
}

/* Log one connection's request/byte accounting. */
void evnt_vlg_stats_info(struct Evnt *evnt, const char *prefix)
{
  vlg_info(vlg, "%s from[$] req_got[%'u:%u] req_put[%'u:%u]"
           " recv[${BKMG.ju:%ju}:%ju] send[${BKMG.ju:%ju}:%ju]\n",
           prefix, EVNT_SA(evnt),
           evnt->acct.req_got, evnt->acct.req_got,
           evnt->acct.req_put, evnt->acct.req_put,
           evnt->acct.bytes_r, evnt->acct.bytes_r,
           evnt->acct.bytes_w, evnt->acct.bytes_w);
}

#ifdef TCP_DEFER_ACCEPT
# define USE_TCP_DEFER_ACCEPT 1
#else
# define USE_TCP_DEFER_ACCEPT 0
# define TCP_DEFER_ACCEPT 0
#endif

/* Enable TCP_DEFER_ACCEPT (Linux): only wake accept() once data arrives
 * (definition continues into the next chunk). */
void evnt_fd_set_defer_accept(struct Evnt *evnt, int val)
{
  socklen_t len = sizeof(int);

  ASSERT(evnt__valid(evnt));

  if (!USE_TCP_DEFER_ACCEPT)
    return;

  /* ignore return val */
setsockopt(evnt_fd(evnt), IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, len);
}

/* fork(), marking the child and re-initialising its poll state. */
pid_t evnt_make_child(void)
{
  pid_t ret = fork();

  if (!ret)
  { /* child */
    evnt__is_child = TRUE;
    evnt_poll_child_init();
  }

  return (ret);
}

int evnt_is_child(void)
{
  return (evnt__is_child);
}

/* Begin a region where a child must die with its parent (PDEATHSIG);
 * returns FALSE if a child-exit was already flagged. */
int evnt_child_block_beg(void)
{
  if (!evnt_is_child())
    return (TRUE);

  if (PROC_CNTL_PDEATHSIG(SIGTERM) == -1)
    vlg_err(vlg, EXIT_FAILURE, "prctl(%s, %s): %m\n",
            "PR_SET_PDEATHSIG", "SIGTERM");

  if (evnt_child_exited)
    return (FALSE);

  return (TRUE);
}

/* End the block: downgrade the parent-death signal back to SIGCHLD. */
int evnt_child_block_end(void)
{
  if (!evnt_is_child())
    return (TRUE);

  if (PROC_CNTL_PDEATHSIG(SIGCHLD) == -1)
    vlg_err(vlg, EXIT_FAILURE, "prctl(%s, %s): %m\n",
            "PR_SET_PDEATHSIG", "SIGCHLD");

  return (TRUE);
}

/* if we are blocking forever, and the only thing we are waiting for is
 * a single accept() fd, just call accept() */
static int evnt__poll_tst_accept(int msecs)
{
  return ((msecs == -1) && (evnt_num_all() == 1) && q_accept);
}

/* Blocking accept() fast path for the single-listener case; returns 1
 * on success, -1 (errno == EINTR) on any failure. */
static int evnt__poll_accept(void)
{
  int fd = -1;
  struct Evnt *scan = q_accept;
  struct sockaddr_in sa;
  socklen_t len = sizeof(struct sockaddr_in);
  struct Evnt *tmp = NULL;

  /* need to make sure we die if the parent does */
  evnt_fd__set_nonblock(evnt_fd(scan), FALSE);

  if (!evnt_child_block_beg())
    goto block_beg_fail;

  fd = accept(evnt_fd(scan), (struct sockaddr *) &sa, &len);

  evnt_child_block_end();
  evnt_fd__set_nonblock(evnt_fd(scan), TRUE);

  if (fd == -1)
    goto accept_fail;

  if (!(tmp = scan->cbs->cb_func_accept(scan, fd,
                                        (struct sockaddr *) &sa, len)))
    goto cb_accept_fail;

  if (!tmp->flag_q_closed)
    tmp->flag_fully_acpt = TRUE;
  assert(SOCKET_POLL_INDICATOR(tmp->ind)->events == POLLIN);
  assert(SOCKET_POLL_INDICATOR(tmp->ind)->revents == POLLIN);
  assert(tmp == q_recv);

  return (1);

 block_beg_fail:
  evnt_fd__set_nonblock(evnt_fd(scan), TRUE);
  goto accept_fail;
 cb_accept_fail:
  close(fd);
 accept_fail:
  errno = EINTR;
  return (-1);
}

#ifndef EVNT_USE_EPOLL
# ifdef HAVE_SYS_EPOLL_H
# define EVNT_USE_EPOLL 1
# else
# define EVNT_USE_EPOLL 0
# endif
#endif

#if \
!EVNT_USE_EPOLL

/* ---- poll()/socket_poll based implementation (no epoll) ---- */

int evnt_poll_init(void)
{
  return (TRUE);
}

int evnt_poll_direct_enabled(void)
{
  return (FALSE);
}

int evnt_poll_child_init(void)
{
  return (TRUE);
}

/* Start waiting on the given poll flags for evnt. */
void evnt_wait_cntl_add(struct Evnt *evnt, int flags)
{
  /* FIXME: do the POLLOUT revents differently ... move accept into the cb */
  SOCKET_POLL_INDICATOR(evnt->ind)->events |= flags;
  SOCKET_POLL_INDICATOR(evnt->ind)->revents |= (flags & POLLIN);
}

/* Stop waiting on the given poll flags for evnt. */
void evnt_wait_cntl_del(struct Evnt *evnt, int flags)
{
  SOCKET_POLL_INDICATOR(evnt->ind)->events &= ~flags;
  SOCKET_POLL_INDICATOR(evnt->ind)->revents &= ~flags;
}

unsigned int evnt_poll_add(struct Evnt *COMPILE_ATTR_UNUSED(evnt), int fd)
{
  return (socket_poll_add(fd));
}

/* Close the fd (if any) and release its poll slot. */
void evnt_poll_del(struct Evnt *evnt)
{
  if (SOCKET_POLL_INDICATOR(evnt->ind)->fd != -1)
    close(SOCKET_POLL_INDICATOR(evnt->ind)->fd);

  socket_poll_del(evnt->ind);
}

/* NOTE: that because of socket_poll direct mapping etc. we can't be "clever" */
int evnt_poll_swap_accept_read(struct Evnt *evnt, int fd)
{
  unsigned int old_ind = evnt->ind;

  assert(SOCKET_POLL_INDICATOR(evnt->ind)->fd != fd);

  ASSERT(evnt__valid(evnt));

  if (!(evnt->ind = socket_poll_add(fd)))
    goto poll_add_fail;

  if (!(evnt->io_r = vstr_make_base(NULL)) ||
      !(evnt->io_w = vstr_make_base(NULL)))
    goto malloc_base_fail;

  SOCKET_POLL_INDICATOR(evnt->ind)->events |= POLLIN;
  SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLIN;

  socket_poll_del(old_ind);

  /* swap from the accept queue to the recv queue */
  evnt_del(&q_accept, evnt); evnt->flag_q_accept = FALSE;
  evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;

  evnt_fd__set_nonblock(fd, TRUE);

  ASSERT(evnt__valid(evnt));

  return (TRUE);

 malloc_base_fail:
  socket_poll_del(evnt->ind);
  evnt->ind = old_ind;
  vstr_free_base(evnt->io_r);
  evnt->io_r = NULL;
  ASSERT(!evnt->io_r && !evnt->io_w);
 poll_add_fail:
  return (FALSE);
}

/* Wait for events: EINTR on child exit, accept() fast path, else poll(). */
int evnt_poll(void)
{
  int msecs = evnt__get_timeout();

  if (evnt_child_exited)
    return (errno = EINTR, -1);

  if (evnt__poll_tst_accept(msecs))
    return (evnt__poll_accept());

  return (socket_poll_update_all(msecs));
}
#else

#include /* NOTE(review): header name lost in extraction -- presumably <sys/epoll.h> */

static
int evnt__epoll_fd = -1; int evnt_poll_init(void) { assert(POLLIN == EPOLLIN); assert(POLLOUT == EPOLLOUT); assert(POLLHUP == EPOLLHUP); assert(POLLERR == EPOLLERR); if (!CONF_EVNT_NO_EPOLL) { evnt__epoll_fd = epoll_create(CONF_EVNT_EPOLL_SZ); vlg_dbg2(vlg, "epoll_create(%d): %m\n", evnt__epoll_fd); } return (evnt__epoll_fd != -1); } int evnt_poll_direct_enabled(void) { return (evnt__epoll_fd != -1); } static int evnt__epoll_readd(struct Evnt *evnt) { struct epoll_event epevent[1]; while (evnt) { int flags = SOCKET_POLL_INDICATOR(evnt->ind)->events; vlg_dbg2(vlg, "epoll_readd($,%u=%s)\n", EVNT_SA(evnt), flags, EVNT__POLL_FLGS(flags)); epevent->events = flags; epevent->data.u64 = 0; /* FIXME: keep valgrind happy */ epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, evnt_fd(evnt), epevent) == -1) vlg_err(vlg, EXIT_FAILURE, "epoll_readd: %m\n"); evnt = evnt->next; } return (TRUE); } int evnt_poll_child_init(void) { /* Can't share epoll() fd's between tasks ... */ if (CONF_EVNT_DUP_EPOLL && evnt_poll_direct_enabled()) { close(evnt__epoll_fd); evnt__epoll_fd = epoll_create(CONF_EVNT_EPOLL_SZ); /* size does nothing */ if (evnt__epoll_fd == -1) VLG_WARN_RET(FALSE, (vlg, "epoll_recreate(): %m\n")); evnt__epoll_readd(q_connect); evnt__epoll_readd(q_accept); evnt__epoll_readd(q_recv); evnt__epoll_readd(q_send_recv); evnt__epoll_readd(q_none); } return (TRUE); } void evnt_wait_cntl_add(struct Evnt *evnt, int flags) { if ((SOCKET_POLL_INDICATOR(evnt->ind)->events & flags) == flags) return; /* FIXME: do the POLLOUT revents differently ... 
move accept into the cb */ SOCKET_POLL_INDICATOR(evnt->ind)->events |= flags; SOCKET_POLL_INDICATOR(evnt->ind)->revents |= (flags & POLLIN); if (evnt_poll_direct_enabled()) { struct epoll_event epevent[1]; flags = SOCKET_POLL_INDICATOR(evnt->ind)->events; vlg_dbg2(vlg, "epoll_mod_add($,%u=%s)\n", EVNT_SA(evnt), flags, EVNT__POLL_FLGS(flags)); epevent->events = flags; epevent->data.u64 = 0; /* FIXME: keep valgrind happy */ epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_MOD, evnt_fd(evnt), epevent) == -1) vlg_err(vlg, EXIT_FAILURE, "epoll: %m\n"); } } void evnt_wait_cntl_del(struct Evnt *evnt, int flags) { if (!(SOCKET_POLL_INDICATOR(evnt->ind)->events & flags)) return; SOCKET_POLL_INDICATOR(evnt->ind)->events &= ~flags; SOCKET_POLL_INDICATOR(evnt->ind)->revents &= ~flags; if (flags && evnt_poll_direct_enabled()) { struct epoll_event epevent[1]; flags = SOCKET_POLL_INDICATOR(evnt->ind)->events; vlg_dbg2(vlg, "epoll_mod_del($,%u=%s)\n", EVNT_SA(evnt), flags, EVNT__POLL_FLGS(flags)); epevent->events = flags; epevent->data.u64 = 0; /* FIXME: keep valgrind happy */ epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_MOD, evnt_fd(evnt), epevent) == -1) vlg_err(vlg, EXIT_FAILURE, "epoll: %m\n"); } } unsigned int evnt_poll_add(struct Evnt *evnt, int fd) { unsigned int ind = socket_poll_add(fd); if (ind && evnt_poll_direct_enabled()) { struct epoll_event epevent[1]; int flags = 0; vlg_dbg2(vlg, "epoll_add($,%u=%s)\n", EVNT_SA(evnt), flags, EVNT__POLL_FLGS(flags)); epevent->events = flags; epevent->data.u64 = 0; /* FIXME: keep valgrind happy */ epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, fd, epevent) == -1) { vlg_warn(vlg, "epoll: %m\n"); socket_poll_del(fd); } } return (ind); } void evnt_poll_del(struct Evnt *evnt) { if (SOCKET_POLL_INDICATOR(evnt->ind)->fd != -1) close(SOCKET_POLL_INDICATOR(evnt->ind)->fd); socket_poll_del(evnt->ind); /* done via. 
the close() */ if (FALSE && evnt_poll_direct_enabled()) { int fd = SOCKET_POLL_INDICATOR(evnt->ind)->fd; struct epoll_event epevent[1]; vlg_dbg2(vlg, "epoll_del($)\n", EVNT_SA(evnt)); epevent->events = 0; epevent->data.u64 = 0; /* FIXME: keep valgrind happy */ epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_DEL, fd, epevent) == -1) vlg_abort(vlg, "epoll: %m\n"); } } int evnt_poll_swap_accept_read(struct Evnt *evnt, int fd) { unsigned int old_ind = evnt->ind; int old_fd = SOCKET_POLL_INDICATOR(old_ind)->fd; assert(SOCKET_POLL_INDICATOR(evnt->ind)->fd != fd); ASSERT(evnt__valid(evnt)); if (!(evnt->ind = socket_poll_add(fd))) goto poll_add_fail; if (!(evnt->io_r = vstr_make_base(NULL)) || !(evnt->io_w = vstr_make_base(NULL))) goto malloc_base_fail; SOCKET_POLL_INDICATOR(evnt->ind)->events |= POLLIN; SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLIN; socket_poll_del(old_ind); evnt_del(&q_accept, evnt); evnt->flag_q_accept = FALSE; evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE; evnt_fd__set_nonblock(fd, TRUE); if (evnt_poll_direct_enabled()) { struct epoll_event epevent[1]; vlg_dbg2(vlg, "epoll_swap($,%d,%d)\n", EVNT_SA(evnt), old_fd, fd); epevent->events = POLLIN; epevent->data.u64 = 0; /* FIXME: keep valgrind happy */ epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_DEL, old_fd, epevent) == -1) vlg_abort(vlg, "epoll: %m\n"); epevent->events = SOCKET_POLL_INDICATOR(evnt->ind)->events; epevent->data.ptr = evnt; if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, fd, epevent) == -1) vlg_abort(vlg, "epoll: %m\n"); } ASSERT(evnt__valid(evnt)); return (TRUE); malloc_base_fail: socket_poll_del(evnt->ind); evnt->ind = old_ind; vstr_free_base(evnt->io_r); evnt->io_r = NULL; ASSERT(!evnt->io_r && !evnt->io_w); poll_add_fail: return (FALSE); } #define EVNT__EPOLL_EVENTS 128 int evnt_poll(void) { struct epoll_event events[EVNT__EPOLL_EVENTS]; int msecs = evnt__get_timeout(); int ret = 0; unsigned int scan = 0; if (evnt_child_exited) return (errno 
= EINTR, -1); if (evnt__poll_tst_accept(msecs)) return (evnt__poll_accept()); if (!evnt_poll_direct_enabled()) return (socket_poll_update_all(msecs)); if (msecs) { /* do the double poll() that socket_poll_update_all() does for us */ ret = epoll_wait(evnt__epoll_fd, events, EVNT__EPOLL_EVENTS, 0); if (ret == -1) return (ret); } if (!ret) ret = epoll_wait(evnt__epoll_fd, events, EVNT__EPOLL_EVENTS, msecs); if (ret == -1) return (ret); scan = ret; ASSERT(scan <= EVNT__EPOLL_EVENTS); while (scan-- > 0) { struct Evnt *evnt = NULL; unsigned int flags = 0; flags = events[scan].events; evnt = events[scan].data.ptr; ASSERT(evnt__valid(evnt)); vlg_dbg2(vlg, "epoll_wait($,%u=%s)\n", EVNT_SA(evnt), flags, EVNT__POLL_FLGS(flags)); vlg_dbg2(vlg, "epoll[flags]=a=%u|r=%u|s=%u\n", evnt->flag_q_accept, evnt->flag_q_recv, evnt->flag_q_send_recv); assert(((SOCKET_POLL_INDICATOR(evnt->ind)->events & flags) == flags) || ((POLLHUP|POLLERR) & flags)); SOCKET_POLL_INDICATOR(evnt->ind)->revents = flags; evnt__del_whatever(evnt); /* move to front of queue */ evnt__add_whatever(evnt); } return (ret); } #endif