1 : /*
2 : * Copyright (C) 2002, 2003, 2004, 2005, 2006 James Antill
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation; either
7 : * version 2 of the License, or (at your option) any later version.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 : *
18 : * email: james@and.org
19 : */
20 : /* IO events, and some help with timed events */
21 : #include <vstr.h>
22 :
23 : #include <stdlib.h>
24 : #include <sys/types.h>
25 : #include <sys/socket.h>
26 : #include <unistd.h>
27 : #include <fcntl.h>
28 : #include <errno.h>
29 : #include <getopt.h>
30 : #include <string.h>
31 : #include <netinet/in.h>
32 : #include <sys/un.h>
33 : #include <netinet/tcp.h>
34 : #include <arpa/inet.h>
35 : #include <sys/poll.h>
36 : #include <netdb.h>
37 : #include <sys/time.h>
38 : #include <time.h>
39 : #include <signal.h>
40 : #include <sys/stat.h>
41 :
42 : #include <socket_poll.h>
43 : #include <timer_q.h>
44 :
45 : #include <sys/sendfile.h>
46 :
/* Fallback for system headers that do not provide TCP_CONGESTION:
 * Linux builds use option number 13, everything else gets 0, which
 * evnt_fd__set_congestion() treats as "option not available". */
#ifndef TCP_CONGESTION
# ifdef __linux__
#  define TCP_CONGESTION 13
# else
#  define TCP_CONGESTION 0
# endif
#endif
54 :
/* FIXME: Should do better autoconf checks... */
#if defined(__linux__)
/* Linux doesn't let the default for TCP_NODELAY be configured system wide,
 * the way Solaris (and maybe *BSD?) does. When this is FALSE evnt_init()
 * only sets TCP_NODELAY explicitly if Nagle has been turned off. */
# define HAVE_TCP_NODELAY_CONFIG FALSE
#else
# define HAVE_TCP_NODELAY_CONFIG TRUE
#endif

#if defined(__BSD__)
/* On *BSD (and maybe Solaris?) sockets returned by accept() inherit the
 * flags of the listening socket; on Linux they don't.
 * NOTE(review): confirm __BSD__ is really predefined on the intended
 * platforms -- if it never is, this branch is dead. */
# define HAVE_SOCK_FLAGS_INHERIT TRUE
#else
# define HAVE_SOCK_FLAGS_INHERIT FALSE
#endif
71 :
/* Printable name for the POLLIN/POLLOUT part of a poll event mask, all other
 * bits are ignored. Note that the argument is evaluated more than once. */
#define EVNT__POLL_FLGS(x)                                              \
    (!((x) & (POLLIN | POLLOUT)) ? "()" :                               \
     (!((x) & POLLOUT) ? "(POLLIN)" :                                   \
      (!((x) & POLLIN) ? "(POLLOUT)" :                                  \
       "(POLLIN | POLLOUT)")))
77 :
/* Compile time configuration of the event layer. */
#define CONF_EVNT_NO_EPOLL FALSE
#define CONF_EVNT_EPOLL_SZ (10 * 1000) /* size is just a speed hint */
#define CONF_EVNT_DUP_EPOLL TRUE /* doesn't work if FALSE and multi proc */
#define CONF_GETTIMEOFDAY_TIME TRUE /* does tv_sec contain time(NULL) */
82 :
/* EVNT__RESOLVE_NAME(saddr, x): fill in saddr->sin_addr.s_addr from the
 * string x.
 *
 *  saddr - expression of type (struct sockaddr_in *)
 *  x     - dotted quad, or (unless CONF_FULL_STATIC) a host name
 *
 * A dotted quad is parsed with inet_addr(). Anything inet_addr() rejects is
 * looked up with gethostbyname(), unless CONF_FULL_STATIC is defined in which
 * case no lookup is done. If no usable IPv4 address is found the address
 * silently becomes INADDR_ANY.
 *
 * Both arguments may be evaluated more than once, so they must not have
 * side effects. */
#ifdef CONF_FULL_STATIC
# define EVNT__RESOLVE_NAME(saddr, x) do {                              \
      if (((saddr)->sin_addr.s_addr = inet_addr(x)) == INADDR_NONE)     \
        (saddr)->sin_addr.s_addr = htonl(INADDR_ANY);                   \
    } while (0)
#else
# include <netdb.h>
# define EVNT__RESOLVE_NAME(saddr, x) do {                              \
      if (((saddr)->sin_addr.s_addr = inet_addr(x)) == INADDR_NONE)     \
      {                                                                 \
        struct hostent *evnt__rn_h = gethostbyname(x);                  \
                                                                        \
        (saddr)->sin_addr.s_addr = htonl(INADDR_ANY);                   \
        /* only copy something that really is a single IPv4 address */ \
        if (evnt__rn_h && (evnt__rn_h->h_addrtype == AF_INET) &&        \
            ((size_t)evnt__rn_h->h_length ==                            \
             sizeof((saddr)->sin_addr.s_addr)) &&                       \
            evnt__rn_h->h_addr_list[0])                                 \
          memcpy(&(saddr)->sin_addr.s_addr,                             \
                 evnt__rn_h->h_addr_list[0],                            \
                 sizeof((saddr)->sin_addr.s_addr));                     \
      }                                                                 \
    } while (0)
#endif
103 :
/* Socket filters are only enabled when the system headers define both
 * SO_ATTACH_FILTER and SO_DETACH_FILTER. Otherwise a dummy struct sock_fprog
 * and zero valued option names are supplied, so code that names them
 * still compiles. */
#if !defined(SO_DETACH_FILTER) || !defined(SO_ATTACH_FILTER)
# define CONF_USE_SOCKET_FILTERS FALSE
struct sock_fprog { int dummy; };
# define SO_DETACH_FILTER 0
# define SO_ATTACH_FILTER 0
#else
# define CONF_USE_SOCKET_FILTERS TRUE

/* not in glibc... hope it's not in *BSD etc. */
struct sock_filter
{
 uint16_t code; /* Actual filter code */
 uint8_t jt; /* Jump true */
 uint8_t jf; /* Jump false */
 uint32_t k; /* Generic multiuse field */
};

struct sock_fprog
{
 unsigned short len; /* Number of filter blocks */
 struct sock_filter *filter;
};
#endif
127 :
128 : #include "vlg.h"
129 :
130 : #define EX_UTILS_NO_USE_INIT 1
131 : #define EX_UTILS_NO_USE_EXIT 1
132 : #define EX_UTILS_NO_USE_LIMIT 1
133 : #define EX_UTILS_NO_USE_BLOCK 1
134 : #define EX_UTILS_NO_USE_PUT 1
135 : #define EX_UTILS_NO_USE_OPEN 1
136 : #define EX_UTILS_NO_USE_IO_FD 1
137 : #include "ex_utils.h"
138 :
/* USE_TCP_CORK says whether the build configuration found TCP_CORK
 * (HAVE_TCP_CORK). When it didn't, TCP_CORK is defined as 0 so that
 * evnt_fd__set_cork() still compiles. */
#ifdef HAVE_TCP_CORK
# define USE_TCP_CORK TRUE
#else
# define USE_TCP_CORK FALSE
# define TCP_CORK 0
#endif
145 :
146 : #include "evnt.h"
147 :
148 : #include "mk.h"
149 :
/* NOTE(review): presumably set from a SIGCHLD handler elsewhere -- confirm */
volatile sig_atomic_t evnt_child_exited = FALSE;

/* Nagle setting that evnt_init() records on each new event */
int evnt_opt_nagle = EVNT_CONF_NAGLE;

/* secondary lists, an event can be on these as well as on a main queue
 * (chained through s_next and c_next, see evnt__valid() and evnt_close()) */
static struct Evnt *q_send_now = NULL; /* Try a send "now" */
static struct Evnt *q_closed = NULL; /* Close when fin. */

/* main queues, a live event is on exactly one of these (see evnt__valid()) */
static struct Evnt *q_none = NULL; /* nothing */
static struct Evnt *q_accept = NULL; /* connections - recv */
static struct Evnt *q_connect = NULL; /* connections - send */
static struct Evnt *q_recv = NULL; /* recv */
static struct Evnt *q_send_recv = NULL; /* recv + send */

/* logger for this file, set via evnt_logger() */
static Vlg *vlg = NULL;

/* number of events on the main queues: evnt__make_end() increments it and
 * evnt__free() decrements it */
static unsigned int evnt__num = 0;

/* things can move from recv -> send_recv with a timeout,
 * so they look like the end of events NULL */
static unsigned int evnt__scan_ready_moved_send_zero_fds = 0;

/* this should be more configurable... */
static unsigned int evnt__accept_limit = 4;

/* cached "now", refreshed by EVNT__UPDATE_TV() and read by EVNT__COPY_TV() */
static struct timeval evnt__tv[1];

static int evnt__is_child = FALSE;

#define EVNT__UPDATE_TV() gettimeofday(evnt__tv, NULL)
#define EVNT__COPY_TV(x) memcpy(x, evnt__tv, sizeof(struct timeval))
180 :
181 : void evnt_logger(Vlg *passed_vlg)
182 296 : {
183 296 : vlg = passed_vlg;
184 296 : }
185 :
186 : void evnt_fd__set_nonblock(int fd, int val)
187 12916 : {
188 12916 : int flags = 0;
189 :
190 6458 : ASSERT(val == !!val);
191 :
192 12916 : if ((flags = fcntl(fd, F_GETFL)) == -1)
193 0 : vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", __func__);
194 :
195 12916 : if (!!(flags & O_NONBLOCK) == val)
196 0 : return;
197 :
198 12916 : if (val)
199 12916 : flags |= O_NONBLOCK;
200 : else
201 0 : flags &= ~O_NONBLOCK;
202 :
203 12916 : if (fcntl(fd, F_SETFL, flags) == -1)
204 0 : vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", __func__);
205 : }
206 :
207 : static void evnt_fd__set_coe(int fd, int val)
208 12680 : {
209 6340 : ASSERT(val == !!val);
210 :
211 12680 : if (fcntl(fd, F_SETFD, val) == -1)
212 0 : vlg_err(vlg, EXIT_FAILURE, "%s: %m\n", __func__);
213 12680 : }
214 :
/* Set the TCP_NODELAY socket option of fd to val.
 * Returns non-zero on success, zero if setsockopt() failed (errno is set). */
static int evnt_fd__set_nodelay(int fd, int val)
{
  int rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));

  return (rc != -1);
}
219 :
/* Set the TCP_CORK socket option of fd to val.
 * Returns non-zero on success, zero if setsockopt() failed (errno is set). */
static int evnt_fd__set_cork(int fd, int val)
{
  int rc = setsockopt(fd, IPPROTO_TCP, TCP_CORK, &val, sizeof(val));

  return (rc != -1);
}
224 :
/* Set the SO_REUSEADDR socket option of fd to val.
 * Returns non-zero on success, zero if setsockopt() failed (errno is set). */
static int evnt_fd__set_reuse(int fd, int val)
{
  int rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));

  return (rc != -1);
}
229 :
230 : #define CLEN_SZ(x) (strlen(x) + 1)
231 : static int evnt_fd__set_congestion(int fd, const char *val)
232 0 : {
233 : if (!TCP_CONGESTION)
234 : return (errno = ENOPROTOOPT, FALSE);
235 :
236 0 : return (setsockopt(fd, IPPROTO_TCP, TCP_CONGESTION, val, CLEN_SZ(val)) != -1);
237 : }
238 :
239 : void evnt_add(struct Evnt **que, struct Evnt *node)
240 63704 : {
241 30466 : assert(node != *que);
242 :
243 63704 : if ((node->next = *que))
244 38632 : node->next->prev = node;
245 :
246 63704 : node->prev = NULL;
247 63704 : *que = node;
248 63704 : }
249 :
250 : void evnt_del(struct Evnt **que, struct Evnt *node)
251 63482 : {
252 63482 : if (node->prev)
253 8140 : node->prev->next = node->next;
254 : else
255 : {
256 24424 : assert(*que == node);
257 55342 : *que = node->next;
258 : }
259 :
260 63482 : if (node->next)
261 35024 : node->next->prev = node->prev;
262 63482 : }
263 :
264 : static void evnt__del_whatever(struct Evnt *evnt)
265 62154 : {
266 : if (0) { }
267 62154 : else if (evnt->flag_q_accept)
268 10913 : evnt_del(&q_accept, evnt);
269 51241 : else if (evnt->flag_q_connect)
270 0 : evnt_del(&q_connect, evnt);
271 51241 : else if (evnt->flag_q_recv)
272 50840 : evnt_del(&q_recv, evnt);
273 401 : else if (evnt->flag_q_send_recv)
274 401 : evnt_del(&q_send_recv, evnt);
275 0 : else if (evnt->flag_q_none)
276 0 : evnt_del(&q_none, evnt);
277 : else
278 0 : ASSERT_NOT_REACHED();
279 62154 : }
280 :
281 : static void COMPILE_ATTR_USED() evnt__add_whatever(struct Evnt *evnt)
282 49696 : {
283 : if (0) { }
284 49696 : else if (evnt->flag_q_accept)
285 10747 : evnt_add(&q_accept, evnt);
286 38949 : else if (evnt->flag_q_connect)
287 0 : evnt_add(&q_connect, evnt);
288 38949 : else if (evnt->flag_q_recv)
289 38548 : evnt_add(&q_recv, evnt);
290 401 : else if (evnt->flag_q_send_recv)
291 401 : evnt_add(&q_send_recv, evnt);
292 0 : else if (evnt->flag_q_none)
293 0 : evnt_add(&q_none, evnt);
294 : else
295 0 : ASSERT_NOT_REACHED();
296 49696 : }
297 :
298 : static unsigned int evnt__debug_num_1(struct Evnt *scan)
299 85741467 : {
300 85741467 : unsigned int num = 0;
301 :
302 281254700 : while (scan)
303 : {
304 109771766 : struct Evnt *scan_next = scan->next;
305 :
306 109771766 : ++num;
307 :
308 109771766 : scan = scan_next;
309 : }
310 :
311 85741467 : return (num);
312 : }
313 :
314 : #if COMPILE_DEBUG
315 : static struct Evnt **evnt__srch(struct Evnt **que, struct Evnt *evnt)
316 17140448 : {
317 17140448 : struct Evnt **ret = que;
318 :
319 38142777 : while (*ret)
320 : {
321 21002329 : if (*ret == evnt)
322 17140448 : return (ret);
323 :
324 3861881 : ret = &(*ret)->next;
325 : }
326 :
327 0 : return (NULL);
328 : }
329 :
/* Debug only consistency check of a single event.
 *
 * Asserts that evnt has exactly one main queue flag set, that its membership
 * of q_send_now and q_closed matches flag_q_send_now and flag_q_closed, and
 * that its poll events/revents fit the main queue it claims to be on.
 *
 * Returns non-zero if evnt was found on the main queue named by its flag,
 * zero if not. */
static int evnt__valid(struct Evnt *evnt)
{
  int ret = 0;

  ASSERT(evnt_num_all());

  /* the flags are summed: exactly one main queue flag may be set */
  ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv +
          evnt->flag_q_send_recv + evnt->flag_q_none) == 1);

  if (evnt->flag_q_send_now)
  { /* flagged, so it must be on q_send_now (chained via s_next) */
    struct Evnt **scan = &q_send_now;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->s_next;
    ASSERT(*scan);
  }
  else
  { /* not flagged, so it must not be on q_send_now */
    struct Evnt **scan = &q_send_now;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->s_next;
    ASSERT(!*scan);
  }

  if (evnt->flag_q_closed)
  { /* flagged, so it must be on q_closed (chained via c_next) */
    struct Evnt **scan = &q_closed;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->c_next;
    ASSERT(*scan);
  }
  else
  { /* not flagged, so it must not be on q_closed */
    struct Evnt **scan = &q_closed;

    while (*scan && (*scan != evnt))
      scan = &(*scan)->c_next;
    ASSERT(!*scan);
  }

  if (0) { }
  else if (evnt->flag_q_accept)
  { /* listening events never poll for output and have no io buffers */
    ret = !!evnt__srch(&q_accept, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
    ASSERT(!evnt->io_r && !evnt->io_w);
  }
  else if (evnt->flag_q_connect)
  { /* connecting events never poll for input, and either poll for output
     * or have a tm_l_w timer node */
    ret = !!evnt__srch(&q_connect, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLIN));
    assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT) ||
            evnt->tm_l_w);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN));
  }
  else if (evnt->flag_q_send_recv)
  { /* sending events either poll for output or have a tm_l_w timer node */
    ret = !!evnt__srch(&q_send_recv, evnt);
    assert( (SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT) ||
            evnt->tm_l_w);
  }
  else if (evnt->flag_q_recv)
  { /* recv only events never poll for output */
    ret = !!evnt__srch(&q_recv, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
  }
  else if (evnt->flag_q_none)
  { /* idle events poll for nothing at all */
    ret = !!evnt__srch(&q_none, evnt);
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLIN));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->events & POLLOUT));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLIN));
    assert(!(SOCKET_POLL_INDICATOR(evnt->ind)->revents & POLLOUT));
  }
  else
    ASSERT_NOT_REACHED();

  return (ret);
}
414 :
415 : static unsigned int evnt__debug_num_all(void)
416 17147415 : {
417 17147415 : unsigned int num = 0;
418 :
419 17147415 : num += evnt__debug_num_1(q_connect);
420 17147415 : num += evnt__debug_num_1(q_accept);
421 17147415 : num += evnt__debug_num_1(q_recv);
422 17147415 : num += evnt__debug_num_1(q_send_recv);
423 17147415 : num += evnt__debug_num_1(q_none);
424 :
425 17147415 : return (num);
426 : }
427 : #endif
428 :
429 : int evnt_fd(struct Evnt *evnt)
430 18011716 : {
431 3806409 : ASSERT(evnt__valid(evnt));
432 18011716 : return (SOCKET_POLL_INDICATOR(evnt->ind)->fd);
433 : }
434 :
435 : int evnt_cb_func_connect(struct Evnt *COMPILE_ATTR_UNUSED(evnt))
436 0 : {
437 0 : return (TRUE);
438 : }
439 :
440 : struct Evnt *evnt_cb_func_accept(struct Evnt *COMPILE_ATTR_UNUSED(evnt),
441 : int COMPILE_ATTR_UNUSED(fd),
442 : struct sockaddr *COMPILE_ATTR_UNUSED(sa),
443 : socklen_t COMPILE_ATTR_UNUSED(len))
444 0 : {
445 0 : return (NULL);
446 : }
447 :
448 : int evnt_cb_func_recv(struct Evnt *evnt)
449 1371 : {
450 1371 : unsigned int ern = 0;
451 1371 : int ret = evnt_recv(evnt, &ern);
452 :
453 1371 : if (ret)
454 1139 : return (TRUE);
455 :
456 232 : if ((ern == VSTR_TYPE_SC_READ_FD_ERR_EOF) && evnt->io_w->len)
457 0 : return (evnt_shutdown_r(evnt, TRUE));
458 :
459 232 : return (FALSE);
460 : }
461 :
462 : int evnt_cb_func_send(struct Evnt *evnt)
463 18 : {
464 18 : int ret = -1;
465 :
466 18 : evnt_fd_set_cork(evnt, TRUE);
467 18 : ret = evnt_send(evnt);
468 18 : if (!evnt->io_w->len)
469 18 : evnt_fd_set_cork(evnt, FALSE);
470 :
471 18 : return (ret);
472 : }
473 :
474 : void evnt_cb_func_free(struct Evnt *evnt)
475 0 : {
476 0 : MALLOC_CHECK_SCRUB_PTR(evnt, sizeof(struct Evnt));
477 0 : free(evnt);
478 0 : }
479 :
/* Free callback that releases the event with F().
 * This is the cb_func_free that evnt_init() installs by default. */
void evnt_cb_func_F(struct Evnt *evnt)
{
  F(evnt);
}
484 :
485 : int evnt_cb_func_shutdown_r(struct Evnt *evnt)
486 0 : {
487 0 : vlg_dbg2(vlg, "SHUTDOWN CB from[$<sa:%p>]\n", EVNT_SA(evnt));
488 :
489 0 : if (!evnt_shutdown_r(evnt, FALSE))
490 0 : return (FALSE);
491 :
492 : /* called from outside read, and read'll never get called again ...
493 : * so quit if we have nothing to send */
494 0 : return (!!evnt->io_w->len);
495 : }
496 :
497 : static int evnt_init(struct Evnt *evnt, int fd, Vstr_ref *ref,
498 : struct Evnt *from_evnt)
499 12680 : {
500 6340 : ASSERT(ref);
501 :
502 12680 : evnt->flag_q_accept = FALSE;
503 12680 : evnt->flag_q_connect = FALSE;
504 12680 : evnt->flag_q_recv = FALSE;
505 12680 : evnt->flag_q_send_recv = FALSE;
506 12680 : evnt->flag_q_none = FALSE;
507 :
508 12680 : evnt->flag_q_send_now = FALSE;
509 12680 : evnt->flag_q_closed = FALSE;
510 :
511 12680 : evnt->flag_q_pkt_move = FALSE;
512 :
513 12680 : evnt->flag_io_nagle = FALSE;
514 12680 : evnt->flag_io_cork = FALSE;
515 :
516 12680 : evnt->flag_io_filter = FALSE;
517 :
518 12680 : evnt->flag_fully_acpt = FALSE;
519 :
520 12680 : evnt->flag_insta_close = FALSE;
521 :
522 12680 : evnt->io_r_shutdown = FALSE;
523 12680 : evnt->io_w_shutdown = FALSE;
524 :
525 12680 : evnt->io_r_limited = FALSE;
526 12680 : evnt->io_w_limited = FALSE;
527 :
528 12680 : evnt->prev_bytes_r = 0;
529 :
530 12680 : evnt->acct.req_put = 0;
531 12680 : evnt->acct.req_got = 0;
532 12680 : evnt->acct.bytes_r = 0;
533 12680 : evnt->acct.bytes_w = 0;
534 :
535 12680 : evnt->cbs->cb_func_accept = evnt_cb_func_accept;
536 12680 : evnt->cbs->cb_func_connect = evnt_cb_func_connect;
537 12680 : evnt->cbs->cb_func_recv = evnt_cb_func_recv;
538 12680 : evnt->cbs->cb_func_send = evnt_cb_func_send;
539 12680 : evnt->cbs->cb_func_free = evnt_cb_func_F;
540 12680 : evnt->cbs->cb_func_shutdown_r = evnt_cb_func_shutdown_r;
541 :
542 12680 : if (!(evnt->io_r = vstr_make_base(NULL)))
543 0 : goto make_vstr_fail;
544 :
545 12680 : if (!(evnt->io_w = vstr_make_base(NULL)))
546 0 : goto make_vstr_fail;
547 :
548 12680 : evnt->tm_o = NULL;
549 :
550 12680 : evnt->tm_l_r = NULL;
551 12680 : evnt->tm_l_w = NULL;
552 12680 : evnt->lims = NULL;
553 12680 : evnt->lim_num = 0;
554 :
555 12680 : EVNT__COPY_TV(&evnt->ctime);
556 12680 : EVNT__COPY_TV(&evnt->mtime);
557 :
558 12680 : evnt->msecs_tm_mtime = 0;
559 :
560 12680 : evnt_fd__set_coe(fd, TRUE);
561 :
562 12680 : evnt->sa_ref = vstr_ref_add(ref);
563 12680 : evnt->acpt_sa_ref = NULL;
564 :
565 12680 : if (!(evnt->ind = evnt_poll_add(evnt, fd)))
566 0 : goto poll_add_fail;
567 :
568 : /* FIXME: need group settings */
569 : if (HAVE_SOCK_FLAGS_INHERIT && from_evnt &&
570 : (from_evnt->flag_io_nagle == evnt_opt_nagle))
571 : evnt->flag_io_nagle = evnt_opt_nagle;
572 12680 : else if (HAVE_TCP_NODELAY_CONFIG || !evnt_opt_nagle)
573 : {
574 232 : evnt_fd__set_nodelay(fd, !evnt->flag_io_nagle);
575 232 : evnt->flag_io_nagle = evnt_opt_nagle;
576 : }
577 :
578 : if (!HAVE_SOCK_FLAGS_INHERIT || !from_evnt)
579 12680 : evnt_fd__set_nonblock(fd, TRUE);
580 :
581 12680 : return (TRUE);
582 :
583 0 : poll_add_fail:
584 0 : vstr_ref_del(evnt->sa_ref); evnt->sa_ref = NULL;
585 0 : vstr_free_base(evnt->io_w);
586 0 : make_vstr_fail:
587 0 : vstr_free_base(evnt->io_r);
588 :
589 0 : errno = ENOMEM;
590 0 : return (FALSE);
591 : }
592 :
593 : static void evnt__free1(struct Evnt *evnt)
594 12458 : {
595 12458 : evnt_send_del(evnt);
596 :
597 12458 : if (evnt->io_r && evnt->io_r->len)
598 32 : vlg_dbg2(vlg, "evnt__free1($<sa:%p>) io_r len = %zu\n",
599 : EVNT_SA(evnt), evnt->io_r->len);
600 12458 : if (evnt->io_w && evnt->io_w->len)
601 0 : vlg_dbg2(vlg, "evnt__free1($<sa:%p>) io_w len = %zu\n",
602 : EVNT_SA(evnt), evnt->io_w->len);
603 :
604 12458 : vstr_free_base(evnt->io_w); evnt->io_w = NULL;
605 12458 : vstr_free_base(evnt->io_r); evnt->io_r = NULL;
606 :
607 12458 : evnt_poll_del(evnt);
608 12458 : evnt_limit_free(evnt);
609 12458 : }
610 :
611 : static void evnt__free_tq(Timer_q_node *tm)
612 38070 : {
613 38070 : if (tm)
614 : {
615 12052 : timer_q_cntl_node(tm, TIMER_Q_CNTL_NODE_SET_DATA, NULL);
616 12052 : timer_q_quick_del_node(tm);
617 : }
618 38070 : }
619 :
620 : static void evnt__free2(Vstr_ref *sa, Vstr_ref *acpt_sa,
621 : Timer_q_node *tm_o,
622 : Timer_q_node *tm_l_r, Timer_q_node *tm_l_w)
623 12690 : { /* post callbacks, evnt no longer exists */
624 12690 : vstr_ref_del(sa);
625 12690 : vstr_ref_del(acpt_sa);
626 :
627 12690 : evnt__free_tq(tm_o);
628 12690 : evnt__free_tq(tm_l_r);
629 12690 : evnt__free_tq(tm_l_w);
630 12690 : }
631 :
632 : static void evnt__free(struct Evnt *evnt)
633 12458 : {
634 12458 : if (evnt)
635 : {
636 12458 : Vstr_ref *sa = evnt->sa_ref;
637 12458 : Vstr_ref *acpt_sa = evnt->acpt_sa_ref;
638 12458 : Timer_q_node *tm_o = evnt->tm_o;
639 12458 : Timer_q_node *tm_l_r = evnt->tm_l_r;
640 12458 : Timer_q_node *tm_l_w = evnt->tm_l_w;
641 :
642 12458 : evnt__free1(evnt);
643 :
644 6229 : ASSERT(evnt__num >= 1); /* in case they come back in via. the cb */
645 12458 : --evnt__num;
646 6229 : ASSERT(evnt__num == evnt__debug_num_all());
647 :
648 12458 : evnt->cbs->cb_func_free(evnt);
649 12458 : evnt__free2(sa, acpt_sa, tm_o, tm_l_r, tm_l_w);
650 : }
651 12458 : }
652 :
/* Take evnt off its main queue and destroy it. NULL is a noop. */
void evnt_free(struct Evnt *evnt)
{
  if (!evnt)
    return;

  evnt__del_whatever(evnt);
  evnt__free(evnt);
}
661 :
662 : static void evnt__close_now(struct Evnt *evnt)
663 12288 : {
664 12288 : if (evnt->flag_q_closed)
665 32 : return;
666 :
667 12256 : evnt_free(evnt);
668 : }
669 :
670 : static void evnt__uninit(struct Evnt *evnt)
671 0 : {
672 0 : ASSERT((evnt->flag_q_connect + evnt->flag_q_accept + evnt->flag_q_recv +
673 : evnt->flag_q_send_recv + evnt->flag_q_none) == 0);
674 :
675 0 : evnt__free1(evnt);
676 0 : evnt__free2(evnt->sa_ref, evnt->acpt_sa_ref,
677 : evnt->tm_o, evnt->tm_l_r, evnt->tm_l_w);
678 0 : }
679 :
/* close() fd without disturbing errno, so that error paths can clean up and
 * still report the original failure. The result of close() is ignored. */
static void evnt__fd_close_noerrno(int fd)
{
  const int keep_errno = errno;

  close(fd);

  errno = keep_errno;
}
686 :
687 : static int evnt__make_end(struct Evnt **que, struct Evnt *evnt, int flags)
688 12680 : {
689 12680 : evnt_add(que, evnt);
690 :
691 12680 : ++evnt__num;
692 :
693 12680 : evnt_wait_cntl_add(evnt, flags);
694 :
695 6340 : ASSERT(evnt__valid(evnt));
696 :
697 12680 : return (TRUE);
698 : }
699 :
700 : int evnt_make_con_ipv4(struct Evnt *evnt, const char *ipv4_string, short port)
701 0 : {
702 0 : int fd = -1;
703 0 : socklen_t alloc_len = sizeof(struct sockaddr_in);
704 0 : Vstr_ref *ref = NULL;
705 0 : struct sockaddr_in *saddr = NULL;
706 :
707 0 : if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
708 0 : goto sock_fail;
709 :
710 0 : EVNT__UPDATE_TV();
711 :
712 0 : if (!(ref = vstr_ref_make_malloc(alloc_len)))
713 0 : goto init_fail;
714 0 : saddr = ref->ptr;
715 :
716 0 : saddr->sin_family = AF_INET;
717 0 : saddr->sin_port = htons(port);
718 0 : saddr->sin_addr.s_addr = inet_addr(ipv4_string);
719 :
720 0 : if (!evnt_init(evnt, fd, ref, NULL))
721 0 : goto init_fail;
722 0 : evnt->flag_q_pkt_move = TRUE;
723 :
724 0 : ASSERT(port && (saddr->sin_addr.s_addr != htonl(INADDR_ANY)));
725 :
726 0 : if (connect(fd, EVNT_SA(evnt), alloc_len) == -1)
727 : {
728 0 : if (errno == EINPROGRESS)
729 : { /* The connection needs more time....*/
730 0 : vstr_ref_del(ref);
731 0 : evnt->flag_q_connect = TRUE;
732 0 : return (evnt__make_end(&q_connect, evnt, POLLOUT));
733 : }
734 :
735 0 : goto connect_fail;
736 : }
737 :
738 0 : vstr_ref_del(ref);
739 0 : evnt->flag_q_none = TRUE;
740 0 : return (evnt__make_end(&q_none, evnt, 0));
741 :
742 0 : connect_fail:
743 0 : evnt__uninit(evnt);
744 0 : init_fail:
745 0 : vstr_ref_del(ref);
746 0 : evnt__fd_close_noerrno(fd);
747 0 : sock_fail:
748 0 : return (FALSE);
749 : }
750 :
751 : int evnt_make_con_local(struct Evnt *evnt, const char *fname)
752 232 : {
753 232 : int fd = -1;
754 232 : size_t len = strlen(fname) + 1;
755 : struct sockaddr_un tmp_sun;
756 232 : struct sockaddr_un *saddr = NULL;
757 232 : socklen_t alloc_len = 0;
758 232 : Vstr_ref *ref = NULL;
759 :
760 232 : tmp_sun.sun_path[0] = 0;
761 232 : alloc_len = SUN_LEN(&tmp_sun) + len;
762 :
763 232 : if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1)
764 0 : goto sock_fail;
765 :
766 232 : EVNT__UPDATE_TV();
767 :
768 232 : if (!(ref = vstr_ref_make_malloc(alloc_len)))
769 0 : goto init_fail;
770 232 : saddr = ref->ptr;
771 :
772 232 : saddr->sun_family = AF_LOCAL;
773 232 : memcpy(saddr->sun_path, fname, len);
774 :
775 232 : if (!evnt_init(evnt, fd, ref, NULL))
776 0 : goto init_fail;
777 232 : evnt->flag_q_pkt_move = TRUE;
778 :
779 232 : if (connect(fd, EVNT_SA(evnt), alloc_len) == -1)
780 : {
781 0 : if (errno == EINPROGRESS)
782 : { /* The connection needs more time....*/
783 0 : vstr_ref_del(ref);
784 0 : evnt->flag_q_connect = TRUE;
785 0 : return (evnt__make_end(&q_connect, evnt, POLLOUT));
786 : }
787 :
788 0 : goto connect_fail;
789 : }
790 :
791 232 : vstr_ref_del(ref);
792 232 : evnt->flag_q_none = TRUE;
793 232 : return (evnt__make_end(&q_none, evnt, 0));
794 :
795 0 : connect_fail:
796 0 : evnt__uninit(evnt);
797 0 : init_fail:
798 0 : vstr_ref_del(ref);
799 0 : evnt__fd_close_noerrno(fd);
800 0 : sock_fail:
801 0 : return (FALSE);
802 : }
803 :
804 : int evnt_make_acpt_ref(struct Evnt *evnt, int fd, Vstr_ref *sa)
805 12284 : {
806 12284 : if (!evnt_init(evnt, fd, sa, NULL))
807 0 : return (FALSE);
808 :
809 12284 : evnt->flag_q_recv = TRUE;
810 12284 : return (evnt__make_end(&q_recv, evnt, POLLIN));
811 : }
812 :
813 : int evnt_make_acpt_dup(struct Evnt *evnt, int fd,
814 : struct sockaddr *sa, socklen_t len)
815 12052 : {
816 12052 : Vstr_ref *ref = vstr_ref_make_memdup(sa, len);
817 12052 : int ret = FALSE;
818 :
819 12052 : if (!ref)
820 : {
821 0 : errno = ENOMEM;
822 0 : return (FALSE);
823 : }
824 :
825 12052 : ret = evnt_make_acpt_ref(evnt, fd, ref);
826 12052 : vstr_ref_del(ref);
827 12052 : return (ret);
828 : }
829 :
830 : static int evnt__make_bind_end(struct Evnt *evnt)
831 160 : {
832 160 : vstr_free_base(evnt->io_r); evnt->io_r = NULL;
833 160 : vstr_free_base(evnt->io_w); evnt->io_w = NULL;
834 :
835 160 : evnt->flag_q_accept = TRUE;
836 160 : evnt__make_end(&q_accept, evnt, POLLIN);
837 160 : SOCKET_POLL_INDICATOR(evnt->ind)->revents = 0;
838 160 : return (TRUE);
839 : }
840 :
841 : int evnt_make_bind_ipv4(struct Evnt *evnt,
842 : const char *acpt_addr, short server_port,
843 : unsigned int listen_len, const char *cong)
844 120 : {
845 120 : int fd = -1;
846 120 : socklen_t alloc_len = sizeof(struct sockaddr_in);
847 120 : Vstr_ref *ref = NULL;
848 120 : struct sockaddr_in *saddr = NULL;
849 :
850 120 : if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
851 0 : VLG_WARN_GOTO(sock_fail, (vlg, "socket(): %m\n"));
852 :
853 120 : EVNT__UPDATE_TV();
854 :
855 120 : if (!(ref = vstr_ref_make_malloc(alloc_len)))
856 0 : VLG_WARNNOMEM_GOTO(init_fail, (vlg, "%s(): %m\n", __func__));
857 120 : saddr = ref->ptr;
858 :
859 120 : saddr->sin_family = AF_INET;
860 :
861 120 : saddr->sin_addr.s_addr = htonl(INADDR_ANY);
862 120 : if (acpt_addr && *acpt_addr) /* silent error becomes <any> */
863 82 : EVNT__RESOLVE_NAME(saddr, acpt_addr);
864 120 : if (saddr->sin_addr.s_addr == htonl(INADDR_ANY))
865 38 : acpt_addr = "any";
866 :
867 120 : saddr->sin_port = htons(server_port);
868 :
869 120 : if (!evnt_init(evnt, fd, ref, NULL))
870 0 : VLG_WARNNOMEM_GOTO(init_fail, (vlg, "%s(): %m\n", __func__));
871 :
872 120 : if (!evnt_fd__set_reuse(fd, TRUE))
873 0 : VLG_WARNNOMEM_GOTO(reuse_fail, (vlg, "%s(): %m\n", __func__));
874 :
875 120 : if (cong && !evnt_fd__set_congestion(fd, cong) && (errno != ENOPROTOOPT))
876 0 : VLG_WARN_GOTO(cong_fail,
877 : (vlg, "setsockopt(TCP_CONGESTION, %s): %m\n", cong));
878 :
879 120 : if (bind(fd, EVNT_SA(evnt), alloc_len) == -1)
880 0 : VLG_WARN_GOTO(bind_fail,
881 : (vlg, "bind(%s:%hd): %m\n", acpt_addr, server_port));
882 :
883 120 : if (!server_port)
884 120 : if (getsockname(fd, EVNT_SA(evnt), &alloc_len) == -1)
885 0 : VLG_WARN_GOTO(getsockname_fail, (vlg, "getsockname(): %m\n"));
886 :
887 120 : if (listen(fd, listen_len) == -1)
888 0 : VLG_WARN_GOTO(listen_fail, (vlg, "listen(%d): %m\n", listen_len));
889 :
890 120 : vstr_ref_del(ref);
891 120 : return (evnt__make_bind_end(evnt));
892 :
893 0 : listen_fail:
894 0 : getsockname_fail:
895 0 : bind_fail:
896 0 : cong_fail:
897 0 : reuse_fail:
898 0 : evnt__uninit(evnt);
899 0 : init_fail:
900 0 : vstr_ref_del(ref);
901 0 : evnt__fd_close_noerrno(fd);
902 0 : sock_fail:
903 0 : return (FALSE);
904 : }
905 :
906 : int evnt_make_bind_local(struct Evnt *evnt, const char *fname,
907 : unsigned int listen_len)
908 40 : {
909 40 : int fd = -1;
910 40 : int saved_errno = 0;
911 40 : size_t len = strlen(fname) + 1;
912 : struct sockaddr_un tmp_sun;
913 40 : struct sockaddr_un *saddr = NULL;
914 40 : socklen_t alloc_len = 0;
915 40 : Vstr_ref *ref = NULL;
916 :
917 40 : tmp_sun.sun_path[0] = 0;
918 40 : alloc_len = SUN_LEN(&tmp_sun) + len;
919 :
920 40 : if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) == -1)
921 0 : goto sock_fail;
922 :
923 40 : EVNT__UPDATE_TV();
924 :
925 40 : if (!(ref = vstr_ref_make_malloc(alloc_len)))
926 0 : goto init_fail;
927 40 : saddr = ref->ptr;
928 :
929 40 : saddr->sun_family = AF_LOCAL;
930 40 : memcpy(saddr->sun_path, fname, len);
931 :
932 40 : if (!evnt_init(evnt, fd, ref, NULL))
933 0 : goto init_fail;
934 :
935 40 : if (unlink(fname) != -1)
936 0 : vlg_warn(vlg, "HAD to unlink(%s) for bind\n", fname);
937 :
938 40 : if (bind(fd, EVNT_SA(evnt), alloc_len) == -1)
939 0 : goto bind_fail;
940 :
941 40 : if (fchmod(fd, 0600) == -1)
942 0 : goto fchmod_fail;
943 :
944 40 : if (listen(fd, listen_len) == -1)
945 0 : goto listen_fail;
946 :
947 40 : vstr_ref_del(ref);
948 40 : return (evnt__make_bind_end(evnt));
949 :
950 0 : bind_fail:
951 0 : saved_errno = errno;
952 0 : vlg_warn(vlg, "bind(%s): %m\n", fname);
953 0 : errno = saved_errno;
954 0 : fchmod_fail:
955 0 : listen_fail:
956 0 : evnt__uninit(evnt);
957 0 : init_fail:
958 0 : vstr_ref_del(ref);
959 0 : evnt__fd_close_noerrno(fd);
960 0 : sock_fail:
961 0 : return (FALSE);
962 : }
963 :
964 : int evnt_make_custom(struct Evnt *evnt, int fd, Vstr_ref *sa, int flags)
965 4 : {
966 : static Vstr_ref dummy_sa = {vstr_ref_cb_free_nothing, NULL, 1};
967 :
968 4 : if (!sa)
969 4 : sa = &dummy_sa;
970 :
971 4 : EVNT__UPDATE_TV();
972 :
973 4 : if (!evnt_init(evnt, fd, sa, NULL))
974 : {
975 0 : evnt__fd_close_noerrno(fd);
976 0 : return (FALSE);
977 : }
978 :
979 4 : if (flags & POLLIN)
980 : {
981 4 : evnt->flag_q_recv = TRUE;
982 4 : return (evnt__make_end(&q_recv, evnt, POLLIN));
983 : }
984 :
985 0 : evnt->flag_q_none = TRUE;
986 0 : return (evnt__make_end(&q_none, evnt, 0));
987 : }
988 :
989 : void evnt_close(struct Evnt *evnt)
990 202 : {
991 202 : if (!evnt)
992 0 : return;
993 :
994 101 : ASSERT(evnt__valid(evnt));
995 :
996 202 : if (evnt->flag_q_closed)
997 0 : return;
998 :
999 : /* can't close at this point or we'll race with:
1000 : * socket_poll_add()/socket_poll_del() when using _DIRECT mapping */
1001 :
1002 : /* queue for deletion ... as the bt might still be using the ptr */
1003 202 : evnt->flag_q_closed = TRUE;
1004 202 : evnt->c_next = q_closed;
1005 202 : q_closed = evnt;
1006 :
1007 101 : ASSERT(evnt__valid(evnt));
1008 : }
1009 :
1010 : int evnt_limit_add(struct Evnt *evnt, Vstr_ref *ref)
1011 48208 : {
1012 48208 : Vstr_ref **refs = evnt->lims;
1013 48208 : unsigned int lim_num = evnt->lim_num + 1;
1014 48208 : struct Evnt_limit *lim = ref->ptr;
1015 :
1016 24104 : ASSERT((!evnt->lims && (lim_num == 1)) ||
1017 : ( evnt->lims && (lim_num >= 2)));
1018 :
1019 48208 : evnt->io_r_limited |= !!lim->io_r_max;
1020 48208 : evnt->io_w_limited |= !!lim->io_w_max;
1021 :
1022 48208 : if (lim_num == 1)
1023 : {
1024 12052 : if (!(evnt->lims = MK(sizeof(Vstr_ref *))))
1025 0 : return (FALSE);
1026 : }
1027 36156 : else if (!MV(evnt->lims, refs, sizeof(Vstr_ref *) * lim_num))
1028 0 : return (FALSE);
1029 :
1030 48208 : evnt->lims[evnt->lim_num] = vstr_ref_add(ref);
1031 48208 : evnt->lim_num = lim_num;
1032 :
1033 48208 : return (TRUE);
1034 : }
1035 :
1036 : int evnt_limit_dup(struct Evnt *evnt, const struct Evnt_limit *lim)
1037 24104 : {
1038 24104 : Vstr_ref *ref = vstr_ref_make_memdup(lim, sizeof(struct Evnt_limit));
1039 24104 : int ret = FALSE;
1040 :
1041 24104 : if (!ref)
1042 0 : return (FALSE);
1043 :
1044 24104 : ret = evnt_limit_add(evnt, ref);
1045 24104 : vstr_ref_del(ref);
1046 :
1047 24104 : return (ret);
1048 : }
1049 :
1050 : void evnt_limit_chg(struct Evnt *evnt, unsigned int scan, Vstr_ref *ref)
1051 45862 : {
1052 45862 : Vstr_ref *tmp = NULL;
1053 :
1054 22009 : ASSERT(evnt->lim_num > scan);
1055 45862 : tmp = evnt->lims[scan];
1056 :
1057 45862 : if (ref)
1058 : {
1059 45862 : struct Evnt_limit *lim = ref->ptr;
1060 :
1061 45862 : evnt->io_r_limited |= !!lim->io_r_max;
1062 45862 : evnt->io_w_limited |= !!lim->io_w_max;
1063 :
1064 45862 : ref = vstr_ref_add(ref);
1065 : }
1066 :
1067 45862 : evnt->lims[scan] = ref;
1068 45862 : vstr_ref_del(tmp);
1069 45862 : }
1070 :
1071 : void evnt_limit_alt(struct Evnt *evnt, unsigned int scan,
1072 : const struct Evnt_limit *nlim)
1073 45862 : {
1074 45862 : Vstr_ref *ref = NULL;
1075 45862 : struct Evnt_limit *lim = NULL;
1076 :
1077 22009 : ASSERT(evnt->lim_num > scan);
1078 45862 : ref = evnt->lims[scan];
1079 22009 : ASSERT(ref);
1080 22009 : ASSERT(ref->ref == 1); /* should be the only owner of this data,
1081 : * if we are changing it */
1082 :
1083 45862 : lim = ref->ptr;
1084 :
1085 45862 : evnt->io_r_limited |= !!lim->io_r_max;
1086 45862 : evnt->io_w_limited |= !!lim->io_w_max;
1087 :
1088 45862 : lim->io_r_max = nlim->io_r_max;
1089 45862 : lim->io_w_max = nlim->io_w_max;
1090 :
1091 45862 : if (lim->io_r_cur > lim->io_r_max)
1092 0 : lim->io_r_cur = lim->io_r_max;
1093 :
1094 45862 : if (lim->io_w_cur > lim->io_w_max)
1095 140 : lim->io_w_cur = lim->io_w_max;
1096 45862 : }
1097 :
1098 : void evnt_limit_free(struct Evnt *evnt)
1099 12458 : {
1100 12458 : unsigned int scan = 0;
1101 :
1102 73124 : while (scan < evnt->lim_num)
1103 48208 : vstr_ref_del(evnt->lims[scan++]);
1104 6229 : ASSERT(scan == evnt->lim_num);
1105 :
1106 12458 : F(evnt->lims);
1107 12458 : evnt->lims = NULL;
1108 12458 : evnt->lim_num = 0;
1109 12458 : }
1110 :
/* Refill one direction of a limit if its time window has run out.
 *
 *  tv  - the time to compare against (the caller passes its copy of "now")
 *  ltv - start time of the limit's window, updated in place
 *  cur - the limit's current budget, reset to max on a refill
 *  max - the limit's maximum budget
 *
 * A window whose tv_sec is 0 is simply refilled. Otherwise nothing happens
 * until tv is past ltv + 1 second; then the budget is refilled and the
 * window start is either advanced by one second (tv is before
 * ltv + 3 seconds) or zeroed. */
static void evnt__limit_tm_chk(const struct timeval *tv, struct timeval *ltv,
                               unsigned long *cur, unsigned long max)
{
  struct timeval win_end[1];
  struct timeval win_reset[1];

  if (!ltv->tv_sec)
  {
    *cur = max;
    return;
  }

  *win_end = *ltv;
  TIMER_Q_TIMEVAL_ADD_SECS(win_end, 1, 0);

  /* still inside the one second window: leave everything alone */
  if (TIMER_Q_TIMEVAL_CMP(tv, win_end) <= 0)
    return;

  *win_reset = *ltv;
  TIMER_Q_TIMEVAL_ADD_SECS(win_reset, 3, 0);

  if (TIMER_Q_TIMEVAL_CMP(tv, win_reset) >= 0)
  { /* if it's been more than 3 seconds, just restart */
    ltv->tv_sec  = 0;
    ltv->tv_usec = 0;
  }
  else
    ++ltv->tv_sec; /* try and be nice */

  *cur = max;
}
1140 :
1141 : static void evnt__limit_chk(struct Evnt *evnt)
1142 5202579 : {
1143 : struct timeval tv[1];
1144 5202579 : unsigned int scan = 0;
1145 :
1146 5202579 : EVNT__COPY_TV(tv);
1147 :
1148 5202579 : evnt->io_r_limited = FALSE; /* blank and reset */
1149 5202579 : evnt->io_w_limited = FALSE;
1150 :
1151 31215474 : while (scan < evnt->lim_num)
1152 : {
1153 20810316 : if (evnt->lims[scan])
1154 : {
1155 20810316 : struct Evnt_limit *lim = evnt->lims[scan]->ptr;
1156 :
1157 20810316 : evnt->io_r_limited |= !!lim->io_r_max;
1158 20810316 : evnt->io_w_limited |= !!lim->io_w_max;
1159 :
1160 20810316 : if (evnt->io_r_limited)
1161 : {
1162 0 : vlg_dbg2(vlg, "limit chk r beg (%'lu <= %'lu) @%lu:%lu\n",
1163 : lim->io_r_cur, lim->io_r_max,
1164 : lim->io_r_tm.tv_sec, lim->io_r_tm.tv_usec);
1165 0 : evnt__limit_tm_chk(tv, &lim->io_r_tm, &lim->io_r_cur, lim->io_r_max);
1166 0 : vlg_dbg2(vlg, "limit chk r beg (%'lu <= %'lu) @%lu:%lu\n",
1167 : lim->io_r_cur, lim->io_r_max,
1168 : lim->io_r_tm.tv_sec, lim->io_r_tm.tv_usec);
1169 : }
1170 :
1171 20810316 : if (evnt->io_w_limited)
1172 : {
1173 10413084 : vlg_dbg2(vlg, "limit chk w beg (%'lu <= %'lu) @%lu:%lu\n",
1174 : lim->io_w_cur, lim->io_w_max,
1175 : lim->io_w_tm.tv_sec, lim->io_w_tm.tv_usec);
1176 10413084 : evnt__limit_tm_chk(tv, &lim->io_w_tm, &lim->io_w_cur, lim->io_w_max);
1177 10413084 : vlg_dbg2(vlg, "limit chk w end (%'lu <= %'lu) @%lu:%lu\n",
1178 : lim->io_w_cur, lim->io_w_max,
1179 : lim->io_w_tm.tv_sec, lim->io_w_tm.tv_usec);
1180 : }
1181 : }
1182 :
1183 20810316 : ++scan;
1184 : }
1185 386502 : ASSERT(scan == evnt->lim_num);
1186 5202579 : }
1187 :
1188 : static struct Evnt_limit *evnt_limit_r(struct Evnt *evnt, unsigned long sz)
1189 51859 : {
1190 51859 : struct Evnt_limit *ret = NULL;
1191 51859 : unsigned int scan = 0;
1192 :
1193 51859 : if (!evnt->io_r_limited)
1194 51859 : return (NULL);
1195 :
1196 0 : evnt__limit_chk(evnt);
1197 :
1198 0 : while (scan < evnt->lim_num)
1199 : {
1200 0 : if (evnt->lims[scan])
1201 : {
1202 0 : struct Evnt_limit *lim = evnt->lims[scan]->ptr;
1203 :
1204 0 : if (lim->io_r_max && (!ret || (ret->io_r_cur > lim->io_r_cur)))
1205 0 : ret = lim;
1206 : }
1207 :
1208 0 : ++scan;
1209 : }
1210 0 : ASSERT(scan == evnt->lim_num);
1211 :
1212 0 : if (ret)
1213 0 : vlg_dbg2(vlg, "limit io_r(%'lu <= %'lu)\n", ret->io_r_cur, sz);
1214 :
1215 0 : if (ret && (ret->io_r_cur > sz))
1216 0 : ret = NULL;
1217 :
1218 0 : return (ret);
1219 : }
1220 :
1221 : static struct Evnt_limit *evnt_limit_w(struct Evnt *evnt, unsigned long sz)
1222 17824975 : {
1223 17824975 : struct Evnt_limit *ret = NULL;
1224 17824975 : unsigned int scan = 0;
1225 :
1226 17824975 : if (!evnt->io_w_limited)
1227 12622396 : return (NULL);
1228 :
1229 5202579 : evnt__limit_chk(evnt);
1230 :
1231 31215474 : while (scan < evnt->lim_num)
1232 : {
1233 20810316 : if (evnt->lims[scan])
1234 : {
1235 20810316 : struct Evnt_limit *lim = evnt->lims[scan]->ptr;
1236 :
1237 20810316 : if (lim->io_w_max && (!ret || (ret->io_w_cur > lim->io_w_cur)))
1238 5205387 : ret = lim;
1239 : }
1240 :
1241 20810316 : ++scan;
1242 : }
1243 386502 : ASSERT(scan == evnt->lim_num);
1244 :
1245 5202579 : if (ret)
1246 5202579 : vlg_dbg2(vlg, "limit io_w(%'lu <= %'lu)\n", ret->io_w_cur, sz);
1247 :
1248 5202579 : if (ret && (ret->io_w_cur > sz))
1249 5198875 : ret = NULL;
1250 :
1251 5202579 : return (ret);
1252 : }
1253 :
1254 : static Timer_q_base *evnt__timeout_lim_r_1 = NULL; /* read/recv/accept */
1255 : static Timer_q_base *evnt__timeout_lim_r_10 = NULL;
1256 : static Timer_q_base *evnt__timeout_lim_w_1 = NULL; /* write/send/connect */
1257 : static Timer_q_base *evnt__timeout_lim_w_10 = NULL;
1258 :
1259 : static void evnt__timer_cb_lim_r(int type, void *data)
1260 0 : {
1261 0 : struct Evnt *evnt = data;
1262 :
1263 0 : if (!evnt) /* deleted */
1264 0 : return;
1265 :
1266 0 : ASSERT(evnt__valid(evnt));
1267 :
1268 0 : if (type == TIMER_Q_TYPE_CALL_RUN_ALL)
1269 0 : return;
1270 :
1271 0 : evnt->tm_l_r = NULL;
1272 0 : evnt_wait_cntl_add(evnt, POLLIN);
1273 :
1274 0 : if (type == TIMER_Q_TYPE_CALL_DEL)
1275 0 : return;
1276 :
1277 0 : if (evnt->flag_q_accept)
1278 : {
1279 0 : assert(FALSE);
1280 : }
1281 0 : else if (!evnt->cbs->cb_func_recv(evnt))
1282 0 : evnt__close_now(evnt);
1283 : }
1284 :
1285 : static int evnt_limit_timeout_r(struct Evnt *evnt, struct Evnt_limit *lim)
1286 51859 : {
1287 : struct timeval tv[1];
1288 51859 : unsigned long msecs = 0;
1289 :
1290 51859 : if (evnt->tm_l_r || !lim || lim->io_r_cur)
1291 51859 : return (TRUE);
1292 :
1293 0 : EVNT__COPY_TV(tv);
1294 0 : msecs = timer_q_timeval_udiff_msecs(tv, &lim->io_r_tm);
1295 :
1296 0 : if (msecs < 1000)
1297 0 : msecs = 1000 - msecs;
1298 : else
1299 0 : msecs = 0;
1300 0 : vlg_dbg2(vlg, "timeout_r_make($<sa:%p>, %lu)\n", EVNT_SA(evnt), msecs);
1301 :
1302 0 : if (msecs <= 100)
1303 : {
1304 0 : TIMER_Q_TIMEVAL_ADD_SECS(tv, 0, 100 * 1000);
1305 0 : evnt->tm_l_r = timer_q_add_node(evnt__timeout_lim_r_1, evnt, tv,
1306 : TIMER_Q_FLAG_NODE_DEFAULT);
1307 : }
1308 : else
1309 : {
1310 0 : TIMER_Q_TIMEVAL_ADD_SECS(tv, 1, 0);
1311 0 : evnt->tm_l_r = timer_q_add_node(evnt__timeout_lim_r_10, evnt, tv,
1312 : TIMER_Q_FLAG_NODE_DEFAULT);
1313 : }
1314 :
1315 0 : evnt_wait_cntl_del(evnt, POLLIN);
1316 :
1317 0 : return (!!evnt->tm_l_r);
1318 : }
1319 :
1320 : static void evnt__timer_cb_lim_w(int type, void *data)
1321 3406 : {
1322 3406 : struct Evnt *evnt = data;
1323 :
1324 3406 : if (!evnt) /* deleted */
1325 0 : return;
1326 :
1327 1808 : ASSERT(evnt__valid(evnt));
1328 :
1329 3406 : if (type == TIMER_Q_TYPE_CALL_RUN_ALL)
1330 0 : return;
1331 :
1332 3406 : evnt->tm_l_w = NULL;
1333 3406 : if (evnt->flag_q_send_recv)
1334 : {
1335 3001 : evnt_wait_cntl_add(evnt, POLLOUT);
1336 3001 : SOCKET_POLL_INDICATOR(evnt->ind)->revents |= POLLOUT;
1337 : }
1338 :
1339 3406 : if (type == TIMER_Q_TYPE_CALL_DEL)
1340 0 : return;
1341 |