1 : /*
2 : * Copyright (C) 2002, 2003, 2004, 2005, 2006 James Antill
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation; either
7 : * version 2 of the License, or (at your option) any later version.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 : *
18 : * email: james@and.org
19 : */
20 : /* IO events, and some help with timed events */
21 : #include <vstr.h>
22 :
23 : #include <stdlib.h>
24 : #include <sys/types.h>
25 : #include <sys/socket.h>
26 : #include <unistd.h>
27 : #include <fcntl.h>
28 : #include <errno.h>
29 : #include <getopt.h>
30 : #include <string.h>
31 : #include <netinet/in.h>
32 : #include <sys/un.h>
33 : #include <netinet/tcp.h>
34 : #include <arpa/inet.h>
35 : #include <sys/poll.h>
36 : #include <netdb.h>
37 : #include <sys/time.h>
38 : #include <time.h>
39 : #include <signal.h>
40 : #include <sys/stat.h>
41 :
42 : #include <socket_poll.h>
43 : #include <timer_q.h>
44 :
45 : #include <sys/sendfile.h>
46 :
/* Fallback for system headers that lack TCP_CONGESTION.  13 is the Linux
 * option number; on other platforms 0 is used as a "not supported" marker,
 * which evnt_fd__set_congestion() turns into errno = ENOPROTOOPT. */
#ifndef TCP_CONGESTION
# ifdef __linux__
# define TCP_CONGESTION 13
# else
# define TCP_CONGESTION 0
# endif
#endif

/* FIXME: Should do better autoconf checks... */
#if defined(__linux__)
/* Per the original author: on Linux TCP_NODELAY isn't "configurable" the way
 * it is on Solaris (and maybe *BSD), and accepted sockets don't inherit
 * flags like on *BSD.  When this is FALSE, evnt_init() only sets
 * TCP_NODELAY when Nagle is disabled (evnt_opt_nagle == FALSE); when TRUE
 * it sets it for every new socket. */
# define HAVE_TCP_NODELAY_CONFIG FALSE
#else
# define HAVE_TCP_NODELAY_CONFIG TRUE
#endif

#if defined(__BSD__)
/* Whether accept()ed sockets inherit flags from the listening socket (as on
 * *BSD, maybe Solaris; not Linux).  When TRUE, evnt_init() skips re-setting
 * O_NONBLOCK for evnts created with a from_evnt, and skips TCP_NODELAY when
 * from_evnt's Nagle setting already matches evnt_opt_nagle. */
# define HAVE_SOCK_FLAGS_INHERIT TRUE
#else
# define HAVE_SOCK_FLAGS_INHERIT FALSE
#endif

/* Map a poll() event mask to a printable string of its POLLIN/POLLOUT bits,
 * e.g. for debug logging; other bits are ignored. */
#define EVNT__POLL_FLGS(x) \
    ((((x) & (POLLIN | POLLOUT)) == (POLLIN | POLLOUT)) ? "(POLLIN | POLLOUT)" : \
     (((x) & POLLIN) ? "(POLLIN)" : \
      (((x) & POLLOUT) ? "(POLLOUT)" : \
       "()")))

/* Compile-time tunables; none is referenced in this part of the file.
 * The trailing comments are the original author's notes. */
#define CONF_EVNT_NO_EPOLL FALSE
#define CONF_EVNT_EPOLL_SZ (10 * 1000) /* size is just a speed hint */
#define CONF_EVNT_DUP_EPOLL TRUE /* doesn't work if FALSE and multi proc */
#define CONF_GETTIMEOFDAY_TIME TRUE /* does tv_sec contain time(NULL) */
82 :
/* EVNT__RESOLVE_NAME(saddr, x): set (saddr)->sin_addr from the host string x.
 *
 * saddr: a struct sockaddr_in pointer; x: a NUL-terminated host string.
 * Each macro argument is evaluated exactly once.
 *
 * A dotted-quad is parsed with inet_addr().  Anything inet_addr() rejects
 * becomes htonl(INADDR_ANY):
 *  - directly under CONF_FULL_STATIC (no resolver is used);
 *  - otherwise only when gethostbyname() also fails, or when it does not
 *    return an IPv4 (AF_INET, 4-byte) address. */
#ifdef CONF_FULL_STATIC
# define EVNT__RESOLVE_NAME(saddr, x) do {                                 \
    struct sockaddr_in *evnt__rn_sa = (saddr);                             \
                                                                           \
    if ((evnt__rn_sa->sin_addr.s_addr = inet_addr(x)) == INADDR_NONE)      \
      evnt__rn_sa->sin_addr.s_addr = htonl(INADDR_ANY);                    \
  } while (0)
#else
# include <netdb.h>
# define EVNT__RESOLVE_NAME(saddr, x) do {                                 \
    struct sockaddr_in *evnt__rn_sa = (saddr);                             \
    const char *evnt__rn_name = (x);                                       \
                                                                           \
    if ((evnt__rn_sa->sin_addr.s_addr = inet_addr(evnt__rn_name)) ==       \
        INADDR_NONE)                                                       \
    {                                                                      \
      struct hostent *evnt__rn_h = gethostbyname(evnt__rn_name);           \
                                                                           \
      evnt__rn_sa->sin_addr.s_addr = htonl(INADDR_ANY);                    \
      /* only accept a genuine IPv4 answer of the expected size */         \
      if (evnt__rn_h && (evnt__rn_h->h_addrtype == AF_INET) &&             \
          ((size_t)evnt__rn_h->h_length ==                                 \
           sizeof(evnt__rn_sa->sin_addr.s_addr)) &&                        \
          evnt__rn_h->h_addr_list[0])                                      \
        memcpy(&evnt__rn_sa->sin_addr.s_addr,                              \
               evnt__rn_h->h_addr_list[0],                                 \
               sizeof(evnt__rn_sa->sin_addr.s_addr));                      \
    }                                                                      \
  } while (0)
#endif
103 :
/* Socket filter (SO_ATTACH_FILTER / SO_DETACH_FILTER) support.
 *
 * Without those socket options: CONF_USE_SOCKET_FILTERS is FALSE, a dummy
 * struct sock_fprog is declared and the option values are 0.  Presumably
 * this lets filter-related code compile with the feature switched off
 * (that code is not in this part of the file -- TODO confirm). */
#if !defined(SO_DETACH_FILTER) || !defined(SO_ATTACH_FILTER)
# define CONF_USE_SOCKET_FILTERS FALSE
struct sock_fprog { int dummy; };
# define SO_DETACH_FILTER 0
# define SO_ATTACH_FILTER 0
#else
# define CONF_USE_SOCKET_FILTERS TRUE

/* not in glibc... hope it's not in *BSD etc. */
/* These are local copies of the classic BPF structures (Linux
 * <linux/filter.h>).  Presumably they are handed to the kernel via
 * setsockopt(SO_ATTACH_FILTER), so field order and types must not change. */
struct sock_filter
{
  uint16_t code; /* Actual filter code */
  uint8_t jt; /* Jump true */
  uint8_t jf; /* Jump false */
  uint32_t k; /* Generic multiuse field */
};

struct sock_fprog
{
  unsigned short len; /* Number of filter blocks */
  struct sock_filter *filter; /* Array of len filter blocks */
};
#endif
127 :
128 : #include "vlg.h"
129 :
130 : #define EX_UTILS_NO_USE_INIT 1
131 : #define EX_UTILS_NO_USE_EXIT 1
132 : #define EX_UTILS_NO_USE_LIMIT 1
133 : #define EX_UTILS_NO_USE_BLOCK 1
134 : #define EX_UTILS_NO_USE_PUT 1
135 : #define EX_UTILS_NO_USE_OPEN 1
136 : #define EX_UTILS_NO_USE_IO_FD 1
137 : #include "ex_utils.h"
138 :
/* TCP_CORK support is selected by the build via HAVE_TCP_CORK.  Without it,
 * USE_TCP_CORK is FALSE and TCP_CORK is defined as 0 so that code such as
 * evnt_fd__set_cork() still compiles. */
#ifdef HAVE_TCP_CORK
# define USE_TCP_CORK TRUE
#else
# define USE_TCP_CORK FALSE
# define TCP_CORK 0
#endif
145 :
146 : #include "evnt.h"
147 :
148 : #include "mk.h"
149 :
/* Child-exit indicator.  The volatile sig_atomic_t type suggests it is set
 * from a signal handler elsewhere -- TODO confirm. */
volatile sig_atomic_t evnt_child_exited = FALSE;

/* Wanted Nagle setting for new sockets; consulted by evnt_init(). */
int evnt_opt_nagle = EVNT_CONF_NAGLE;

/* Side lists, linked through s_next / c_next respectively. */
static struct Evnt *q_send_now = NULL; /* Try a send "now" */
static struct Evnt *q_closed = NULL; /* Close when fin. (see evnt_close()) */

/* Main queues, linked through ->next / ->prev.  Every live evnt is on
 * exactly one of these, matching its single set flag_q_* bit. */
static struct Evnt *q_none = NULL; /* nothing */
static struct Evnt *q_accept = NULL; /* connections - recv */
static struct Evnt *q_connect = NULL; /* connections - send */
static struct Evnt *q_recv = NULL; /* recv */
static struct Evnt *q_send_recv = NULL; /* recv + send */

/* Logger used by this file; installed with evnt_logger(). */
static Vlg *vlg = NULL;

/* Number of evnts added by evnt__make_end() and not yet freed. */
static unsigned int evnt__num = 0;

/* things can move from recv -> send_recv with a timeout,
 * so they look like the end of events NULL */
static unsigned int evnt__scan_ready_moved_send_zero_fds = 0;

/* this should be more configurable... */
/* NOTE(review): presumably caps accept() calls per ready listener per poll
 * round; the users are not in this part of the file. */
static unsigned int evnt__accept_limit = 4;

/* Cached "current time": refreshed by EVNT__UPDATE_TV(), copied out by
 * EVNT__COPY_TV() (e.g. into ctime/mtime in evnt_init()). */
static struct timeval evnt__tv[1];

/* Not referenced in this part of the file. */
static int evnt__is_child = FALSE;

#define EVNT__UPDATE_TV() gettimeofday(evnt__tv, NULL)
#define EVNT__COPY_TV(x) memcpy(x, evnt__tv, sizeof(struct timeval))
180 :
181 : void evnt_logger(Vlg *passed_vlg)
182 296 : {
183 296 : vlg = passed_vlg;
184 296 : }
185 :
186 : void evnt_fd__set_nonblock(int fd, int val)
187 12916 : {
188 12916 : int flags = 0;
189 :
190 6458 : ASSERT(val == !!val);
191 :
192 12916 : if ((flags = fcntl(fd, F_GETFL)) == -1)
193 0 : vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", __func__);
194 :
195 12916 : if (!!(flags & O_NONBLOCK) == val)
196 0 : return;
197 :
198 12916 : if (val)
199 12916 : flags |= O_NONBLOCK;
200 : else
201 0 : flags &= ~O_NONBLOCK;
202 :
203 12916 : if (fcntl(fd, F_SETFL, flags) == -1)
204 0 : vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", __func__);
205 : }
206 :
207 : static void evnt_fd__set_coe(int fd, int val)
208 12680 : {
209 6340 : ASSERT(val == !!val);
210 :
211 12680 : if (fcntl(fd, F_SETFD, val) == -1)
212 0 : vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", __func__);
213 12680 : }
214 :
/* Set TCP_NODELAY on fd to val (non-zero disables Nagle's algorithm).
 * Returns 1 on success, 0 if setsockopt() failed (errno is left set). */
static int evnt_fd__set_nodelay(int fd, int val)
{
  int rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);

  return (rc != -1);
}
219 :
/* Set TCP_CORK on fd to val.
 * Returns 1 on success, 0 if setsockopt() failed (errno is left set). */
static int evnt_fd__set_cork(int fd, int val)
{
  int rc = setsockopt(fd, IPPROTO_TCP, TCP_CORK, &val, sizeof val);

  return (rc != -1);
}
224 :
/* Set SO_REUSEADDR on fd to val.
 * Returns 1 on success, 0 if setsockopt() failed (errno is left set). */
static int evnt_fd__set_reuse(int fd, int val)
{
  int rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);

  return (rc != -1);
}
229 :
230 : #define CLEN_SZ(x) (strlen(x) + 1)
231 : static int evnt_fd__set_congestion(int fd, const char *val)
232 0 : {
233 : if (!TCP_CONGESTION)
234 : return (errno = ENOPROTOOPT, FALSE);
235 :
236 0 : return (setsockopt(fd, IPPROTO_TCP, TCP_CONGESTION, val, CLEN_SZ(val)) != -1);
237 : }
238 :
239 : void evnt_add(struct Evnt **que, struct Evnt *node)
240 63704 : {
241 30466 : assert(node != *que);
242 :
243 63704 : if ((node->next = *que))
244 38632 : node->next->prev = node;
245 :
246 63704 : node->prev = NULL;
247 63704 : *que = node;
248 63704 : }
249 :
250 : void evnt_del(struct Evnt **que, struct Evnt *node)
251 63482 : {
252 63482 : if (node->prev)
253 8140 : node->prev->next = node->next;
254 : else
255 : {
256 24424 : assert(*que == node);
257 55342 : *que = node->next;
258 : }
259 :
260 63482 : if (node->next)
261 35024 : node->next->prev = node->prev;
262 63482 : }
263 :
264 : static void evnt__del_whatever(struct Evnt *evnt)
265 62154 : {
266 : if (0) { }
267 62154 : else if (evnt->flag_q_accept)
268 10913 : evnt_del(&q_accept, evnt);
269 51241 : else if (evnt->flag_q_connect)
270 0 : evnt_del(&q_connect, evnt);
271 51241 : else if (evnt->flag_q_recv)
272 50840 : evnt_del(&q_recv, evnt);
273 401 : else if (evnt->flag_q_send_recv)
274 401 : evnt_del(&q_send_recv, evnt);
275 0 : else if (evnt->flag_q_none)
276 0 : evnt_del(&q_none, evnt);
277 : else
278 0 : ASSERT_NOT_REACHED();
279 62154 : }
280 :
281 : static void COMPILE_ATTR_USED() evnt__add_whatever(struct Evnt *evnt)
282 49696 : {
283 : if (0) { }
284 49696 : else if (evnt->flag_q_accept)
285 10747 : evnt_add(&q_accept, evnt);
286 38949 : else if (evnt->flag_q_connect)
287 0 : evnt_add(&q_connect, evnt);
288 38949 : else if (evnt->flag_q_recv)
289 38548 : evnt_add(&q_recv, evnt);
290 401 : else if (evnt->flag_q_send_recv)
291 401 : evnt_add(&q_send_recv, evnt);
292 0 : else if (evnt->flag_q_none)
293 0 : evnt_add(&q_none, evnt);
294 : else
295 0 : ASSERT_NOT_REACHED();
296 49696 : }
297 :
298 : static unsigned int evnt__debug_num_1(struct Evnt *scan)
299 85741467 : {
300 85741467 : unsigned int num = 0;
301 :
302 281254700 : while (scan)
303 : {
304 109771766 : struct Evnt *scan_next = scan->next;
305 :
306 109771766 : ++num;
307 :
308 109771766 : scan = scan_next;
309 : }
310 :
311 85741467 : return (num);
312 : }
313 :
314 : #if COMPILE_DEBUG
315 : static struct Evnt **evnt__srch(struct Evnt **que, struct Evnt *evnt)
316 17140448 : {
317 17140448 : struct Evnt **ret = que;
318 :
319 38142777 : while (*ret)
320 : {
321 21002329 : if (*ret == evnt)
322 17140448 : return (ret);
323 :
324 3861881 : ret = &(*ret)->next;
325 : }
326 :
327 0 : return (NULL);
328 : }
329 :
/* Debug consistency check for one evnt; callers wrap it in ASSERT().
 *
 * The checks, in order:
 *  - exactly one of flag_q_connect/accept/recv/send_recv/none is set;
 *  - flag_q_send_now matches whether evnt is on q_send_now (via s_next);
 *  - flag_q_closed matches whether evnt is on q_closed (via c_next);
 *  - the poll events/revents bits are plausible for the queue it is on.
 *
 * Returns non-zero iff evnt was found on the main queue its flag names. */
static int evnt__valid(struct Evnt *evnt)
{
  int ret = 0;

  /* presumably "at least one evnt exists" -- evnt_num_all() isn't shown */
  ASSERT(evnt_num_all());

  ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv +
          evnt->flag_q_send_recv + evnt->flag_q_none) == 1);

  /* flag_q_send_now <=> evnt is on q_send_now */
  if (evnt->flag_q_send_now)
  {
    struct Evnt **scan = &q_send_now;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->s_next;
    ASSERT(*scan);
  }
  else
  {
    struct Evnt **scan = &q_send_now;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->s_next;
    ASSERT(!*scan);
  }

  /* flag_q_closed <=> evnt is on q_closed */
  if (evnt->flag_q_closed)
  {
    struct Evnt **scan = &q_closed;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->c_next;
    ASSERT(*scan);
  }
  else
  {
    struct Evnt **scan = &q_closed;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->c_next;
    ASSERT(!*scan);
  }

  /* per-queue membership and poll-mask invariants */
  if (0) { }
  else if (evnt->flag_q_accept)
  { /* listeners: never wait for POLLOUT and own no IO buffers */
    ret = !!evnt__srch(&q_accept, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
    ASSERT(!evnt->io_r && !evnt->io_w);
  }
  else if (evnt->flag_q_connect)
  { /* pending connect: POLLOUT only (or a write-limit timer is pending) */
    ret = !!evnt__srch(&q_connect, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLIN));
    assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT) ||
           evnt->tm_l_w);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN));
  }
  else if (evnt->flag_q_send_recv)
  { /* wants POLLOUT unless a write-limit timer is pending */
    ret = !!evnt__srch(&q_send_recv, evnt);
    assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT) ||
           evnt->tm_l_w);
  }
  else if (evnt->flag_q_recv)
  { /* receive only: no POLLOUT */
    ret = !!evnt__srch(&q_recv, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
  }
  else if (evnt->flag_q_none)
  { /* idle: neither POLLIN nor POLLOUT requested or reported */
    ret = !!evnt__srch(&q_none, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLIN));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
  }
  else
    ASSERT_NOT_REACHED();

  return (ret);
}
414 :
415 : static unsigned int evnt__debug_num_all(void)
416 17147415 : {
417 17147415 : unsigned int num = 0;
418 :
419 17147415 : num += evnt__debug_num_1(q_connect);
420 17147415 : num += evnt__debug_num_1(q_accept);
421 17147415 : num += evnt__debug_num_1(q_recv);
422 17147415 : num += evnt__debug_num_1(q_send_recv);
423 17147415 : num += evnt__debug_num_1(q_none);
424 :
425 17147415 : return (num);
426 : }
427 : #endif
428 :
429 : int evnt_fd(struct Evnt *evnt)
430 18011716 : {
431 3806409 : ASSERT(evnt__valid(evnt));
432 18011716 : return (SOCKET_POLL_INDICATOR(evnt->ind)->fd);
433 : }
434 :
435 : int evnt_cb_func_connect(struct Evnt *COMPILE_ATTR_UNUSED(evnt))
436 0 : {
437 0 : return (TRUE);
438 : }
439 :
440 : struct Evnt *evnt_cb_func_accept(struct Evnt *COMPILE_ATTR_UNUSED(evnt),
441 : int COMPILE_ATTR_UNUSED(fd),
442 : struct sockaddr *COMPILE_ATTR_UNUSED(sa),
443 : socklen_t COMPILE_ATTR_UNUSED(len))
444 0 : {
445 0 : return (NULL);
446 : }
447 :
448 : int evnt_cb_func_recv(struct Evnt *evnt)
449 1371 : {
450 1371 : unsigned int ern = 0;
451 1371 : int ret = evnt_recv(evnt, &ern);
452 :
453 1371 : if (ret)
454 1139 : return (TRUE);
455 :
456 232 : if ((ern == VSTR_TYPE_SC_READ_FD_ERR_EOF) && evnt->io_w->len)
457 0 : return (evnt_shutdown_r(evnt, TRUE));
458 :
459 232 : return (FALSE);
460 : }
461 :
462 : int evnt_cb_func_send(struct Evnt *evnt)
463 18 : {
464 18 : int ret = -1;
465 :
466 18 : evnt_fd_set_cork(evnt, TRUE);
467 18 : ret = evnt_send(evnt);
468 18 : if (!evnt->io_w->len)
469 18 : evnt_fd_set_cork(evnt, FALSE);
470 :
471 18 : return (ret);
472 : }
473 :
474 : void evnt_cb_func_free(struct Evnt *evnt)
475 0 : {
476 0 : MALLOC_CHECK_SCRUB_PTR(evnt, sizeof(struct Evnt));
477 0 : free(evnt);
478 0 : }
479 :
/* Default free callback (installed by evnt_init()): release evnt with F(). */
void evnt_cb_func_F(struct Evnt *evnt)
{
  F(evnt); /* the project's free wrapper */
}
484 :
485 : int evnt_cb_func_shutdown_r(struct Evnt *evnt)
486 0 : {
487 0 : vlg_dbg2(vlg, "SHUTDOWN CB from[$<sa:%p>]\n", EVNT_SA(evnt));
488 :
489 0 : if (!evnt_shutdown_r(evnt, FALSE))
490 0 : return (FALSE);
491 :
492 : /* called from outside read, and read'll never get called again ...
493 : * so quit if we have nothing to send */
494 0 : return (!!evnt->io_w->len);
495 : }
496 :
497 : static int evnt_init(struct Evnt *evnt, int fd, Vstr_ref *ref,
498 : struct Evnt *from_evnt)
499 12680 : {
500 6340 : ASSERT(ref);
501 :
502 12680 : evnt->flag_q_accept = FALSE;
503 12680 : evnt->flag_q_connect = FALSE;
504 12680 : evnt->flag_q_recv = FALSE;
505 12680 : evnt->flag_q_send_recv = FALSE;
506 12680 : evnt->flag_q_none = FALSE;
507 :
508 12680 : evnt->flag_q_send_now = FALSE;
509 12680 : evnt->flag_q_closed = FALSE;
510 :
511 12680 : evnt->flag_q_pkt_move = FALSE;
512 :
513 12680 : evnt->flag_io_nagle = FALSE;
514 12680 : evnt->flag_io_cork = FALSE;
515 :
516 12680 : evnt->flag_io_filter = FALSE;
517 :
518 12680 : evnt->flag_fully_acpt = FALSE;
519 :
520 12680 : evnt->flag_insta_close = FALSE;
521 :
522 12680 : evnt->io_r_shutdown = FALSE;
523 12680 : evnt->io_w_shutdown = FALSE;
524 :
525 12680 : evnt->io_r_limited = FALSE;
526 12680 : evnt->io_w_limited = FALSE;
527 :
528 12680 : evnt->prev_bytes_r = 0;
529 :
530 12680 : evnt->acct.req_put = 0;
531 12680 : evnt->acct.req_got = 0;
532 12680 : evnt->acct.bytes_r = 0;
533 12680 : evnt->acct.bytes_w = 0;
534 :
535 12680 : evnt->cbs->cb_func_accept = evnt_cb_func_accept;
536 12680 : evnt->cbs->cb_func_connect = evnt_cb_func_connect;
537 12680 : evnt->cbs->cb_func_recv = evnt_cb_func_recv;
538 12680 : evnt->cbs->cb_func_send = evnt_cb_func_send;
539 12680 : evnt->cbs->cb_func_free = evnt_cb_func_F;
540 12680 : evnt->cbs->cb_func_shutdown_r = evnt_cb_func_shutdown_r;
541 :
542 12680 : if (!(evnt->io_r = vstr_make_base(NULL)))
543 0 : goto make_vstr_fail;
544 :
545 12680 : if (!(evnt->io_w = vstr_make_base(NULL)))
546 0 : goto make_vstr_fail;
547 :
548 12680 : evnt->tm_o = NULL;
549 :
550 12680 : evnt->tm_l_r = NULL;
551 12680 : evnt->tm_l_w = NULL;
552 12680 : evnt->lims = NULL;
553 12680 : evnt->lim_num = 0;
554 :
555 12680 : EVNT__COPY_TV(&evnt->ctime);
556 12680 : EVNT__COPY_TV(&evnt->mtime);
557 :
558 12680 : evnt->msecs_tm_mtime = 0;
559 :
560 12680 : evnt_fd__set_coe(fd, TRUE);
561 :
562 12680 : evnt->sa_ref = vstr_ref_add(ref);
563 12680 : evnt->acpt_sa_ref = NULL;
564 :
565 12680 : if (!(evnt->ind = evnt_poll_add(evnt, fd)))
566 0 : goto poll_add_fail;
567 :
568 : /* FIXME: need group settings */
569 : if (HAVE_SOCK_FLAGS_INHERIT && from_evnt &&
570 : (from_evnt->flag_io_nagle == evnt_opt_nagle))
571 : evnt->flag_io_nagle = evnt_opt_nagle;
572 12680 : else if (HAVE_TCP_NODELAY_CONFIG || !evnt_opt_nagle)
573 : {
574 232 : evnt_fd__set_nodelay(fd, !evnt->flag_io_nagle);
575 232 : evnt->flag_io_nagle = evnt_opt_nagle;
576 : }
577 :
578 : if (!HAVE_SOCK_FLAGS_INHERIT || !from_evnt)
579 12680 : evnt_fd__set_nonblock(fd, TRUE);
580 :
581 12680 : return (TRUE);
582 :
583 0 : poll_add_fail:
584 0 : vstr_ref_del(evnt->sa_ref); evnt->sa_ref = NULL;
585 0 : vstr_free_base(evnt->io_w);
586 0 : make_vstr_fail:
587 0 : vstr_free_base(evnt->io_r);
588 :
589 0 : errno = ENOMEM;
590 0 : return (FALSE);
591 : }
592 :
593 : static void evnt__free1(struct Evnt *evnt)
594 12458 : {
595 12458 : evnt_send_del(evnt);
596 :
597 12458 : if (evnt->io_r && evnt->io_r->len)
598 32 : vlg_dbg2(vlg, "evnt__free1($<sa:%p>) io_r len = %zu\n",
599 : EVNT_SA(evnt), evnt->io_r->len);
600 12458 : if (evnt->io_w && evnt->io_w->len)
601 0 : vlg_dbg2(vlg, "evnt__free1($<sa:%p>) io_w len = %zu\n",
602 : EVNT_SA(evnt), evnt->io_w->len);
603 :
604 12458 : vstr_free_base(evnt->io_w); evnt->io_w = NULL;
605 12458 : vstr_free_base(evnt->io_r); evnt->io_r = NULL;
606 :
607 12458 : evnt_poll_del(evnt);
608 12458 : evnt_limit_free(evnt);
609 12458 : }
610 :
611 : static void evnt__free_tq(Timer_q_node *tm)
612 38070 : {
613 38070 : if (tm)
614 : {
615 12052 : timer_q_cntl_node(tm, TIMER_Q_CNTL_NODE_SET_DATA, NULL);
616 12052 : timer_q_quick_del_node(tm);
617 : }
618 38070 : }
619 :
620 : static void evnt__free2(Vstr_ref *sa, Vstr_ref *acpt_sa,
621 : Timer_q_node *tm_o,
622 : Timer_q_node *tm_l_r, Timer_q_node *tm_l_w)
623 12690 : { /* post callbacks, evnt no longer exists */
624 12690 : vstr_ref_del(sa);
625 12690 : vstr_ref_del(acpt_sa);
626 :
627 12690 : evnt__free_tq(tm_o);
628 12690 : evnt__free_tq(tm_l_r);
629 12690 : evnt__free_tq(tm_l_w);
630 12690 : }
631 :
632 : static void evnt__free(struct Evnt *evnt)
633 12458 : {
634 12458 : if (evnt)
635 : {
636 12458 : Vstr_ref *sa = evnt->sa_ref;
637 12458 : Vstr_ref *acpt_sa = evnt->acpt_sa_ref;
638 12458 : Timer_q_node *tm_o = evnt->tm_o;
639 12458 : Timer_q_node *tm_l_r = evnt->tm_l_r;
640 12458 : Timer_q_node *tm_l_w = evnt->tm_l_w;
641 :
642 12458 : evnt__free1(evnt);
643 :
644 6229 : ASSERT(evnt__num >= 1); /* in case they come back in via. the cb */
645 12458 : --evnt__num;
646 6229 : ASSERT(evnt__num == evnt__debug_num_all());
647 :
648 12458 : evnt->cbs->cb_func_free(evnt);
649 12458 : evnt__free2(sa, acpt_sa, tm_o, tm_l_r, tm_l_w);
650 : }
651 12458 : }
652 :
/* Remove evnt from its main queue and destroy it.  NULL is ignored. */
void evnt_free(struct Evnt *evnt)
{
  if (!evnt)
    return;

  evnt__del_whatever(evnt);
  evnt__free(evnt);
}
661 :
662 : static void evnt__close_now(struct Evnt *evnt)
663 12288 : {
664 12288 : if (evnt->flag_q_closed)
665 32 : return;
666 :
667 12256 : evnt_free(evnt);
668 : }
669 :
670 : static void evnt__uninit(struct Evnt *evnt)
671 0 : {
672 0 : ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv +
673 : evnt->flag_q_send_recv + evnt->flag_q_none) == 0);
674 :
675 0 : evnt__free1(evnt);
676 0 : evnt__free2(evnt->sa_ref, evnt->acpt_sa_ref,
677 : evnt->tm_o, evnt->tm_l_r, evnt->tm_l_w);
678 0 : }
679 :
/* close(fd) without disturbing errno.
 * Used on error paths so the caller still sees the errno that caused the
 * failure. */
static void evnt__fd_close_noerrno(int fd)
{
  const int keep = errno;

  (void)close(fd);
  errno = keep;
}
686 :
687 : static int evnt__make_end(struct Evnt **que, struct Evnt *evnt, int flags)
688 12680 : {
689 12680 : evnt_add(que, evnt);
690 :
691 12680 : ++evnt__num;
692 :
693 12680 : evnt_wait_cntl_add(evnt, flags);
694 :
695 6340 : ASSERT(evnt__valid(evnt));
696 :
697 12680 : return (TRUE);
698 : }
699 :
700 : int evnt_make_con_ipv4(struct Evnt *evnt, const char *ipv4_string, short port)
701 0 : {
702 0 : int fd = -1;
703 0 : socklen_t alloc_len = sizeof(struct sockaddr_in);
704 0 : Vstr_ref *ref = NULL;
705 0 : struct sockaddr_in *saddr = NULL;
706 :
707 0 : if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
708 0 : goto sock_fail;
709 :
710 0 : EVNT__UPDATE_TV();
711 :
712 0 : if (!(ref = vstr_ref_make_malloc(alloc_len)))
713 0 : goto init_fail;
714 0 : saddr = ref->ptr;
715 :
716 0 : saddr->sin_family = AF_INET;
717 0 : saddr->sin_port = htons(port);
718 0 : saddr->sin_addr.s_addr = inet_addr(ipv4_string);
719 :
720 0 : if (!evnt_init(evnt, fd, ref, NULL))
721 0 : goto init_fail;
722 0 : evnt->flag_q_pkt_move = TRUE;
723 :
724 0 : ASSERT(port && (saddr->sin_addr.s_addr != htonl(INADDR_ANY)));
725 :
726 0 : if (connect(fd, EVNT_SA(evnt), alloc_len) == -1)
727 : {
728 0 : if (errno == EINPROGRESS)
729 : { /* The connection needs more time....*/
730 0 : vstr_ref_del(ref);
731 0 : evnt->flag_q_connect = TRUE;
732 0 : return (evnt__make_end(&q_connect, evnt, POLLOUT));
733 : }
734 :
735 0 : goto connect_fail;
736 : }
737 :
738 0 : vstr_ref_del(ref);
739 0 : evnt->flag_q_none = TRUE;
740 0 : return (evnt__make_end(&q_none, evnt, 0));
741 :
742 0 : connect_fail:
743 0 : evnt__uninit(evnt);
744 0 : init_fail:
745 0 : vstr_ref_del(ref);
746 0 : evnt__fd_close_noerrno(fd);
747 0 : sock_fail:
748 0 : return (FALSE);
749 : }
750 :
751 : int evnt_make_con_local(struct Evnt *evnt, const char *fname)
752 232 : {
753 232 : int fd = -1;
754 232 : size_t len = strlen(fname) + 1;
755 : struct sockaddr_un tmp_sun;
756 232 : struct sockaddr_un *saddr = NULL;
757 232 : socklen_t alloc_len = 0;
758 232 : Vstr_ref *ref = NULL;
759 :
760 232 : tmp_sun.sun_path[0] = 0;
761 232 : alloc_len = SUN_LEN(&tmp_sun) + len;
762 :
763 232 : if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1)
764 0 : goto sock_fail;
765 :
766 232 : EVNT__UPDATE_TV();
767 :
768 232 : if (!(ref = vstr_ref_make_malloc(alloc_len)))
769 0 : goto init_fail;
770 232 : saddr = ref->ptr;
771 :
772 232 : saddr->sun_family = AF_LOCAL;
773 232 : memcpy(saddr->sun_path, fname, len);
774 :
775 232 : if (!evnt_init(evnt, fd, ref, NULL))
776 0 : goto init_fail;
777 232 : evnt->flag_q_pkt_move = TRUE;
778 :
779 232 : if (connect(fd, EVNT_SA(evnt), alloc_len) == -1)
780 : {
781 0 : if (errno == EINPROGRESS)
782 : { /* The connection needs more time....*/
783 0 : vstr_ref_del(ref);
784 0 : evnt->flag_q_connect = TRUE;
785 0 : return (evnt__make_end(&q_connect, evnt, POLLOUT));
786 : }
787 :
788 0 : goto connect_fail;
789 : }
790 :
791 232 : vstr_ref_del(ref);
792 232 : evnt->flag_q_none = TRUE;
793 232 : return (evnt__make_end(&q_none, evnt, 0));
794 :
795 0 : connect_fail:
796 0 : evnt__uninit(evnt);
797 0 : init_fail:
798 0 : vstr_ref_del(ref);
799 0 : evnt__fd_close_noerrno(fd);
800 0 : sock_fail:
801 0 : return (FALSE);
802 : }
803 :
804 : int evnt_make_acpt_ref(struct Evnt *evnt, int fd, Vstr_ref *sa)
805 12284 : {
806 12284 : if (!evnt_init(evnt, fd, sa, NULL))
807 0 : return (FALSE);
808 :
809 12284 : evnt->flag_q_recv = TRUE;
810 12284 : return (evnt__make_end(&q_recv, evnt, POLLIN));
811 : }
812 :
813 : int evnt_make_acpt_dup(struct Evnt *evnt, int fd,
814 : struct sockaddr *sa, socklen_t len)
815 12052 : {
816 12052 : Vstr_ref *ref = vstr_ref_make_memdup(sa, len);
817 12052 : int ret = FALSE;
818 :
819 12052 : if (!ref)
820 : {
821 0 : errno = ENOMEM;
822 0 : return (FALSE);
823 : }
824 :
825 12052 : ret = evnt_make_acpt_ref(evnt, fd, ref);
826 12052 : vstr_ref_del(ref);
827 12052 : return (ret);
828 : }
829 :
830 : static int evnt__make_bind_end(struct Evnt *evnt)
831 160 : {
832 160 : vstr_free_base(evnt->io_r); evnt->io_r = NULL;
833 160 : vstr_free_base(evnt->io_w); evnt->io_w = NULL;
834 :
835 160 : evnt->flag_q_accept = TRUE;
836 160 : evnt__make_end(&q_accept, evnt, POLLIN);
837 160 : SOCKET_POLL_INDICATOR(evnt->ind)->revents = 0;
838 160 : return (TRUE);
839 : }
840 :
841 : int evnt_make_bind_ipv4(struct Evnt *evnt,
842 : const char *acpt_addr, short server_port,
843 : unsigned int listen_len, const char *cong)
844 120 : {
845 120 : int fd = -1;
846 120 : socklen_t alloc_len = sizeof(struct sockaddr_in);
847 120 : Vstr_ref *ref = NULL;
848 120 : struct sockaddr_in *saddr = NULL;
849 :
850 120 : if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
851 0 : VLG_WARN_GOTO(sock_fail, (vlg, "socket(): %m\n"));
852 :
853 120 : EVNT__UPDATE_TV();
854 :
855 120 : if (!(ref = vstr_ref_make_malloc(alloc_len)))
856 0 : VLG_WARNNOMEM_GOTO(init_fail, (vlg, "%s(): %m\n", __func__));
857 120 : saddr = ref->ptr;
858 :
859 120 : saddr->sin_family = AF_INET;
860 :
861 120 : saddr->sin_addr.s_addr = htonl(INADDR_ANY);
862 120 : if (acpt_addr && *acpt_addr) /* silent error becomes <any> */
863 82 : EVNT__RESOLVE_NAME(saddr, acpt_addr);
864 120 : if (saddr->sin_addr.s_addr == htonl(INADDR_ANY))
865 38 : acpt_addr = "any";
866 :
867 120 : saddr->sin_port = htons(server_port);
868 :
869 120 : if (!evnt_init(evnt, fd, ref, NULL))
870 0 : VLG_WARNNOMEM_GOTO(init_fail, (vlg, "%s(): %m\n", __func__));
871 :
872 120 : if (!evnt_fd__set_reuse(fd, TRUE))
873 0 : VLG_WARNNOMEM_GOTO(reuse_fail, (vlg, "%s(): %m\n", __func__));
874 :
875 120 : if (cong && !evnt_fd__set_congestion(fd, cong) && (errno != ENOPROTOOPT))
876 0 : VLG_WARN_GOTO(cong_fail,
877 : (vlg, "setsockopt(TCP_CONGESTION, %s): %m\n", cong));
878 :
879 120 : if (bind(fd, EVNT_SA(evnt), alloc_len) == -1)
880 0 : VLG_WARN_GOTO(bind_fail,
881 : (vlg, "bind(%s:%hd): %m\n", acpt_addr, server_port));
882 :
883 120 : if (!server_port)
884 120 : if (getsockname(fd, EVNT_SA(evnt), &alloc_len) == -1)
885 0 : VLG_WARN_GOTO(getsockname_fail, (vlg, "getsockname(): %m\n"));
886 :
887 120 : if (listen(fd, listen_len) == -1)
888 0 : VLG_WARN_GOTO(listen_fail, (vlg, "listen(%d): %m\n", listen_len));
889 :
890 120 : vstr_ref_del(ref);
891 120 : return (evnt__make_bind_end(evnt));
892 :
893 0 : listen_fail:
894 0 : getsockname_fail:
895 0 : bind_fail:
896 0 : cong_fail:
897 0 : reuse_fail:
898 0 : evnt__uninit(evnt);
899 0 : init_fail:
900 0 : vstr_ref_del(ref);
901 0 : evnt__fd_close_noerrno(fd);
902 0 : sock_fail:
903 0 : return (FALSE);
904 : }
905 :
906 : int evnt_make_bind_local(struct Evnt *evnt, const char *fname,
907 : unsigned int listen_len)
908 40 : {
909 40 : int fd = -1;
910 40 : int saved_errno = 0;
911 40 : size_t len = strlen(fname) + 1;
912 : struct sockaddr_un tmp_sun;
913 40 : struct sockaddr_un *saddr = NULL;
914 40 : socklen_t alloc_len = 0;
915 40 : Vstr_ref *ref = NULL;
916 :
917 40 : tmp_sun.sun_path[0] = 0;
918 40 : alloc_len = SUN_LEN(&tmp_sun) + len;
919 :
920 40 : if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1)
921 0 : goto sock_fail;
922 :
923 40 : EVNT__UPDATE_TV();
924 :
925 40 : if (!(ref = vstr_ref_make_malloc(alloc_len)))
926 0 : goto init_fail;
927 40 : saddr = ref->ptr;
928 :
929 40 : saddr->sun_family = AF_LOCAL;
930 40 : memcpy(saddr->sun_path, fname, len);
931 :
932 40 : if (!evnt_init(evnt, fd, ref, NULL))
933 0 : goto init_fail;
934 :
935 40 : if (unlink(fname) != -1)
936 0 : vlg_warn(vlg, "HAD to unlink(%s) for bind\n", fname);
937 :
938 40 : if (bind(fd, EVNT_SA(evnt), alloc_len) == -1)
939 0 : goto bind_fail;
940 :
941 40 : if (fchmod(fd, 0600) == -1)
942 0 : goto fchmod_fail;
943 :
944 40 : if (listen(fd, listen_len) == -1)
945 0 : goto listen_fail;
946 :
947 40 : vstr_ref_del(ref);
948 40 : return (evnt__make_bind_end(evnt));
949 :
950 0 : bind_fail:
951 0 : saved_errno = errno;
952 0 : vlg_warn(vlg, "bind(%s): %m\n", fname);
953 0 : errno = saved_errno;
954 0 : fchmod_fail:
955 0 : listen_fail:
956 0 : evnt__uninit(evnt);
957 0 : init_fail:
958 0 : vstr_ref_del(ref);
959 0 : evnt__fd_close_noerrno(fd);
960 0 : sock_fail:
961 0 : return (FALSE);
962 : }
963 :
964 : int evnt_make_custom(struct Evnt *evnt, int fd, Vstr_ref *sa, int flags)
965 4 : {
966 : static Vstr_ref dummy_sa = {vstr_ref_cb_free_nothing, NULL, 1};
967 :
968 4 : if (!sa)
969 4 : sa = &dummy_sa;
970 :
971 4 : EVNT__UPDATE_TV();
972 :
973 4 : if (!evnt_init(evnt, fd, sa, NULL))
974 : {
975 0 : evnt__fd_close_noerrno(fd);
976 0 : return (FALSE);
977 : }
978 :
979 4 : if (flags & POLLIN)
980 : {
981 4 : evnt->flag_q_recv = TRUE;
982 4 : return (evnt__make_end(&q_recv, evnt, POLLIN));
983 : }
984 :
985 0 : evnt->flag_q_none = TRUE;
986 0 : return (evnt__make_end(&q_none, evnt, 0));
987 : }
988 :
989 : void evnt_close(struct Evnt *evnt)
990 202 : {
991 202 : if (!evnt)
992 0 : return;
993 :
994 101 : ASSERT(evnt__valid(evnt));
995 :
996 202 : if (evnt->flag_q_closed)
997 0 : return;
998 :
999 : /* can't close at this point or we'll race with:
1000 : * socket_poll_add()/socket_poll_del() when using _DIRECT mapping */
1001 :
1002 : /* queue for deletion ... as the bt might still be using the ptr */
1003 202 : evnt->flag_q_closed = TRUE;
1004 202 : evnt->c_next = q_closed;
1005 202 : q_closed = evnt;
1006 :
1007 101 : ASSERT(evnt__valid(evnt));
1008 : }
1009 :
1010 : int evnt_limit_add(struct Evnt *evnt, Vstr_ref *ref)
1011 48208 : {
1012 48208 : Vstr_ref **refs = evnt->lims;
1013 48208 : unsigned int lim_num = evnt->lim_num + 1;
1014 48208 : struct Evnt_limit *lim = ref->ptr;
1015 :
1016 24104 : ASSERT((!evnt->lims && (lim_num == 1)) ||
1017 : ( evnt->lims && (lim_num >= 2)));
1018 :
1019 48208 : evnt->io_r_limited |= !!lim->io_r_max;
1020 48208 : evnt->io_w_limited |= !!lim->io_w_max;
1021 :
1022 48208 : if (lim_num == 1)
1023 : {
1024 12052 : if (!(evnt->lims = MK(sizeof(Vstr_ref *))))
1025 0 : return (FALSE);
1026 : }
1027 36156 : else if (!MV(evnt->lims, refs, sizeof(Vstr_ref *) * lim_num))
1028 0 : return (FALSE);
1029 :
1030 48208 : evnt->lims[evnt->lim_num] = vstr_ref_add(ref);
1031 48208 : evnt->lim_num = lim_num;
1032 :
1033 48208 : return (TRUE);
1034 : }
1035 :
1036 : int evnt_limit_dup(struct Evnt *evnt, const struct Evnt_limit *lim)
1037 24104 : {
1038 24104 : Vstr_ref *ref = vstr_ref_make_memdup(lim, sizeof(struct Evnt_limit));
1039 24104 : int ret = FALSE;
1040 :
1041 24104 : if (!ref)
1042 0 : return (FALSE);
1043 :
1044 24104 : ret = evnt_limit_add(evnt, ref);
1045 24104 : vstr_ref_del(ref);
1046 :
1047 24104 : return (ret);
1048 : }
1049 :
1050 : void evnt_limit_chg(struct Evnt *evnt, unsigned int scan, Vstr_ref *ref)
1051 45862 : {
1052 45862 : Vstr_ref *tmp = NULL;
1053 :
1054 22009 : ASSERT(evnt->lim_num > scan);
1055 45862 : tmp = evnt->lims[scan];
1056 :
1057 45862 : if (ref)
1058 : {
1059 45862 : struct Evnt_limit *lim = ref->ptr;
1060 :
1061 45862 : evnt->io_r_limited |= !!lim->io_r_max;
1062 45862 : evnt->io_w_limited |= !!lim->io_w_max;
1063 :
1064 45862 : ref = vstr_ref_add(ref);
1065 : }
1066 :
1067 45862 : evnt->lims[scan] = ref;
1068 45862 : vstr_ref_del(tmp);
1069 45862 : }
1070 :
1071 : void evnt_limit_alt(struct Evnt *evnt, unsigned int scan,
1072 : const struct Evnt_limit *nlim)
1073 45862 : {
1074 45862 : Vstr_ref *ref = NULL;
1075 45862 : struct Evnt_limit *lim = NULL;
1076 :
1077 22009 : ASSERT(evnt->lim_num > scan);
1078 45862 : ref = evnt->lims[scan];
1079 22009 : ASSERT(ref);
1080 22009 : ASSERT(ref->ref == 1); /* should be the only owner of this data,
1081 : * if we are changing it */
1082 :
1083 45862 : lim = ref->ptr;
1084 :
1085 45862 : evnt->io_r_limited |= !!lim->io_r_max;
1086 45862 : evnt->io_w_limited |= !!lim->io_w_max;
1087 :
1088 45862 : lim->io_r_max = nlim->io_r_max;
1089 45862 : lim->io_w_max = nlim->io_w_max;
1090 :
1091 45862 : if (lim->io_r_cur > lim->io_r_max)
1092 0 : lim->io_r_cur = lim->io_r_max;
1093 :
1094 45862 : if (lim->io_w_cur > lim->io_w_max)
1095 140 : lim->io_w_cur = lim->io_w_max;
1096 45862 : }
1097 :
1098 : void evnt_limit_free(struct Evnt *evnt)
1099 12458 : {
1100 12458 : unsigned int scan = 0;
1101 :
1102 73124 : while (scan < evnt->lim_num)
1103 48208 : vstr_ref_del(evnt->lims[scan++]);
1104 6229 : ASSERT(scan == evnt->lim_num);
1105 :
1106 12458 : F(evnt->lims);
1107 12458 : evnt->lims = NULL;
1108 12458 : evnt->lim_num = 0;
1109 12458 : }
1110 :
/* Refresh one direction's one-second rate-limit window.
 *   tv  - the current time
 *   ltv - start of the current window (tv_sec == 0 means "not started")
 *   cur - remaining allowance in the window; refilled to max on any reset
 *   max - allowance per window
 * A window that has not started yet always gets a full allowance.
 * Once more than a second has passed since *ltv, the window slides forward
 * by one second, or is cleared if at least three seconds have passed. In
 * both cases the allowance is refilled. */
static void evnt__limit_tm_chk(const struct timeval *tv, struct timeval *ltv,
                               unsigned long *cur, unsigned long max)
{
  struct timeval win_end[1];
  struct timeval win_stale[1];

  if (!ltv->tv_sec)
  { /* no window yet */
    *cur = max;
    return;
  }

  *win_end = *ltv;
  TIMER_Q_TIMEVAL_ADD_SECS(win_end, 1, 0);

  if (TIMER_Q_TIMEVAL_CMP(tv, win_end) <= 0)
    return; /* still inside the current second */

  *win_stale = *ltv;
  TIMER_Q_TIMEVAL_ADD_SECS(win_stale, 3, 0);

  if (TIMER_Q_TIMEVAL_CMP(tv, win_stale) >= 0)
  { /* far too old, just start again */
    ltv->tv_sec = 0;
    ltv->tv_usec = 0;
  }
  else
    ++ltv->tv_sec; /* try and be nice: slide forward by one second */

  *cur = max;
}
1140 :
1141 : static void evnt__limit_chk(struct Evnt *evnt)
1142 5202579 : {
1143 : struct timeval tv[1];
1144 5202579 : unsigned int scan = 0;
1145 :
1146 5202579 : EVNT__COPY_TV(tv);
1147 :
1148 5202579 : evnt->io_r_limited = FALSE; /* blank and reset */
1149 5202579 : evnt->io_w_limited = FALSE;
1150 :
1151 31215474 : while (scan < evnt->lim_num)
1152 : {
1153 20810316 : if (evnt->lims[scan])
1154 : {
1155 20810316 : struct Evnt_limit *lim = evnt->lims[scan]->ptr;
1156 :
1157 20810316 : evnt->io_r_limited |= !!lim->io_r_max;
1158 20810316 : evnt->io_w_limited |= !!lim->io_w_max;
1159 :
1160 20810316 : if (evnt->io_r_limited)
1161 : {
1162 0 : vlg_dbg2(vlg, "limit chk r beg (%'lu <= %'lu) @%lu:%lu\n",
1163 : lim->io_r_cur, lim->io_r_max,
1164 : lim->io_r_tm.tv_sec, lim->io_r_tm.tv_usec);
1165 0 : evnt__limit_tm_chk(tv, &lim->io_r_tm, &lim->io_r_cur, lim->io_r_max);
1166 0 : vlg_dbg2(vlg, "limit chk r beg (%'lu <= %'lu) @%lu:%lu\n",
1167 : lim->io_r_cur, lim->io_r_max,
1168 : lim->io_r_tm.tv_sec, lim->io_r_tm.tv_usec);
1169 : }
1170 :
1171 20810316 : if (evnt->io_w_limited)
1172 : {
1173 10413084 : vlg_dbg2(vlg, "limit chk w beg (%'lu <= %'lu) @%lu:%lu\n",
1174 : lim->io_w_cur, lim->io_w_max,
1175 : lim->io_w_tm.tv_sec, lim->io_w_tm.tv_usec);
1176 10413084 : evnt__limit_tm_chk(tv, &lim->io_w_tm, &lim->io_w_cur, lim->io_w_max);
1177 10413084 : vlg_dbg2(vlg, "limit chk w end (%'lu <= %'lu) @%lu:%lu\n",
1178 : lim->io_w_cur, lim->io_w_max,
1179 : lim->io_w_tm.tv_sec, lim->io_w_tm.tv_usec);
1180 : }
1181 : }
1182 :
1183 20810316 : ++scan;
1184 : }
1185 386502 : ASSERT(scan == evnt->lim_num);
1186 5202579 : }
1187 :
1188 : static struct Evnt_limit *evnt_limit_r(struct Evnt *evnt, unsigned long sz)
1189 51859 : {
1190 51859 : struct Evnt_limit *ret = NULL;
1191 51859 : unsigned int scan = 0;
1192 :
1193 51859 : if (!evnt->io_r_limited)
1194 51859 : return (NULL);
1195 :
1196 0 : evnt__limit_chk(evnt);
1197 :
1198 0 : while (scan < evnt->lim_num)
1199 : {
1200 0 : if (evnt->lims[scan])
1201 : {
1202 0 : struct Evnt_limit *lim = evnt->lims[scan]->ptr;
1203 :
1204 0 : if (lim->io_r_max && (!ret || (ret->io_r_cur > lim->io_r_cur)))
1205 0 : ret = lim;
1206 : }
1207 :
1208 0 : ++scan;
1209 : }
1210 0 : ASSERT(scan == evnt->lim_num);
1211 :
1212 0 : if (ret)
1213 0 : vlg_dbg2(vlg, "limit io_r(%'lu <= %'lu)\n", ret->io_r_cur, sz);
1214 :
1215 0 : if (ret && (ret->io_r_cur > sz))
1216 0 : ret = NULL;
1217 :
1218 0 : return (ret);
1219 : }
1220 :
1221 : static struct Evnt_limit *evnt_limit_w(struct Evnt *evnt, unsigned long sz)
1222 17824975 : {
1223 17824975 : struct Evnt_limit *ret = NULL;
1224 17824975 : unsigned int scan = 0;
1225 :
1226 17824975 : if (!evnt->io_w_limited)
1227 12622396 : return (NULL);
1228 :
1229 5202579 : evnt__limit_chk(evnt);
1230 :
1231 31215474 : while (scan < evnt->lim_num)
1232 : {
1233 20810316 : if (evnt->lims[scan])
1234 : {
1235 20810316 : struct Evnt_limit *lim = evnt->lims[scan]->ptr;
1236 :
1237 20810316 : if (lim->io_w_max && (!ret || (ret->io_w_cur > lim->io_w_cur)))
1238 5205387 : ret = lim;
1239 : }
1240 :
1241 20810316 : ++scan;
1242 : }
1243 386502 : ASSERT(scan == evnt->lim_num);
1244 :
1245 5202579 : if (ret)
1246 5202579 : vlg_dbg2(vlg, "limit io_w(%'lu <= %'lu)\n", ret->io_w_cur, sz);
1247 :
1248 5202579 : if (ret && (ret->io_w_cur > sz))
1249 5198875 : ret = NULL;
1250 :
1251 5202579 : return (ret);
1252 : }
1253 :
1254 : static Timer_q_base *evnt__timeout_lim_r_1 = NULL; /* read/recv/accept */
1255 : static Timer_q_base *evnt__timeout_lim_r_10 = NULL;
1256 : static Timer_q_base *evnt__timeout_lim_w_1 = NULL; /* write/send/connect */
1257 : static Timer_q_base *evnt__timeout_lim_w_10 = NULL;
1258 :
1259 : static void evnt__timer_cb_lim_r(int type, void *data)
1260 0 : {
1261 0 : struct Evnt *evnt = data;
1262 :
1263 0 : if (!evnt) /* deleted */
1264 0 : return;
1265 :
1266 0 : ASSERT(evnt__valid(evnt));
1267 :
1268 0 : if (type == TIMER_Q_TYPE_CALL_RUN_ALL)
1269 0 : return;
1270 :
1271 0 : evnt->tm_l_r = NULL;
1272 0 : evnt_wait_cntl_add(evnt, POLLIN);
1273 :
1274 0 : if (type == TIMER_Q_TYPE_CALL_DEL)
1275 0 : return;
1276 :
1277 0 : if (evnt->flag_q_accept)
1278 : {
1279 0 : assert(FALSE);
1280 : }
1281 0 : else if (!evnt->cbs->cb_func_recv(evnt))
1282 0 : evnt__close_now(evnt);
1283 : }
1284 :
1285 : static int evnt_limit_timeout_r(struct Evnt *evnt, struct Evnt_limit *lim)
1286 51859 : {
1287 : struct timeval tv[1];
1288 51859 : unsigned long msecs = 0;
1289 :
1290 51859 : if (evnt->tm_l_r || !lim || lim->io_r_cur)
1291 51859 : return (TRUE);
1292 :
1293 0 : EVNT__COPY_TV(tv);
1294 0 : msecs = timer_q_timeval_udiff_msecs(tv, &lim->io_r_tm);
1295 :
1296 0 : if (msecs < 1000)
1297 0 : msecs = 1000 - msecs;
1298 : else
1299 0 : msecs = 0;
1300 0 : vlg_dbg2(vlg, "timeout_r_make($<sa:%p>, %lu)\n", EVNT_SA(evnt), msecs);
1301 :
1302 0 : if (msecs <= 100)
1303 : {
1304 0 : TIMER_Q_TIMEVAL_ADD_SECS(tv, 0, 100 * 1000);
1305 0 : evnt->tm_l_r = timer_q_add_node(evnt__timeout_lim_r_1, evnt, tv,
1306 : TIMER_Q_FLAG_NODE_DEFAULT);
1307 : }
1308 : else
1309 : {
1310 0 : TIMER_Q_TIMEVAL_ADD_SECS(tv, 1, 0);
1311 0 : evnt->tm_l_r = timer_q_add_node(evnt__timeout_lim_r_10, evnt, tv,
1312 : TIMER_Q_FLAG_NODE_DEFAULT);
1313 : }
1314 :
1315 0 : evnt_wait_cntl_del(evnt, POLLIN);
1316 :
1317 0 : return (!!evnt->tm_l_r);
1318 : }
1319 :
1320 : static void evnt__timer_cb_lim_w(int type, void *data)
1321 3406 : {
1322 3406 : struct Evnt *evnt = data;
1323 :
1324 3406 : if (!evnt) /* deleted */
1325 0 : return;
1326 :
1327 1808 : ASSERT(evnt__valid(evnt));
1328 :
1329 3406 : if (type == TIMER_Q_TYPE_CALL_RUN_ALL)
1330 0 : return;
1331 :
1332 3406 : evnt->tm_l_w = NULL;
1333 3406 : if (evnt->flag_q_send_recv)
1334 : {
1335 3001 : evnt_wait_cntl_add(evnt, POLLOUT);
1336 3001 : SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLOUT;
1337 : }
1338 :
1339 3406 : if (type == TIMER_Q_TYPE_CALL_DEL)
1340 0 : return;
1341 :
1342 3406 : if (evnt->flag_q_connect)
1343 : {
1344 0 : assert(FALSE);
1345 : }
1346 3406 : else if (!evnt->cbs->cb_func_send(evnt))
1347 17 : evnt__close_now(evnt);
1348 : }
1349 :
1350 : static int evnt_limit_timeout_w(struct Evnt *evnt, struct Evnt_limit *lim)
1351 11809014 : {
1352 : struct timeval tv[1];
1353 11809014 : unsigned long msecs = 0;
1354 :
1355 11809014 : if (evnt->tm_l_w || !lim || lim->io_w_cur)
1356 11805608 : return (TRUE);
1357 :
1358 3406 : EVNT__COPY_TV(tv);
1359 3406 : msecs = timer_q_timeval_udiff_msecs(tv, &lim->io_w_tm);
1360 :
1361 3406 : if (msecs < 1000)
1362 3403 : msecs = 1000 - msecs;
1363 : else
1364 3 : msecs = 0;
1365 3406 : vlg_dbg2(vlg, "timeout_w_make($<sa:%p>, %lu)\n", EVNT_SA(evnt), msecs);
1366 :
1367 3406 : if (msecs <= 100)
1368 : {
1369 17 : TIMER_Q_TIMEVAL_ADD_SECS(tv, 0, 100 * 1000);
1370 17 : evnt->tm_l_w = timer_q_add_node(evnt__timeout_lim_w_1, evnt, tv,
1371 : TIMER_Q_FLAG_NODE_DEFAULT);
1372 : }
1373 : else
1374 : {
1375 3389 : TIMER_Q_TIMEVAL_ADD_SECS(tv, 1, 0);
1376 3389 : evnt->tm_l_w = timer_q_add_node(evnt__timeout_lim_w_10, evnt, tv,
1377 : TIMER_Q_FLAG_NODE_DEFAULT);
1378 : }
1379 :
1380 3406 : evnt_wait_cntl_del(evnt, POLLOUT);
1381 :
1382 3406 : return (!!evnt->tm_l_w);
1383 : }
1384 :
1385 : /* takes limit values into account as well */
1386 : static void evnt__acct_r(struct Evnt *evnt, size_t bytes)
1387 51859 : {
1388 51859 : unsigned int scan = 0;
1389 :
1390 305670 : while (scan < evnt->lim_num)
1391 : {
1392 201952 : if (evnt->lims[scan])
1393 : {
1394 201952 : struct Evnt_limit *lim = evnt->lims[scan]->ptr;
1395 :
1396 201952 : if (lim->io_r_max)
1397 : {
1398 0 : if (!lim->io_r_tm.tv_sec)
1399 : {
1400 0 : EVNT__COPY_TV(&lim->io_r_tm.tv_sec);
1401 0 : ASSERT(lim->io_r_cur == lim->io_r_max);
1402 : }
1403 :
1404 : /* must be at least this small, or we screwed up limiting */
1405 0 : ASSERT(lim->io_r_cur >= bytes);
1406 0 : lim->io_r_cur -= bytes;
1407 : }
1408 : }
1409 :
1410 201952 : ++scan;
1411 : }
1412 25144 : ASSERT(scan == evnt->lim_num);
1413 :
1414 51859 : evnt->acct.bytes_r += bytes;
1415 51859 : }
1416 :
1417 : static void evnt__acct_w(struct Evnt *evnt, size_t bytes)
1418 11809014 : {
1419 11809014 : unsigned int scan = 0;
1420 :
1421 70852012 : while (scan < evnt->lim_num)
1422 : {
1423 47233984 : if (evnt->lims[scan])
1424 : {
1425 47233984 : struct Evnt_limit *lim = evnt->lims[scan]->ptr;
1426 :
1427 47233984 : if (lim->io_w_max)
1428 : {
1429 7394706 : if (!lim->io_w_tm.tv_sec)
1430 : {
1431 4336 : EVNT__COPY_TV(&lim->io_w_tm.tv_sec);
1432 2170 : ASSERT(lim->io_w_cur == lim->io_w_max);
1433 : }
1434 :
1435 : /* must be at least this small, or we screwed up limiting */
1436 775306 : ASSERT(lim->io_w_cur >= bytes);
1437 7394706 : lim->io_w_cur -= bytes;
1438 : }
1439 : }
1440 :
1441 47233984 : ++scan;
1442 : }
1443 1989630 : ASSERT(scan == evnt->lim_num);
1444 :
1445 11809014 : evnt->acct.bytes_w += bytes;
1446 11809014 : }
1447 :
1448 : void evnt_put_pkt(struct Evnt *evnt)
1449 40731 : {
1450 20373 : ASSERT(evnt__valid(evnt));
1451 :
1452 40731 : if (evnt->flag_q_pkt_move && evnt->flag_q_none)
1453 : {
1454 116 : ASSERT(evnt->acct.req_put >= evnt->acct.req_got);
1455 232 : evnt_del(&q_none, evnt); evnt->flag_q_none = FALSE;
1456 232 : evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;
1457 232 : evnt_wait_cntl_add(evnt, POLLIN);
1458 : }
1459 :
1460 40731 : ++evnt->acct.req_put;
1461 :
1462 20373 : ASSERT(evnt__valid(evnt));
1463 40731 : }
1464 :
1465 : void evnt_got_pkt(struct Evnt *evnt)
1466 40759 : {
1467 20387 : ASSERT(evnt__valid(evnt));
1468 :
1469 40759 : ++evnt->acct.req_got;
1470 :
1471 40759 : if (evnt->flag_q_pkt_move &&
1472 : !evnt->flag_q_send_recv && (evnt->acct.req_put == evnt->acct.req_got))
1473 : {
1474 114 : ASSERT(evnt->acct.req_put >= evnt->acct.req_got);
1475 228 : evnt_del(&q_recv, evnt), evnt->flag_q_recv = FALSE;
1476 228 : evnt_add(&q_none, evnt), evnt->flag_q_none = TRUE;
1477 228 : evnt_wait_cntl_del(evnt, POLLIN);
1478 : }
1479 :
1480 20387 : ASSERT(evnt__valid(evnt));
1481 40759 : }
1482 :
1483 : static int evnt__call_send(struct Evnt *evnt, unsigned int *ern)
1484 11801056 : {
1485 11801056 : size_t tmp = evnt->io_w->len;
1486 11801056 : int fd = evnt_fd(evnt);
1487 11801056 : struct Evnt_limit *lim = NULL;
1488 :
1489 11801056 : if (!(lim = evnt_limit_w(evnt, evnt->io_w->len)))
1490 : {
1491 11797548 : if (!vstr_sc_write_fd(evnt->io_w, 1, tmp, fd, ern) && (errno != EAGAIN))
1492 0 : return (FALSE);
1493 : }
1494 3508 : else if (lim->io_w_cur)
1495 : {
1496 864 : if (!vstr_sc_write_fd(evnt->io_w, 1, lim->io_w_cur, fd, ern) &&
1497 : (errno != EAGAIN))
1498 0 : return (FALSE);
1499 : }
1500 :
1501 11801056 : tmp -= evnt->io_w->len;
1502 11801056 : vlg_dbg3(vlg, "write($<sa:%p>) = %zu\n", EVNT_SA(evnt), tmp);
1503 :
1504 11801056 : evnt__acct_w(evnt, tmp);
1505 11801056 : if (!evnt_limit_timeout_w(evnt, lim))
1506 : {
1507 0 : errno = ENOMEM, *ern = VSTR_TYPE_SC_WRITE_FD_ERR_MEM;
1508 0 : return (FALSE);
1509 : }
1510 :
1511 11801056 : return (TRUE);
1512 : }
1513 :
1514 : int evnt_send_add(struct Evnt *evnt, int force_q, size_t max_sz)
1515 17788061 : {
1516 3697221 : ASSERT(evnt__valid(evnt));
1517 :
1518 17788061 : vlg_dbg3(vlg, "q now = %u, q send recv = %u, force = %u\n",
1519 : evnt->flag_q_send_now, evnt->flag_q_send_recv, force_q);
1520 :
1521 17788061 : if (!evnt->flag_q_send_recv && (evnt->io_w->len > max_sz))
1522 : {
1523 12833 : if (!evnt__call_send(evnt, NULL))
1524 : {
1525 0 : ASSERT(evnt__valid(evnt));
1526 0 : return (FALSE);
1527 : }
1528 12833 : if (!evnt->io_w->len && !force_q)
1529 : {
1530 250 : ASSERT(evnt__valid(evnt));
1531 500 : return (TRUE);
1532 : }
1533 : }
1534 :
1535 : /* already on send_q -- or already polling (and not forcing) */
1536 17787561 : if (evnt->flag_q_send_now || evnt->tm_l_w ||
1537 : (evnt->flag_q_send_recv && !force_q))
1538 : {
1539 2150 : ASSERT(evnt__valid(evnt));
1540 4060 : return (TRUE);
1541 : }
1542 :
1543 17783501 : evnt->s_next = q_send_now;
1544 17783501 : q_send_now = evnt;
1545 17783501 : evnt->flag_q_send_now = TRUE;
1546 :
1547 3694821 : ASSERT(evnt__valid(evnt));
1548 :
1549 17783501 : return (TRUE);
1550 : }
1551 :
1552 : /* if a connection is on the send now q, then remove them ... this is only
1553 : * done when the client gets killed, so it doesn't matter if it's slow */
1554 : void evnt_send_del(struct Evnt *evnt)
1555 12458 : {
1556 12458 : struct Evnt **scan = &q_send_now;
1557 :
1558 12458 : if (!evnt->flag_q_send_now)
1559 12429 : return;
1560 :
1561 117 : while (*scan && (*scan != evnt))
1562 87 : scan = &(*scan)->s_next;
1563 :
1564 28 : ASSERT(*scan);
1565 :
1566 29 : *scan = evnt->s_next;
1567 :
1568 29 : evnt->flag_q_send_now = FALSE;
1569 : }
1570 :
1571 : int evnt_shutdown_r(struct Evnt *evnt, int got_eof)
1572 11942 : {
1573 5964 : ASSERT(evnt__valid(evnt));
1574 :
1575 11942 : if (evnt->io_r_shutdown || evnt->io_w_shutdown)
1576 11638 : return (FALSE);
1577 :
1578 304 : evnt_wait_cntl_del(evnt, POLLIN);
1579 :
1580 304 : vlg_dbg2(vlg, "shutdown(SHUT_RD, %d) from[$<sa:%p>]\n",
1581 : got_eof, EVNT_SA(evnt));
1582 :
1583 304 : if (!got_eof && (shutdown(evnt_fd(evnt), SHUT_RD) == -1))
1584 : {
1585 0 : if (errno != ENOTCONN)
1586 0 : vlg_warn(vlg, "shutdown(SHUT_RD): %m\n");
1587 0 : return (FALSE);
1588 : }
1589 :
1590 304 : evnt->io_r_shutdown = TRUE;
1591 :
1592 208 : ASSERT(evnt__valid(evnt));
1593 :
1594 304 : return (TRUE);
1595 : }
1596 :
1597 : static void evnt__send_fin(struct Evnt *evnt)
1598 11799943 : {
1599 : if (0)
1600 : { /* nothing */ }
1601 11800157 : else if ( evnt->flag_q_send_recv && !evnt->io_w->len)
1602 : {
1603 432 : evnt_del(&q_send_recv, evnt); evnt->flag_q_send_recv = FALSE;
1604 432 : if (evnt->flag_q_pkt_move && (evnt->acct.req_put == evnt->acct.req_got))
1605 : {
1606 0 : evnt_add(&q_none, evnt); evnt->flag_q_none = TRUE;
1607 0 : evnt_wait_cntl_del(evnt, POLLIN | POLLOUT);
1608 : }
1609 : else
1610 : {
1611 432 : evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;
1612 432 : evnt_wait_cntl_del(evnt, POLLOUT);
1613 : }
1614 : }
1615 11799511 : else if (!evnt->flag_q_send_recv && evnt->io_w->len)
1616 : {
1617 432 : int pflags = evnt->tm_l_w ? 0 : POLLOUT;
1618 218 : ASSERT(evnt->flag_q_none || evnt->flag_q_recv);
1619 432 : if (evnt->flag_q_none)
1620 0 : evnt_del(&q_none, evnt), evnt->flag_q_none = FALSE;
1621 : else
1622 432 : evnt_del(&q_recv, evnt), evnt->flag_q_recv = FALSE;
1623 432 : evnt_add(&q_send_recv, evnt); evnt->flag_q_send_recv = TRUE;
1624 432 : if (!evnt->io_r_shutdown && !evnt->tm_l_r) pflags |= POLLIN;
1625 432 : if (!pflags)
1626 97 : ++evnt__scan_ready_moved_send_zero_fds;
1627 432 : evnt_wait_cntl_add(evnt, pflags);
1628 : }
1629 11799943 : }
1630 :
1631 : int evnt_shutdown_w(struct Evnt *evnt)
1632 11957 : {
1633 5985 : ASSERT(evnt__valid(evnt));
1634 :
1635 11957 : vlg_dbg2(vlg, "shutdown(SHUT_WR) from[$<sa:%p>]\n", EVNT_SA(evnt));
1636 :
1637 11957 : evnt_fd_set_cork(evnt, FALSE); /* eats data in 2.4.22-1.2199.4.legacy.npt */
1638 :
1639 11957 : if (evnt->io_r_shutdown || evnt->io_w_shutdown)
1640 237 : return (FALSE);
1641 :
1642 11720 : if (shutdown(evnt_fd(evnt), SHUT_WR) == -1)
1643 : {
1644 0 : if (errno != ENOTCONN)
1645 0 : vlg_warn(vlg, "shutdown(SHUT_WR): %m\n");
1646 0 : return (FALSE);
1647 : }
1648 11720 : evnt->io_w_shutdown = TRUE;
1649 :
1650 11720 : vstr_del(evnt->io_w, 1, evnt->io_w->len);
1651 11720 : vstr_del(evnt->io_r, 1, evnt->io_r->len);
1652 :
1653 11720 : evnt__send_fin(evnt);
1654 :
1655 11720 : if (evnt->flag_q_recv)
1656 11720 : evnt_wait_cntl_add(evnt, POLLIN);
1657 :
1658 5804 : ASSERT(evnt__valid(evnt));
1659 :
1660 11720 : return (TRUE);
1661 : }
1662 :
1663 : int evnt_recv(struct Evnt *evnt, unsigned int *ern)
1664 51859 : {
1665 51859 : Vstr_base *data = evnt->io_r;
1666 51859 : size_t tmp = evnt->io_r->len;
1667 51859 : unsigned int num_min = 2;
1668 51859 : unsigned int num_max = 6; /* ave. browser reqs are 500ish, ab is much less */
1669 51859 : struct Evnt_limit *lim = NULL;
1670 51859 : unsigned int buf_sz = 0;
1671 :
1672 25144 : ASSERT(evnt__valid(evnt) && ern);
1673 :
1674 : /* default for HTTPD's buf size of 120 (128 - 8) */
1675 51859 : if (!vstr_cntl_conf(data->conf, VSTR_CNTL_CONF_GET_NUM_BUF_SZ, &buf_sz))
1676 0 : buf_sz = 120;
1677 :
1678 51859 : if (evnt->prev_bytes_r >= (buf_sz * 2))
1679 : {
1680 11613 : num_min = 8; /* 8 * 120 = 960 */
1681 11613 : num_max = 8;
1682 : }
1683 51859 : if (evnt->prev_bytes_r >= (buf_sz * 8))
1684 7825 : num_max = 64; /* 64 * 120 = 7680 */
1685 :
1686 51859 : if (!(lim = evnt_limit_r(evnt, num_max * buf_sz)))
1687 51859 : vstr_sc_read_iov_fd(data, data->len, evnt_fd(evnt), num_min, num_max, ern);
1688 0 : else if (lim->io_r_cur)
1689 0 : vstr_sc_read_len_fd(data, data->len, evnt_fd(evnt), lim->io_r_cur, ern);
1690 : else /* pretend we read "something" ... but magicly got nothing, not EOF */
1691 0 : *ern = VSTR_TYPE_SC_READ_FD_ERR_NONE;
1692 :
1693 51859 : evnt->prev_bytes_r = (evnt->io_r->len - tmp);
1694 51859 : evnt__acct_r(evnt, evnt->prev_bytes_r);
1695 :
1696 51859 : vlg_dbg3(vlg, "read($<sa:%p>) = %ju\n", EVNT_SA(evnt), evnt->prev_bytes_r);
1697 :
1698 51859 : if (!evnt_limit_timeout_r(evnt, lim))
1699 0 : errno = ENOMEM, *ern = VSTR_TYPE_SC_READ_FD_ERR_MEM;
1700 :
1701 51859 : switch (*ern)
1702 : {
1703 : case VSTR_TYPE_SC_READ_FD_ERR_NONE:
1704 37490 : if (evnt->io_w_shutdown) /* doesn't count if we can't respond */
1705 3855 : vstr_del(data, 1, data->len);
1706 : else
1707 33635 : EVNT__COPY_TV(&evnt->mtime);
1708 37490 : return (TRUE);
1709 :
1710 : case VSTR_TYPE_SC_READ_FD_ERR_MEM:
1711 0 : vlg_warn(vlg, "%s\n", __func__);
1712 0 : break;
1713 :
1714 : case VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO:
1715 2203 : if (errno == EAGAIN)
1716 2199 : return (TRUE);
1717 2 : break;
1718 :
1719 : case VSTR_TYPE_SC_READ_FD_ERR_EOF:
1720 6090 : break;
1721 :
1722 : default: /* unknown */
1723 0 : vlg_warn(vlg, "read_iov() = %d: %m\n", *ern);
1724 : }
1725 :
1726 12170 : return (FALSE);
1727 : }
1728 :
1729 : int evnt_send(struct Evnt *evnt)
1730 11788223 : {
1731 11788223 : unsigned int ern = 0;
1732 :
1733 1978813 : ASSERT(evnt__valid(evnt));
1734 :
1735 11788223 : if (!evnt__call_send(evnt, &ern))
1736 0 : return (FALSE);
1737 :
1738 11788223 : EVNT__COPY_TV(&evnt->mtime);
1739 :
1740 11788223 : evnt__send_fin(evnt);
1741 :
1742 1978813 : ASSERT(evnt__valid(evnt));
1743 :
1744 11788223 : return (TRUE);
1745 : }
1746 :
/* Without HAVE_OFF64_T, fall back to the plain sendfile() call */
#if !defined(HAVE_OFF64_T)
# define sendfile64 sendfile
#endif
1750 :
1751 : int evnt_sendfile(struct Evnt *evnt, int ffd,
1752 : uintmax_t *f_off, uintmax_t *f_len, unsigned int *ern)
1753 6023919 : {
1754 6023919 : ssize_t ret = 0;
1755 6023919 : off64_t tmp_off = *f_off;
1756 6023919 : size_t tmp_len = *f_len;
1757 6023919 : struct Evnt_limit *lim = NULL;
1758 :
1759 6023919 : *ern = 0;
1760 :
1761 1729959 : ASSERT(evnt__valid(evnt));
1762 :
1763 1729959 : ASSERT(!evnt->io_w->len);
1764 :
1765 6023919 : if (*f_len > SSIZE_MAX)
1766 0 : tmp_len = SSIZE_MAX;
1767 :
1768 6023919 : if (!(lim = evnt_limit_w(evnt, tmp_len)))
1769 : {
1770 6023723 : if ((ret = sendfile64(evnt_fd(evnt), ffd, &tmp_off, tmp_len)) == -1)
1771 : {
1772 6015958 : if (errno == EAGAIN)
1773 6015958 : return (TRUE);
1774 :
1775 0 : *ern = VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO;
1776 0 : return (FALSE);
1777 : }
1778 : }
1779 196 : else if (lim->io_w_cur)
1780 : {
1781 72 : if ((ret = sendfile64(evnt_fd(evnt), ffd, &tmp_off, lim->io_w_cur)) == -1)
1782 : {
1783 0 : if (errno == EAGAIN)
1784 0 : return (TRUE);
1785 :
1786 0 : *ern = VSTR_TYPE_SC_READ_FD_ERR_READ_ERRNO;
1787 0 : return (FALSE);
1788 : }
1789 : }
1790 :
1791 7961 : if (!ret && (!lim || lim->io_w_cur))
1792 : {
1793 3 : *ern = VSTR_TYPE_SC_READ_FD_ERR_EOF;
1794 3 : return (FALSE);
1795 : }
1796 :
1797 7958 : *f_off = tmp_off;
1798 :
1799 7958 : evnt__acct_w(evnt, ret);
1800 7958 : if (!evnt_limit_timeout_w(evnt, lim))
1801 : {
1802 0 : errno = ENOMEM, *ern = VSTR_TYPE_SC_WRITE_FD_ERR_MEM;
1803 0 : return (FALSE);
1804 : }
1805 :
1806 7958 : EVNT__COPY_TV(&evnt->mtime);
1807 :
1808 7958 : *f_len -= ret;
1809 :
1810 7958 : return (TRUE);
1811 : }
1812 :
1813 : int evnt_sc_read_send(struct Evnt *evnt, int fd, uintmax_t *len)
1814 6446 : {
1815 6446 : Vstr_base *out = evnt->io_w;
1816 6446 : size_t orig_len = out->len;
1817 6446 : size_t tmp = 0;
1818 6446 : int ret = IO_OK;
1819 :
1820 3061 : ASSERT(len && *len);
1821 :
1822 6446 : if ((ret = io_get(out, fd)) == IO_FAIL)
1823 0 : return (EVNT_IO_READ_ERR);
1824 :
1825 6446 : if (ret == IO_EOF)
1826 2 : return (EVNT_IO_READ_EOF);
1827 :
1828 6444 : tmp = out->len - orig_len;
1829 :
1830 6444 : if (tmp >= *len)
1831 : { /* we might not be transfering to EOF, so reduce if needed */
1832 4126 : vstr_sc_reduce(out, 1, out->len, tmp - *len);
1833 1897 : ASSERT((out->len - orig_len) == *len);
1834 4126 : *len = 0;
1835 4126 : return (EVNT_IO_READ_FIN);
1836 : }
1837 :
1838 2318 : *len -= tmp;
1839 :
1840 2318 : if (!evnt_send(evnt))
1841 0 : return (EVNT_IO_SEND_ERR);
1842 :
1843 2318 : return (EVNT_IO_OK);
1844 : }
1845 :
1846 : static int evnt__get_timeout(void)
1847 9178060 : {
1848 9178060 : const struct timeval *tv = NULL;
1849 9178060 : int msecs = -1;
1850 :
1851 9178060 : if (q_send_now)
1852 8888027 : msecs = 0;
1853 290033 : else if ((tv = timer_q_first_timeval()))
1854 : {
1855 289828 : long diff = 0;
1856 : struct timeval now_timeval;
1857 :
1858 289828 : EVNT__COPY_TV(&now_timeval);
1859 :
1860 289828 : diff = timer_q_timeval_diff_msecs(tv, &now_timeval);
1861 :
1862 289828 : if (diff > 0)
1863 : {
1864 47685 : if (diff >= INT_MAX)
1865 0 : msecs = INT_MAX - 1;
1866 : else
1867 47685 : msecs = diff;
1868 : }
1869 : else
1870 242143 : msecs = 0;
1871 : }
1872 :
1873 9178060 : vlg_dbg2(vlg, "get_timeout = %'d\n", msecs);
1874 :
1875 9178060 : return (msecs);
1876 : }
1877 :
1878 : void evnt_scan_q_close(void)
1879 36712323 : {
1880 36712323 : struct Evnt *scan = NULL;
1881 :
1882 73424848 : while ((scan = q_closed))
1883 : {
1884 202 : scan->flag_q_closed = FALSE;
1885 202 : q_closed = scan->c_next;
1886 :
1887 202 : evnt_free(scan);
1888 : }
1889 :
1890 7820035 : ASSERT(!q_closed);
1891 36712323 : }
1892 :
1893 : /* if something goes wrong drop all accept'ing events */
1894 : void evnt_acpt_close_all(void)
1895 45 : {
1896 45 : struct Evnt *evnt = q_accept; /* struct Evnt *evnt = evnt_queue("accept"); */
1897 :
1898 90 : while (evnt)
1899 : {
1900 0 : evnt_close(evnt);
1901 :
1902 0 : evnt = evnt->next;
1903 : }
1904 45 : }
1905 :
1906 : void evnt_scan_fds(unsigned int ready, size_t max_sz)
1907 9178060 : {
1908 9178060 : const int bad_poll_flags = (POLLERR | POLLHUP | POLLNVAL);
1909 9178060 : struct Evnt *scan = NULL;
1910 :
1911 9178060 : EVNT__UPDATE_TV();
1912 :
1913 9178060 : scan = q_connect;
1914 18356120 : while (scan && ready)
1915 : {
1916 0 : struct Evnt *scan_next = NULL;
1917 0 : int done = FALSE;
1918 0 : int revents = 0;
1919 :
1920 0 : ASSERT(evnt__valid(scan));
1921 :
1922 0 : if (!scan->flag_q_connect)
1923 0 : break;
1924 :
1925 0 : revents = SOCKET_POLL_INDICATOR(scan->ind)->revents;
1926 0 : SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;
1927 :
1928 0 : if (scan->flag_q_closed)
1929 : {
1930 0 : done = !!revents;
1931 0 : scan_next = scan->next;
1932 0 : goto next_connect;
1933 : }
1934 :
1935 0 : assert(!(revents & POLLIN));
1936 : /* done as one so we get error code */
1937 0 : if (revents & (POLLOUT|bad_poll_flags))
1938 : {
1939 0 : int ern = 0;
1940 0 : socklen_t len = sizeof(int);
1941 0 : int ret = 0;
1942 :
1943 0 : done = TRUE;
1944 :
1945 0 : ret = getsockopt(evnt_fd(scan), SOL_SOCKET, SO_ERROR, &ern, &len);
1946 0 : if (ret == -1)
1947 0 : vlg_err(vlg, EXIT_FAILURE, "getsockopt(SO_ERROR): %m\n");
1948 0 : else if (ern)
1949 : {
1950 0 : errno = ern;
1951 0 : vlg_warn(vlg, "connect(): %m\n");
1952 :
1953 0 : scan_next = scan->next;
1954 0 : evnt__close_now(scan);
1955 : }
1956 : else
1957 : {
1958 0 : evnt_del(&q_connect, scan); scan->flag_q_connect = FALSE;
1959 0 : evnt_add(&q_none, scan); scan->flag_q_none = TRUE;
1960 0 : evnt_wait_cntl_del(scan, POLLOUT);
1961 :
1962 0 : if (!scan->cbs->cb_func_connect(scan))
1963 : {
1964 0 : scan_next = scan->next;
1965 0 : evnt__close_now(scan);
1966 : }
1967 : }
1968 0 : goto next_connect;
1969 : }
1970 0 : ASSERT(!done);
1971 0 : if (evnt_poll_direct_enabled()) break;
1972 :
1973 0 : scan_next = scan->next;
1974 0 : next_connect:
1975 0 : if (done)
1976 0 : --ready;
1977 :
1978 0 : scan = scan_next;
1979 : }
1980 :
1981 9178060 : scan = q_accept;
1982 18366910 : while (scan && ready)
1983 : { /* Papers have suggested that preferring read over accept is better
1984 : * -- edge triggering needs to requeue on non failure */
1985 56158 : struct Evnt *scan_next = NULL;
1986 56158 : int done = FALSE;
1987 56158 : int revents = 0;
1988 :
1989 24623 : ASSERT(evnt__valid(scan));
1990 :
1991 56158 : if (!scan->flag_q_accept)
1992 0 : break;
1993 :
1994 56158 : revents = SOCKET_POLL_INDICATOR(scan->ind)->revents;
1995 56158 : SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;
1996 :
1997 56158 : if (scan->flag_q_closed)
1998 : {
1999 0 : done = !!revents;
2000 0 : scan_next = scan->next;
2001 0 : goto next_accept;
2002 : }
2003 :
2004 24623 : assert(!(revents & POLLOUT));
2005 56158 : if (!done && (revents & bad_poll_flags))
2006 : { /* done first as it's an error with the accept fd, whereas accept
2007 : * generates new fds */
2008 0 : done = TRUE;
2009 0 : scan_next = scan->next;
2010 0 : evnt__close_now(scan);
2011 0 : goto next_accept;
2012 : }
2013 :
2014 56158 : if (revents & POLLIN)
2015 : {
2016 : struct sockaddr_storage sstore[1];
2017 10790 : struct sockaddr *sa = (struct sockaddr *) sstore;
2018 10790 : socklen_t len = sizeof(struct sockaddr_storage);
2019 10790 : int fd = -1;
2020 10790 : struct Evnt *tmp = NULL;
2021 10790 : unsigned int acpt_num = 0;
2022 :
2023 10790 : done = TRUE;
2024 :
2025 : /* ignore all accept() errors -- bad_poll_flags fixes here */
2026 : /* FIXME: apache seems to assume certain errors are really bad and we
2027 : * should just kill the listen socket and wait to die. But for instance.
2028 : * we can't just kill the socket on EMFILE, as we might have hit our
2029 : * resource limit */
2030 33432 : while ((revents & POLLIN) && (fd = accept(evnt_fd(scan), sa, &len)) != -1)
2031 : {
2032 12284 : if (!(tmp = scan->cbs->cb_func_accept(scan, fd, sa, len)))
2033 : {
2034 0 : close(fd);
2035 0 : scan_next = scan->next;
2036 0 : goto next_accept;
2037 : }
2038 :
2039 12284 : if (!acpt_num) /* for the first one, update */
2040 10745 : EVNT__COPY_TV(&scan->mtime);
2041 :
2042 12284 : if (!tmp->flag_q_closed)
2043 : {
2044 12284 : ++ready; /* give a read event to this new event */
2045 12284 : tmp->flag_fully_acpt = TRUE;
2046 : }
2047 6142 : assert(SOCKET_POLL_INDICATOR(tmp->ind)->events == POLLIN);
2048 6142 : assert(SOCKET_POLL_INDICATOR(tmp->ind)->revents == POLLIN);
2049 6142 : assert(tmp == q_recv);
2050 :
2051 12284 : if (++acpt_num >= evnt__accept_limit)
2052 432 : break;
2053 : }
2054 :
2055 10790 : scan_next = scan->next;
2056 10790 : goto next_accept;
2057 : }
2058 19857 : ASSERT(!done);
2059 45368 : if (evnt_poll_direct_enabled()) break;
2060 :
2061 0 : scan_next = scan->next;
2062 :
2063 10790 : next_accept:
2064 10790 : if (done)
2065 10790 : --ready;
2066 :
2067 10790 : scan = scan_next;
2068 : }
2069 :
2070 9178060 : evnt__scan_ready_moved_send_zero_fds = 0;
2071 9178060 : scan = q_recv;
2072 18407650 : while (scan && ready)
2073 : {
2074 51839 : struct Evnt *scan_next = NULL;
2075 51839 : int done = FALSE;
2076 51839 : int revents = 0;
2077 :
2078 25135 : ASSERT(evnt__valid(scan));
2079 :
2080 51839 : if (!scan->flag_q_recv)
2081 6 : break;
2082 :
2083 51833 : revents = SOCKET_POLL_INDICATOR(scan->ind)->revents;
2084 51833 : SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;
2085 :
2086 51833 : if (scan->flag_q_closed)
2087 : {
2088 0 : done = !!revents;
2089 0 : scan_next = scan->next;
2090 0 : goto next_recv;
2091 : }
2092 :
2093 51833 : if (revents & POLLIN)
2094 : {
2095 51526 : done = TRUE;
2096 51526 : if (!scan->cbs->cb_func_recv(scan))
2097 : {
2098 12066 : scan_next = scan->next;
2099 12066 : evnt__close_now(scan);
2100 12066 : goto next_recv;
2101 : }
2102 : }
2103 :
2104 39767 : if (!done && (revents & bad_poll_flags))
2105 : {
2106 4 : done = TRUE;
2107 4 : if (scan->io_r_shutdown || scan->io_w_shutdown ||
2108 : !scan->cbs->cb_func_shutdown_r(scan))
2109 : {
2110 4 : scan_next = scan->next;
2111 4 : evnt__close_now(scan);
2112 4 : goto next_recv;
2113 : }
2114 : }
2115 :
2116 39763 : scan_next = scan->next;
2117 :
2118 51833 : next_recv:
2119 51833 : if (!done && evnt_poll_direct_enabled()) break;
2120 :
2121 51530 : if (done)
2122 51530 : --ready;
2123 :
2124 51530 : scan = scan_next;
2125 : }
2126 :
2127 9178060 : scan = q_send_recv;
2128 18356460 : while (scan && ready)
2129 : {
2130 353 : struct Evnt *scan_next = NULL;
2131 353 : int done = FALSE;
2132 353 : int revents = 0;
2133 :
2134 220 : ASSERT(evnt__valid(scan));
2135 :
2136 353 : if (!scan->flag_q_send_recv)
2137 7 : break;
2138 :
2139 346 : revents = SOCKET_POLL_INDICATOR(scan->ind)->revents;
2140 346 : SOCKET_POLL_INDICATOR(scan->ind)->revents = 0;
2141 :
2142 346 : if (scan->flag_q_closed)
2143 : {
2144 0 : done = !!revents;
2145 0 : scan_next = scan->next;
2146 0 : goto next_send;
2147 : }
2148 :
2149 346 : if (revents & POLLIN)
2150 : {
2151 333 : done = TRUE;
2152 333 : if (!scan->cbs->cb_func_recv(scan))
2153 : {
2154 0 : scan_next = scan->next;
2155 0 : evnt__close_now(scan);
2156 0 : goto next_send;
2157 : }
2158 : }
2159 :
2160 346 : if (revents & POLLOUT)
2161 : {
2162 11 : done = TRUE; /* need groups so we can do direct send here */
2163 11 : if (!evnt_send_add(scan, TRUE, max_sz))
2164 : {
2165 0 : scan_next = scan->next;
2166 0 : evnt__close_now(scan);
2167 0 : goto next_send;
2168 : }
2169 : }
2170 :
2171 346 : if (!done && (revents & bad_poll_flags))
2172 : {
2173 0 : done = TRUE;
2174 0 : if (scan->io_r_shutdown || !scan->cbs->cb_func_shutdown_r(scan))
2175 : {
2176 0 : scan_next = scan->next;
2177 0 : evnt__close_now(scan);
2178 0 : goto next_send;
2179 : }
2180 : }
2181 :
2182 346 : scan_next = scan->next;
2183 :
2184 346 : next_send:
2185 347 : if (!done && evnt__scan_ready_moved_send_zero_fds)
2186 1 : --evnt__scan_ready_moved_send_zero_fds;
2187 : else
2188 : {
2189 345 : if (!done && evnt_poll_direct_enabled()) break;
2190 :
2191 339 : if (done)
2192 339 : --ready;
2193 : }
2194 :
2195 340 : scan = scan_next;
2196 : }
2197 :
2198 9178060 : scan = q_none;
2199 18356120 : while (scan && ready)
2200 : {
2201 0 : struct Evnt *scan_next = scan->next;
2202 0 : int done = FALSE;
2203 :
2204 0 : ASSERT(evnt__valid(scan));
2205 :
2206 0 : if (!scan->flag_q_none)
2207 0 : break;
2208 :
2209 0 : if (scan->flag_q_closed)
2210 0 : goto next_none;
2211 :
2212 0 : if (SOCKET_POLL_INDICATOR(scan->ind)->revents)
2213 : { /* POLLIN == EOF ? */
2214 : /* FIXME: failure cb */
2215 0 : done = TRUE;
2216 :
2217 0 : evnt__close_now(scan);
2218 0 : goto next_none;
2219 : }
2220 :
2221 0 : ASSERT(!done);
2222 0 : if (evnt_poll_direct_enabled()) break;
2223 :
2224 0 : next_none:
2225 0 : if (done)
2226 0 : --ready;
2227 :
2228 0 : scan = scan_next;
2229 : }
2230 :
2231 9178060 : if (q_closed)
2232 74 : evnt_scan_q_close();
2233 9177986 : else if (ready) /* FIXME: needs a different approach */
2234 13 : vlg_warn(vlg, "ready = %d\n", ready);
2235 9178060 : }
2236 :
2237 : void evnt_scan_send_fds(void)
2238 18356120 : {
2239 18356120 : struct Evnt **scan = NULL;
2240 :
2241 18356120 : evnt_scan_q_close();
2242 :
2243 18356120 : scan = &q_send_now;
2244 54495712 : while (*scan)
2245 : {
2246 17783472 : struct Evnt *tmp = *scan;
2247 :
2248 17783472 : tmp->flag_q_send_now = FALSE;
2249 17783472 : *scan = tmp->s_next;
2250 17783472 : if (!tmp->cbs->cb_func_send(tmp))
2251 : {
2252 201 : evnt__close_now(tmp);
2253 201 : continue;
2254 : }
2255 17783271 : if (tmp == *scan) /* added back to q */
2256 : {
2257 3690424 : ASSERT(tmp->flag_q_send_now == TRUE);
2258 17777129 : scan = &tmp->s_next;
2259 : }
2260 : }
2261 :
2262 18356120 : evnt_scan_q_close();
2263 18356120 : }
2264 :
2265 : static void evnt__close_1(struct Evnt **root)
2266 1380 : {
2267 1380 : struct Evnt *scan = *root;
2268 :
2269 1380 : *root = NULL;
2270 :
2271 2992 : while (scan)
2272 : {
2273 232 : struct Evnt *scan_next = scan->next;
2274 232 : Vstr_ref *acpt_sa = scan->acpt_sa_ref;
2275 232 : Vstr_ref *sa = scan->sa_ref;
2276 232 : Timer_q_node *tm_o = scan->tm_o;
2277 232 : Timer_q_node *tm_l_r = scan->tm_l_r;
2278 232 : Timer_q_node *tm_l_w = scan->tm_l_w;
2279 :
2280 232 : vstr_free_base(scan->io_w); scan->io_w = NULL;
2281 232 : vstr_free_base(scan->io_r); scan->io_r = NULL;
2282 :
2283 232 : evnt_poll_del(scan);
2284 :
2285 232 : --evnt__num;
2286 232 : evnt__free2(sa, acpt_sa, tm_o, tm_l_r, tm_l_w);
2287 232 : scan->cbs->cb_func_free(scan);
2288 :
2289 232 : scan = scan_next;
2290 : }
2291 1380 : }
2292 :
2293 : void evnt_close_all(void)
2294 276 : {
2295 276 : q_send_now = NULL;
2296 276 : q_closed = NULL;
2297 :
2298 276 : evnt__close_1(&q_connect);
2299 276 : evnt__close_1(&q_accept);
2300 276 : evnt__close_1(&q_recv);
2301 276 : evnt__close_1(&q_send_recv);
2302 276 : evnt__close_1(&q_none);
2303 138 : ASSERT(evnt__num == evnt__debug_num_all());
2304 276 : }
2305 :
2306 : void evnt_out_dbg3(const char *prefix)
2307 45892297 : {
2308 45892297 : if (vlg->out_dbg < 3)
2309 45891565 : return;
2310 :
2311 732 : vlg_dbg3(vlg, "%s T=%u c=%u a=%u r=%u s=%u n=%u [SN=%u]\n",
2312 : prefix, evnt_num_all(),
2313 : evnt__debug_num_1(q_connect),
2314 : evnt__debug_num_1(q_accept),
2315 : evnt__debug_num_1(q_recv),
2316 : evnt__debug_num_1(q_send_recv),
2317 : evnt__debug_num_1(q_none),
2318 : evnt__debug_num_1(q_send_now));
2319 : }
2320 :
2321 : void evnt_stats_add(struct Evnt *dst, const struct Evnt *src)
2322 12244 : {
2323 12244 : EVNT__COPY_TV(&dst->mtime);
2324 :
2325 12244 : dst->acct.req_put += src->acct.req_put;
2326 12244 : dst->acct.req_got += src->acct.req_got;
2327 :
2328 12244 : dst->acct.bytes_r += src->acct.bytes_r;
2329 12244 : dst->acct.bytes_w += src->acct.bytes_w;
2330 12244 : }
2331 :
2332 : unsigned int evnt_num_all(void)
2333 17141631 : {
2334 17141048 : ASSERT(evnt__num == evnt__debug_num_all());
2335 17141631 : return (evnt__num);
2336 : }
2337 :
2338 : int evnt_waiting(void)
2339 9177671 : {
2340 9177671 : return (q_connect || q_accept || q_recv || q_send_recv || q_send_now);
2341 : }
2342 :
2343 : struct Evnt *evnt_find_least_used(void)
2344 232 : {
2345 232 : struct Evnt *con = NULL;
2346 232 : struct Evnt *con_min = NULL;
2347 :
2348 : /* Find a usable connection, tries to find the least used connection
2349 : * preferring ones not blocking on send IO */
2350 232 : if (!(con = q_none) &&
2351 : !(con = q_recv) &&
2352 : !(con = q_send_recv))
2353 0 : return (NULL);
2354 :
2355 : /* FIXME: not optimal, only want to change after a certain level */
2356 232 : con_min = con;
2357 696 : while (con)
2358 : {
2359 232 : if (con_min->io_w->len > con->io_w->len)
2360 0 : con_min = con;
2361 232 : con = con->next;
2362 : }
2363 :
2364 232 : return (con_min);
2365 : }
2366 :
2367 : #define MATCH_Q_NAME(x) \
2368 : if (CSTREQ(qname, #x )) \
2369 : return ( q_ ## x ) \
2370 :
2371 : struct Evnt *evnt_queue(const char *qname)
2372 694 : {
2373 694 : MATCH_Q_NAME(connect);
2374 614 : MATCH_Q_NAME(accept);
2375 320 : MATCH_Q_NAME(send_recv);
2376 240 : MATCH_Q_NAME(recv);
2377 160 : MATCH_Q_NAME(none);
2378 80 : MATCH_Q_NAME(send_now);
2379 :
2380 0 : return (NULL);
2381 : }
2382 :
2383 : void evnt_fd_set_nagle(struct Evnt *evnt, int val)
2384 16685 : {
2385 7922 : ASSERT(evnt__valid(evnt));
2386 :
2387 16685 : val = !!val;
2388 :
2389 16685 : if (evnt->flag_io_nagle == val)
2390 4615 : return;
2391 :
2392 12070 : if (!evnt_fd__set_nodelay(evnt_fd(evnt), !val))
2393 18 : return;
2394 :
2395 12052 : evnt->flag_io_nagle = val;
2396 : }
2397 :
2398 : void evnt_fd_set_cork(struct Evnt *evnt, int val)
2399 77034 : { /* assume it can't work for set and fail for unset */
2400 37609 : ASSERT(evnt__valid(evnt));
2401 :
2402 : if (!USE_TCP_CORK)
2403 : return;
2404 :
2405 77034 : val = !!val;
2406 :
2407 77034 : if (evnt->flag_io_cork == val)
2408 43712 : return;
2409 :
2410 33322 : if (val) /* flags can't be combined ... stupid */
2411 16685 : evnt_fd_set_nagle(evnt, TRUE);
2412 :
2413 33322 : if (!evnt_fd__set_cork(evnt_fd(evnt), val))
2414 18 : return;
2415 :
2416 33304 : evnt->flag_io_cork = val;
2417 : }
2418 :
2419 : static void evnt__free_base_noerrno(Vstr_base *s1)
2420 0 : {
2421 0 : int saved_errno = errno;
2422 0 : vstr_free_base(s1);
2423 0 : errno = saved_errno;
2424 0 : }
2425 :
2426 : int evnt_fd_set_filter(struct Evnt *evnt, const char *fname)
2427 2 : {
2428 2 : int fd = evnt_fd(evnt);
2429 2 : Vstr_base *s1 = NULL;
2430 2 : unsigned int ern = 0;
2431 :
2432 : if (!CONF_USE_SOCKET_FILTERS)
2433 : return (TRUE);
2434 :
2435 2 : if (!(s1 = vstr_make_base(NULL)))
2436 0 : VLG_WARNNOMEM_RET(FALSE, (vlg, "filter_attach0: %m\n"));
2437 :
2438 2 : vstr_sc_read_len_file(s1, 0, fname, 0, 0, &ern);
2439 :
2440 2 : if (ern &&
2441 : !(((ern == VSTR_TYPE_SC_READ_FILE_ERR_OPEN_ERRNO) &&
2442 : (errno == ENOENT)) ||
2443 : ((ern == VSTR_TYPE_SC_READ_FILE_ERR_FSTAT_ERRNO) &&
2444 : (errno == ENOSPC))))
2445 : {
2446 0 : evnt__free_base_noerrno(s1);
2447 0 : VLG_WARN_RET(FALSE, (vlg, "filter_attach1(%s): %m\n", fname));
2448 : }
2449 2 : else if ((s1->len / sizeof(struct sock_filter)) > USHRT_MAX)
2450 : {
2451 0 : vstr_free_base(s1);
2452 0 : errno = E2BIG;
2453 0 : VLG_WARN_RET(FALSE, (vlg, "filter_attach2(%s): %m\n", fname));
2454 : }
2455 2 : else if (!s1->len)
2456 : {
2457 2 : vstr_free_base(s1);
2458 2 : if (!evnt->flag_io_filter)
2459 2 : vlg_warn(vlg, "filter_attach3(%s): Empty file\n", fname);
2460 0 : else if (setsockopt(fd, SOL_SOCKET, SO_DETACH_FILTER, NULL, 0) == -1)
2461 : {
2462 0 : evnt__free_base_noerrno(s1);
2463 0 : VLG_WARN_RET(FALSE, (vlg, "setsockopt(SOCKET, DETACH_FILTER, %s): %m\n",
2464 : fname));
2465 : }
2466 :
2467 2 : evnt->flag_io_filter = FALSE;
2468 2 : return (TRUE);
2469 : }
2470 : else
2471 : {
2472 : struct sock_fprog filter[1];
2473 0 : socklen_t len = sizeof(filter);
2474 :
2475 0 : filter->len = s1->len / sizeof(struct sock_filter);
2476 0 : filter->filter = (void *)vstr_export_cstr_ptr(s1, 1, s1->len);
2477 :
2478 0 : if (!filter->filter)
2479 0 : VLG_WARNNOMEM_RET(FALSE, (vlg, "filter_attach4: %m\n"));
2480 :
2481 0 : if (setsockopt(fd, SOL_SOCKET, SO_ATTACH_FILTER, filter, len) == -1)
2482 : {
2483 0 : evnt__free_base_noerrno(s1);
2484 0 : VLG_WARN_RET(FALSE, (vlg, "setsockopt(SOCKET, ATTACH_FILTER, %s): %m\n",
2485 : fname));
2486 : }
2487 : }
2488 :
2489 0 : evnt->flag_io_filter = TRUE;
2490 :
2491 0 : vstr_free_base(s1);
2492 :
2493 0 : return (TRUE);
2494 : }
2495 :
2496 : static Timer_q_base *evnt__timeout_1 = NULL;
2497 : static Timer_q_base *evnt__timeout_10 = NULL;
2498 : static Timer_q_base *evnt__timeout_100 = NULL;
2499 :
2500 : static Timer_q_node *evnt__timeout_mtime_make(struct Evnt *evnt,
2501 : struct timeval *tv,
2502 : unsigned long msecs)
2503 12060 : {
2504 12060 : Timer_q_node *tm_o = NULL;
2505 :
2506 12060 : vlg_dbg2(vlg, "mtime_make($<sa:%p>, %'lu)\n", EVNT_SA(evnt), msecs);
2507 :
2508 : if (0) { }
2509 12060 : else if (msecs >= ( 99 * 1000))
2510 : {
2511 4770 : TIMER_Q_TIMEVAL_ADD_SECS(tv, 100, 0);
2512 4770 : tm_o = timer_q_add_node(evnt__timeout_100, evnt, tv,
2513 : TIMER_Q_FLAG_NODE_DEFAULT);
2514 : }
2515 7290 : else if (msecs >= ( 9 * 1000))
2516 : {
2517 7290 : TIMER_Q_TIMEVAL_ADD_SECS(tv, 10, 0);
2518 7290 : tm_o = timer_q_add_node(evnt__timeout_10, evnt, tv,
2519 : TIMER_Q_FLAG_NODE_DEFAULT);
2520 : }
2521 : else
2522 : {
2523 0 : TIMER_Q_TIMEVAL_ADD_SECS(tv, 1, 0);
2524 0 : tm_o = timer_q_add_node(evnt__timeout_1, evnt, tv,
2525 : TIMER_Q_FLAG_NODE_DEFAULT);
2526 : }
2527 :
2528 12060 : return (tm_o);
2529 : }
2530 :
2531 : static void evnt__timer_cb_mtime(int type, void *data)
2532 12060 : {
2533 12060 : struct Evnt *evnt = data;
2534 : struct timeval tv[1];
2535 12060 : unsigned long diff = 0;
2536 :
2537 12060 : if (!evnt) /* deleted */
2538 12052 : return;
2539 :
2540 8 : ASSERT(evnt__valid(evnt));
2541 :
2542 8 : if (type == TIMER_Q_TYPE_CALL_RUN_ALL)
2543 0 : return;
2544 :
2545 8 : evnt->tm_o = NULL;
2546 :
2547 8 : if (type == TIMER_Q_TYPE_CALL_DEL)
2548 0 : return;
2549 :
2550 8 : EVNT__COPY_TV(tv);
2551 :
2552 : /* find out time elapsed */
2553 8 : diff = timer_q_timeval_udiff_msecs(tv, &evnt->mtime);
2554 8 : if (diff < evnt->msecs_tm_mtime)
2555 8 : diff = evnt->msecs_tm_mtime - diff; /* seconds left until timeout */
2556 : else
2557 : {
2558 0 : vlg_dbg2(vlg, "timeout from[$<sa:%p>] = (%'lu, %'lu)\n",
2559 : EVNT_SA(evnt), diff, evnt->msecs_tm_mtime);
2560 0 : if (evnt->flag_insta_close || !evnt_shutdown_w(evnt))
2561 : {
2562 0 : evnt_close(evnt);
2563 0 : return;
2564 : }
2565 :
2566 : /* FIXME: linger close time configurable? */
2567 0 : EVNT__COPY_TV(&evnt->mtime);
2568 0 : evnt->msecs_tm_mtime /= 2;
2569 0 : diff = evnt->msecs_tm_mtime;
2570 : }
2571 :
2572 8 : if (!(evnt->tm_o = evnt__timeout_mtime_make(evnt, tv, diff)))
2573 : {
2574 0 : errno = ENOMEM;
2575 0 : vlg_warn(vlg, "%s: %m\n", "timer reinit");
2576 0 : evnt_close(evnt);
2577 : }
2578 : }
2579 :
2580 : void evnt_timeout_init(void)
2581 64 : {
2582 64 : int flags = TIMER_Q_FLAG_BASE_DEFAULT;
2583 :
2584 32 : ASSERT(!evnt__timeout_1);
2585 :
2586 64 : EVNT__UPDATE_TV();
2587 :
2588 64 : evnt__timeout_1 = timer_q_add_base(evnt__timer_cb_mtime, flags);
2589 64 : evnt__timeout_10 = timer_q_add_base(evnt__timer_cb_mtime, flags);
2590 64 : evnt__timeout_100 = timer_q_add_base(evnt__timer_cb_mtime, flags);
2591 :
2592 64 : if (!evnt__timeout_1 || !evnt__timeout_10 || !evnt__timeout_100)
2593 0 : VLG_ERRNOMEM((vlg, EXIT_FAILURE, "timer init"));
2594 :
2595 : /* we always allocate limit timers, just easier that way ... */
2596 64 : evnt__timeout_lim_r_1 = timer_q_add_base(evnt__timer_cb_lim_r, flags);
2597 64 : evnt__timeout_lim_r_10 = timer_q_add_base(evnt__timer_cb_lim_r, flags);
2598 64 : evnt__timeout_lim_w_1 = timer_q_add_base(evnt__timer_cb_lim_w, flags);
2599 64 : evnt__timeout_lim_w_10 = timer_q_add_base(evnt__timer_cb_lim_w, flags);
2600 :
2601 64 : if (!evnt__timeout_lim_r_1 || !evnt__timeout_lim_r_10 ||
2602 : !evnt__timeout_lim_w_1 || !evnt__timeout_lim_w_10)
2603 0 : VLG_ERRNOMEM((vlg, EXIT_FAILURE, "timer init"));
2604 64 : }
2605 :
2606 : void evnt_timeout_exit(void)
2607 44 : {
2608 22 : ASSERT(evnt__timeout_1);
2609 :
2610 44 : timer_q_del_base(evnt__timeout_1); evnt__timeout_1 = NULL;
2611 44 : timer_q_del_base(evnt__timeout_10); evnt__timeout_10 = NULL;
2612 44 : timer_q_del_base(evnt__timeout_100); evnt__timeout_100 = NULL;
2613 44 : timer_q_del_base(evnt__timeout_lim_r_1); evnt__timeout_lim_r_1 = NULL;
2614 44 : timer_q_del_base(evnt__timeout_lim_r_10); evnt__timeout_lim_r_10 = NULL;
2615 44 : timer_q_del_base(evnt__timeout_lim_w_1); evnt__timeout_lim_w_1 = NULL;
2616 44 : timer_q_del_base(evnt__timeout_lim_w_10); evnt__timeout_lim_w_10 = NULL;
2617 44 : }
2618 :
2619 : int evnt_sc_timeout_via_mtime(struct Evnt *evnt, unsigned long msecs)
2620 12052 : {
2621 : struct timeval tv[1];
2622 :
2623 12052 : if (!(evnt->msecs_tm_mtime = msecs))
2624 0 : return (TRUE);
2625 :
2626 12052 : EVNT__COPY_TV(tv);
2627 :
2628 12052 : if (!(evnt->tm_o = evnt__timeout_mtime_make(evnt, tv, msecs)))
2629 0 : return (FALSE);
2630 :
2631 12052 : return (TRUE);
2632 : }
2633 :
2634 : void evnt_sc_main_loop(size_t max_sz)
2635 9176339 : {
2636 9176339 : int ready = 0;
2637 : struct timeval tv[1];
2638 :
2639 9176339 : EVNT__UPDATE_TV();
2640 9176339 : ready = evnt_poll();
2641 9176339 : if ((ready == -1) && (errno != EINTR))
2642 0 : vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", "poll");
2643 9176339 : if (ready == -1)
2644 0 : return;
2645 :
2646 9176339 : evnt_out_dbg3("1");
2647 9176339 : evnt_scan_fds(ready, max_sz);
2648 9176339 : evnt_out_dbg3("2");
2649 9176339 : evnt_scan_send_fds();
2650 9176339 : evnt_out_dbg3("3");
2651 :
2652 9176339 : EVNT__UPDATE_TV();
2653 9176339 : EVNT__COPY_TV(tv);
2654 9176339 : timer_q_run_norm(tv);
2655 :
2656 9176339 : evnt_out_dbg3("4");
2657 9176339 : evnt_scan_send_fds();
2658 9176339 : evnt_out_dbg3("5");
2659 : }
2660 :
2661 : time_t evnt_sc_time(void)
2662 127455 : {
2663 127455 : time_t ret = -1;
2664 :
2665 : if (!CONF_GETTIMEOFDAY_TIME)
2666 : ret = time(NULL);
2667 : else
2668 : {
2669 : struct timeval tv[1];
2670 :
2671 127455 : EVNT__COPY_TV(tv);
2672 127455 : ret = tv->tv_sec;
2673 : }
2674 :
2675 127455 : return (ret);
2676 : }
2677 :
2678 : void evnt_sc_serv_cb_func_acpt_free(struct Evnt *evnt)
2679 126 : {
2680 126 : struct Acpt_listener *acpt_listener = (struct Acpt_listener *)evnt;
2681 126 : struct Acpt_data *acpt_data = acpt_listener->ref->ptr;
2682 :
2683 126 : evnt_vlg_stats_info(acpt_listener->evnt, "ACCEPT FREE");
2684 :
2685 126 : acpt_data->evnt = NULL;
2686 126 : vstr_ref_del(acpt_listener->ref);
2687 126 : vstr_ref_del(acpt_listener->def_policy);
2688 126 : F(acpt_listener);
2689 126 : }
2690 :
2691 : static void evnt__sc_serv_make_acpt_data_cb(Vstr_ref *ref)
2692 126 : {
2693 126 : struct Acpt_data *ptr = NULL;
2694 :
2695 126 : if (!ref)
2696 0 : return;
2697 :
2698 126 : ptr = ref->ptr;
2699 126 : vstr_ref_del(ptr->sa);
2700 126 : F(ptr);
2701 126 : free(ref);
2702 : }
2703 :
2704 : struct Evnt *evnt_sc_serv_make_bind_ipv4(const char *acpt_addr,
2705 : unsigned short acpt_port,
2706 : unsigned int q_listen_len,
2707 : unsigned int max_connections,
2708 : unsigned int defer_accept,
2709 : const char *acpt_filter_file,
2710 : const char *acpt_cong)
2711 120 : {
2712 120 : struct sockaddr_in *sinv4 = NULL;
2713 120 : Acpt_listener *acpt_listener = NULL;
2714 120 : Acpt_data *acpt_data = NULL;
2715 120 : Vstr_ref *ref = NULL;
2716 :
2717 120 : if (!(acpt_listener = MK(sizeof(Acpt_listener))))
2718 0 : VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
2719 : acpt_addr, acpt_port));
2720 120 : acpt_listener->max_connections = max_connections;
2721 120 : acpt_listener->def_policy = NULL;
2722 :
2723 120 : if (!(acpt_data = MK(sizeof(Acpt_data))))
2724 0 : VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
2725 : acpt_addr, acpt_port));
2726 120 : acpt_data->evnt = NULL;
2727 120 : acpt_data->sa = NULL;
2728 :
2729 120 : if (!(ref = vstr_ref_make_ptr(acpt_data, evnt__sc_serv_make_acpt_data_cb)))
2730 0 : VLG_ERRNOMEM((vlg, EXIT_FAILURE, "make_bind(%s, %hu): %m\n",
2731 : acpt_addr, acpt_port));
2732 120 : acpt_listener->ref = ref;
2733 :
2734 120 : if (!evnt_make_bind_ipv4(acpt_listener->evnt, acpt_addr, acpt_port,
2735 : q_listen_len, acpt_cong))
2736 0 : vlg_err(vlg, 2, "make_bind(%s, %hd, %d, %s): Failed!\n",
2737 : acpt_addr ? acpt_addr : "any", acpt_port, q_listen_len, acpt_cong);
2738 120 : acpt_data->evnt = acpt_listener->evnt;
2739 120 : acpt_data->sa = vstr_ref_add(acpt_data->evnt->sa_ref);
2740 :
2741 120 : sinv4 = EVNT_SA_IN4(acpt_listener->evnt);
2742 60 : ASSERT(!acpt_port || (acpt_port == ntohs(sinv4->sin_port)));
2743 :
2744 120 : if (defer_accept)
2745 116 : evnt_fd_set_defer_accept(acpt_listener->evnt, defer_accept);
2746 :
2747 120 : if (acpt_filter_file &&
2748 : !evnt_fd_set_filter(acpt_listener->evnt, acpt_filter_file))
2749 0 : vlg_err(vlg, 3, "set_filter(%s): %m\n", acpt_filter_file);
2750 :
2751 120 : acpt_listener->evnt->cbs->cb_func_free = evnt_sc_serv_cb_func_acpt_free;
2752 :
2753 120 : return (acpt_listener->evnt);
2754 : }
2755 :
2756 : void evnt_vlg_stats_info(struct Evnt *evnt, const char *prefix)
2757 12454 : {
2758 12454 : vlg_info(vlg, "%s from[$<sa:%p>] req_got[%'u:%u] req_put[%'u:%u]"
2759 : " recv[${BKMG.ju:%ju}:%ju] send[${BKMG.ju:%ju}:%ju]\n",
2760 : prefix, EVNT_SA(evnt),
2761 : evnt->acct.req_got, evnt->acct.req_got,
2762 : evnt->acct.req_put, evnt->acct.req_put,
2763 : evnt->acct.bytes_r, evnt->acct.bytes_r,
2764 : evnt->acct.bytes_w, evnt->acct.bytes_w);
2765 12454 : }
2766 :
/* When the platform headers lack TCP_DEFER_ACCEPT, define a dummy value so
 * the code still compiles, and let USE_TCP_DEFER_ACCEPT skip the call. */
#if defined(TCP_DEFER_ACCEPT)
# define USE_TCP_DEFER_ACCEPT 1
#else
# define USE_TCP_DEFER_ACCEPT 0
# define TCP_DEFER_ACCEPT 0
#endif

/* Set TCP_DEFER_ACCEPT to val on evnt's (listening) socket.  Best effort:
 * the setsockopt() result is deliberately ignored. */
void evnt_fd_set_defer_accept(struct Evnt *evnt, int val)
{
  ASSERT(evnt__valid(evnt));

  if (USE_TCP_DEFER_ACCEPT)
  {
    socklen_t len = sizeof(int);

    setsockopt(evnt_fd(evnt), IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, len);
  }
}
2786 :
2787 : pid_t evnt_make_child(void)
2788 4 : {
2789 4 : pid_t ret = fork();
2790 :
2791 8 : if (!ret)
2792 : {
2793 4 : evnt__is_child = TRUE;
2794 4 : evnt_poll_child_init();
2795 : }
2796 :
2797 8 : return (ret);
2798 : }
2799 :
2800 : int evnt_is_child(void)
2801 44 : {
2802 44 : return (evnt__is_child);
2803 : }
2804 :
2805 : int evnt_child_block_beg(void)
2806 0 : {
2807 0 : if (!evnt_is_child())
2808 0 : return (TRUE);
2809 :
2810 0 : if (PROC_CNTL_PDEATHSIG(SIGTERM) == -1)
2811 0 : vlg_err(vlg, EXIT_FAILURE,
2812 : "prctl(%s, %s): %m\n", "PR_SET_PDEATHSIG", "SIGTERM");
2813 :
2814 0 : if (evnt_child_exited)
2815 0 : return (FALSE);
2816 :
2817 0 : return (TRUE);
2818 : }
2819 :
2820 : int evnt_child_block_end(void)
2821 0 : {
2822 0 : if (!evnt_is_child())
2823 0 : return (TRUE);
2824 :
2825 0 : if (PROC_CNTL_PDEATHSIG(SIGCHLD) == -1)
2826 0 : vlg_err(vlg, EXIT_FAILURE,
2827 : "prctl(%s, %s): %m\n", "PR_SET_PDEATHSIG", "SIGCHLD");
2828 :
2829 0 : return (TRUE);
2830 : }
2831 :
2832 : /* if we are blocking forever, and the only thing we are waiting for is
2833 : * a single accept() fd, just call accept() */
2834 : static int evnt__poll_tst_accept(int msecs)
2835 9178060 : {
2836 9178060 : return ((msecs == -1) && (evnt_num_all() == 1) && q_accept);
2837 : }
2838 :
2839 : static int evnt__poll_accept(void)
2840 0 : {
2841 0 : int fd = -1;
2842 0 : struct Evnt *scan = q_accept;
2843 : struct sockaddr_in sa;
2844 0 : socklen_t len = sizeof(struct sockaddr_in);
2845 0 : struct Evnt *tmp = NULL;
2846 :
2847 : /* need to make sure we die if the parent does */
2848 0 : evnt_fd__set_nonblock(evnt_fd(scan), FALSE);
2849 0 : if (!evnt_child_block_beg())
2850 0 : goto block_beg_fail;
2851 :
2852 0 : fd = accept(evnt_fd(scan), (struct sockaddr *) &sa, &len);
2853 0 : evnt_child_block_end();
2854 0 : evnt_fd__set_nonblock(evnt_fd(scan), TRUE);
2855 :
2856 0 : if (fd == -1)
2857 0 : goto accept_fail;
2858 :
2859 0 : if (!(tmp = scan->cbs->cb_func_accept(scan,
2860 : fd, (struct sockaddr *) &sa, len)))
2861 0 : goto cb_accept_fail;
2862 :
2863 0 : if (!tmp->flag_q_closed)
2864 0 : tmp->flag_fully_acpt = TRUE;
2865 0 : assert(SOCKET_POLL_INDICATOR(tmp->ind)->events == POLLIN);
2866 0 : assert(SOCKET_POLL_INDICATOR(tmp->ind)->revents == POLLIN);
2867 0 : assert(tmp == q_recv);
2868 :
2869 0 : return (1);
2870 :
2871 0 : block_beg_fail:
2872 0 : evnt_fd__set_nonblock(evnt_fd(scan), TRUE);
2873 0 : goto accept_fail;
2874 :
2875 0 : cb_accept_fail:
2876 0 : close(fd);
2877 0 : accept_fail:
2878 0 : errno = EINTR;
2879 0 : return (-1);
2880 : }
2881 :
2882 :
/* Use epoll when <sys/epoll.h> was found (HAVE_SYS_EPOLL_H), unless
 * EVNT_USE_EPOLL has already been defined by the build. */
#if !defined(EVNT_USE_EPOLL)
# if defined(HAVE_SYS_EPOLL_H)
#  define EVNT_USE_EPOLL 1
# else
#  define EVNT_USE_EPOLL 0
# endif
#endif
2890 :
2891 : #if !EVNT_USE_EPOLL
2892 : int evnt_poll_init(void)
2893 : {
2894 : return (TRUE);
2895 : }
2896 :
2897 : int evnt_poll_direct_enabled(void)
2898 : {
2899 : return (FALSE);
2900 : }
2901 :
2902 : int evnt_poll_child_init(void)
2903 : {
2904 : return (TRUE);
2905 : }
2906 :
2907 : void evnt_wait_cntl_add(struct Evnt *evnt, int flags)
2908 : {
2909 : /* FIXME: do the POLLOUT revents differently ... move accept into the cb */
2910 : SOCKET_POLL_INDICATOR(evnt->ind)->events |= flags;
2911 : SOCKET_POLL_INDICATOR(evnt->ind)->revents |= (flags & POLLIN);
2912 : }
2913 :
2914 : void evnt_wait_cntl_del(struct Evnt *evnt, int flags)
2915 : {
2916 : SOCKET_POLL_INDICATOR(evnt->ind)->events &= ~flags;
2917 : SOCKET_POLL_INDICATOR(evnt->ind)->revents &= ~flags;
2918 : }
2919 :
/* poll() backend: just allocate a socket_poll slot for fd (evnt is unused).
 * Returns the slot index, 0 on failure. */
unsigned int evnt_poll_add(struct Evnt *COMPILE_ATTR_UNUSED(evnt), int fd)
{
  unsigned int ind = socket_poll_add(fd);

  return (ind);
}
2924 :
2925 : void evnt_poll_del(struct Evnt *evnt)
2926 : {
2927 : if (SOCKET_POLL_INDICATOR(evnt->ind)->fd != -1)
2928 : close(SOCKET_POLL_INDICATOR(evnt->ind)->fd);
2929 : socket_poll_del(evnt->ind);
2930 : }
2931 :
2932 : /* NOTE: that because of socket_poll direct mapping etc. we can't be "clever" */
2933 : int evnt_poll_swap_accept_read(struct Evnt *evnt, int fd)
2934 : {
2935 : unsigned int old_ind = evnt->ind;
2936 :
2937 : assert(SOCKET_POLL_INDICATOR(evnt->ind)->fd != fd);
2938 :
2939 : ASSERT(evnt__valid(evnt));
2940 :
2941 : if (!(evnt->ind = socket_poll_add(fd)))
2942 : goto poll_add_fail;
2943 :
2944 : if (!(evnt->io_r = vstr_make_base(NULL)) ||
2945 : !(evnt->io_w = vstr_make_base(NULL)))
2946 : goto malloc_base_fail;
2947 :
2948 : SOCKET_POLL_INDICATOR(evnt->ind)->events |= POLLIN;
2949 : SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLIN;
2950 :
2951 : socket_poll_del(old_ind);
2952 :
2953 : evnt_del(&q_accept, evnt); evnt->flag_q_accept = FALSE;
2954 : evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;
2955 :
2956 : evnt_fd__set_nonblock(fd, TRUE);
2957 :
2958 : ASSERT(evnt__valid(evnt));
2959 :
2960 : return (TRUE);
2961 :
2962 : malloc_base_fail:
2963 : socket_poll_del(evnt->ind);
2964 : evnt->ind = old_ind;
2965 : vstr_free_base(evnt->io_r); evnt->io_r = NULL;
2966 : ASSERT(!evnt->io_r && !evnt->io_w);
2967 : poll_add_fail:
2968 : return (FALSE);
2969 : }
2970 :
2971 : int evnt_poll(void)
2972 : {
2973 : int msecs = evnt__get_timeout();
2974 :
2975 : if (evnt_child_exited)
2976 : return (errno = EINTR, -1);
2977 :
2978 : if (evnt__poll_tst_accept(msecs))
2979 : return (evnt__poll_accept());
2980 :
2981 : return (socket_poll_update_all(msecs));
2982 : }
2983 : #else
2984 :
2985 : #include <sys/epoll.h>
2986 :
2987 : static int evnt__epoll_fd = -1;
2988 :
2989 : int evnt_poll_init(void)
2990 64 : {
2991 : assert(POLLIN == EPOLLIN);
2992 : assert(POLLOUT == EPOLLOUT);
2993 : assert(POLLHUP == EPOLLHUP);
2994 : assert(POLLERR == EPOLLERR);
2995 :
2996 : if (!CONF_EVNT_NO_EPOLL)
2997 : {
2998 64 : evnt__epoll_fd = epoll_create(CONF_EVNT_EPOLL_SZ);
2999 :
3000 64 : vlg_dbg2(vlg, "epoll_create(%d): %m\n", evnt__epoll_fd);
3001 : }
3002 :
3003 64 : return (evnt__epoll_fd != -1);
3004 : }
3005 :
3006 : int evnt_poll_direct_enabled(void)
3007 9279721 : {
3008 9279721 : return (evnt__epoll_fd != -1);
3009 : }
3010 :
3011 : static int evnt__epoll_readd(struct Evnt *evnt)
3012 20 : {
3013 : struct epoll_event epevent[1];
3014 :
3015 50 : while (evnt)
3016 : {
3017 10 : int flags = SOCKET_POLL_INDICATOR(evnt->ind)->events;
3018 :
3019 10 : vlg_dbg2(vlg, "epoll_readd($<sa:%p>,%u=%s)\n", EVNT_SA(evnt),
3020 : flags, EVNT__POLL_FLGS(flags));
3021 10 : epevent->events = flags;
3022 10 : epevent->data.u64 = 0; /* FIXME: keep valgrind happy */
3023 10 : epevent->data.ptr = evnt;
3024 :
3025 10 : if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, evnt_fd(evnt), epevent) == -1)
3026 0 : vlg_err(vlg, EXIT_FAILURE, "epoll_readd: %m\n");
3027 :
3028 10 : evnt = evnt->next;
3029 : }
3030 :
3031 20 : return (TRUE);
3032 : }
3033 :
3034 : int evnt_poll_child_init(void)
3035 4 : { /* Can't share epoll() fd's between tasks ... */
3036 4 : if (CONF_EVNT_DUP_EPOLL && evnt_poll_direct_enabled())
3037 : {
3038 4 : close(evnt__epoll_fd);
3039 4 : evnt__epoll_fd = epoll_create(CONF_EVNT_EPOLL_SZ); /* size does nothing */
3040 4 : if (evnt__epoll_fd == -1)
3041 0 : VLG_WARN_RET(FALSE, (vlg, "epoll_recreate(): %m\n"));
3042 :
3043 4 : evnt__epoll_readd(q_connect);
3044 4 : evnt__epoll_readd(q_accept);
3045 4 : evnt__epoll_readd(q_recv);
3046 4 : evnt__epoll_readd(q_send_recv);
3047 4 : evnt__epoll_readd(q_none);
3048 : }
3049 :
3050 4 : return (TRUE);
3051 : }
3052 :
3053 : void evnt_wait_cntl_add(struct Evnt *evnt, int flags)
3054 55617 : {
3055 55617 : if ((SOCKET_POLL_INDICATOR(evnt->ind)->events & flags) == flags)
3056 27949 : return;
3057 :
3058 : /* FIXME: do the POLLOUT revents differently ... move accept into the cb */
3059 27668 : SOCKET_POLL_INDICATOR(evnt->ind)->events |= flags;
3060 27668 : SOCKET_POLL_INDICATOR(evnt->ind)->revents |= (flags & POLLIN);
3061 :
3062 27668 : if (evnt_poll_direct_enabled())
3063 : {
3064 : struct epoll_event epevent[1];
3065 :
3066 27436 : flags = SOCKET_POLL_INDICATOR(evnt->ind)->events;
3067 27436 : vlg_dbg2(vlg, "epoll_mod_add($<sa:%p>,%u=%s)\n", EVNT_SA(evnt),
3068 : flags, EVNT__POLL_FLGS(flags));
3069 27436 : epevent->events = flags;
3070 27436 : epevent->data.u64 = 0; /* FIXME: keep valgrind happy */
3071 27436 : epevent->data.ptr = evnt;
3072 :
3073 27436 : if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_MOD, evnt_fd(evnt), epevent) == -1)
3074 0 : vlg_err(vlg, EXIT_FAILURE, "epoll: %m\n");
3075 : }
3076 : }
3077 :
3078 : void evnt_wait_cntl_del(struct Evnt *evnt, int flags)
3079 16610 : {
3080 16610 : if (!(SOCKET_POLL_INDICATOR(evnt->ind)->events & flags))
3081 982 : return;
3082 :
3083 15628 : SOCKET_POLL_INDICATOR(evnt->ind)->events &= ~flags;
3084 15628 : SOCKET_POLL_INDICATOR(evnt->ind)->revents &= ~flags;
3085 :
3086 15628 : if (flags && evnt_poll_direct_enabled())
3087 : {
3088 : struct epoll_event epevent[1];
3089 :
3090 15400 : flags = SOCKET_POLL_INDICATOR(evnt->ind)->events;
3091 15400 : vlg_dbg2(vlg, "epoll_mod_del($<sa:%p>,%u=%s)\n", EVNT_SA(evnt),
3092 : flags, EVNT__POLL_FLGS(flags));
3093 15400 : epevent->events = flags;
3094 15400 : epevent->data.u64 = 0; /* FIXME: keep valgrind happy */
3095 15400 : epevent->data.ptr = evnt;
3096 :
3097 15400 : if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_MOD, evnt_fd(evnt), epevent) == -1)
3098 0 : vlg_err(vlg, EXIT_FAILURE, "epoll: %m\n");
3099 : }
3100 : }
3101 :
3102 : unsigned int evnt_poll_add(struct Evnt *evnt, int fd)
3103 12680 : {
3104 12680 : unsigned int ind = socket_poll_add(fd);
3105 :
3106 12680 : if (ind && evnt_poll_direct_enabled())
3107 : {
3108 : struct epoll_event epevent[1];
3109 12448 : int flags = 0;
3110 :
3111 12448 : vlg_dbg2(vlg, "epoll_add($<sa:%p>,%u=%s)\n", EVNT_SA(evnt),
3112 : flags, EVNT__POLL_FLGS(flags));
3113 12448 : epevent->events = flags;
3114 12448 : epevent->data.u64 = 0; /* FIXME: keep valgrind happy */
3115 12448 : epevent->data.ptr = evnt;
3116 :
3117 12448 : if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, fd, epevent) == -1)
3118 : {
3119 0 : vlg_warn(vlg, "epoll: %m\n");
3120 0 : socket_poll_del(fd);
3121 : }
3122 : }
3123 :
3124 12680 : return (ind);
3125 : }
3126 :
3127 : void evnt_poll_del(struct Evnt *evnt)
3128 12690 : {
3129 12690 : if (SOCKET_POLL_INDICATOR(evnt->ind)->fd != -1)
3130 12690 : close(SOCKET_POLL_INDICATOR(evnt->ind)->fd);
3131 12690 : socket_poll_del(evnt->ind);
3132 :
3133 : /* done via. the close() */
3134 : if (FALSE && evnt_poll_direct_enabled())
3135 : {
3136 : int fd = SOCKET_POLL_INDICATOR(evnt->ind)->fd;
3137 : struct epoll_event epevent[1];
3138 :
3139 : vlg_dbg2(vlg, "epoll_del($<sa:%p>)\n", EVNT_SA(evnt));
3140 : epevent->events = 0;
3141 : epevent->data.u64 = 0; /* FIXME: keep valgrind happy */
3142 : epevent->data.ptr = evnt;
3143 :
3144 : if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_DEL, fd, epevent) == -1)
3145 : vlg_abort(vlg, "epoll: %m\n");
3146 : }
3147 12690 : }
3148 :
3149 : int evnt_poll_swap_accept_read(struct Evnt *evnt, int fd)
3150 4 : {
3151 4 : unsigned int old_ind = evnt->ind;
3152 4 : int old_fd = SOCKET_POLL_INDICATOR(old_ind)->fd;
3153 :
3154 2 : assert(SOCKET_POLL_INDICATOR(evnt->ind)->fd != fd);
3155 :
3156 2 : ASSERT(evnt__valid(evnt));
3157 :
3158 4 : if (!(evnt->ind = socket_poll_add(fd)))
3159 0 : goto poll_add_fail;
3160 :
3161 4 : if (!(evnt->io_r = vstr_make_base(NULL)) ||
3162 : !(evnt->io_w = vstr_make_base(NULL)))
3163 : goto malloc_base_fail;
3164 :
3165 4 : SOCKET_POLL_INDICATOR(evnt->ind)->events |= POLLIN;
3166 4 : SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLIN;
3167 :
3168 4 : socket_poll_del(old_ind);
3169 :
3170 4 : evnt_del(&q_accept, evnt); evnt->flag_q_accept = FALSE;
3171 4 : evnt_add(&q_recv, evnt); evnt->flag_q_recv = TRUE;
3172 :
3173 4 : evnt_fd__set_nonblock(fd, TRUE);
3174 :
3175 4 : if (evnt_poll_direct_enabled())
3176 : {
3177 : struct epoll_event epevent[1];
3178 :
3179 4 : vlg_dbg2(vlg, "epoll_swap($<sa:%p>,%d,%d)\n", EVNT_SA(evnt), old_fd, fd);
3180 :
3181 4 : epevent->events = POLLIN;
3182 4 : epevent->data.u64 = 0; /* FIXME: keep valgrind happy */
3183 4 : epevent->data.ptr = evnt;
3184 :
3185 4 : if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_DEL, old_fd, epevent) == -1)
3186 0 : vlg_abort(vlg, "epoll: %m\n");
3187 :
3188 4 : epevent->events = SOCKET_POLL_INDICATOR(evnt->ind)->events;
3189 4 : epevent->data.ptr = evnt;
3190 :
3191 4 : if (epoll_ctl(evnt__epoll_fd, EPOLL_CTL_ADD, fd, epevent) == -1)
3192 0 : vlg_abort(vlg, "epoll: %m\n");
3193 : }
3194 :
3195 2 : ASSERT(evnt__valid(evnt));
3196 :
3197 4 : return (TRUE);
3198 :
3199 0 : malloc_base_fail:
3200 0 : socket_poll_del(evnt->ind);
3201 0 : evnt->ind = old_ind;
3202 0 : vstr_free_base(evnt->io_r); evnt->io_r = NULL;
3203 0 : ASSERT(!evnt->io_r && !evnt->io_w);
3204 0 : poll_add_fail:
3205 0 : return (FALSE);
3206 : }
3207 :
3208 : #define EVNT__EPOLL_EVENTS 128
3209 : int evnt_poll(void)
3210 9178060 : {
3211 : struct epoll_event events[EVNT__EPOLL_EVENTS];
3212 9178060 : int msecs = evnt__get_timeout();
3213 9178060 : int ret = -1;
3214 9178060 : unsigned int scan = 0;
3215 :
3216 9178060 : if (evnt_child_exited)
3217 0 : return (errno = EINTR, -1);
3218 :
3219 9178060 : if (evnt__poll_tst_accept(msecs))
3220 0 : return (evnt__poll_accept());
3221 :
3222 9178060 : if (!evnt_poll_direct_enabled())
3223 1721 : return (socket_poll_update_all(msecs));
3224 :
3225 9176339 : ret = epoll_wait(evnt__epoll_fd, events, EVNT__EPOLL_EVENTS, msecs);
3226 9176339 : if (ret == -1)
3227 0 : return (ret);
3228 :
3229 9176339 : scan = ret;
3230 1954139 : ASSERT(scan <= EVNT__EPOLL_EVENTS);
3231 16448235 : while (scan-- > 0)
3232 : {
3233 49696 : struct Evnt *evnt = NULL;
3234 49696 : unsigned int flags = 0;
3235 :
3236 49696 : flags = events[scan].events;
3237 49696 : evnt = events[scan].data.ptr;
3238 :
3239 23458 : ASSERT(evnt__valid(evnt));
3240 :
3241 49696 : vlg_dbg2(vlg, "epoll_wait($<sa:%p>,%u=%s)\n", EVNT_SA(evnt),
3242 : flags, EVNT__POLL_FLGS(flags));
3243 49696 : vlg_dbg2(vlg, "epoll[flags]=a=%u|r=%u|s=%u\n",
3244 : evnt->flag_q_accept, evnt->flag_q_recv, evnt->flag_q_send_recv);
3245 :
3246 23458 : assert(((SOCKET_POLL_INDICATOR(evnt->ind)->events & flags) == flags) ||
3247 : ((POLLHUP|POLLERR) & flags));
3248 :
3249 49696 : SOCKET_POLL_INDICATOR(evnt->ind)->revents = flags;
3250 :
3251 49696 : evnt__del_whatever(evnt); /* move to front of queue */
3252 49696 : evnt__add_whatever(evnt);
3253 : }
3254 :
3255 9176339 : return (ret);
3256 : }
3257 : #endif
|