tevent: rewrite/simplify tevent_poll and maintain ev->fd_events correctly
[nivanova/samba-autobuild/.git] / lib / tevent / tevent_poll.c
index 867d951ee1b1bcee2e35e503501d31d37ad2713e..282f3cf3082c32dfb0ba1359289554a34d242940 100644 (file)
@@ -33,15 +33,6 @@ struct poll_event_context {
        /* a pointer back to the generic event_context */
        struct tevent_context *ev;
 
-       /*
-        * A DLIST for fresh fde's added by poll_event_add_fd but not
-        * picked up yet by poll_event_loop_once
-        */
-       struct tevent_fd *fresh;
-       /*
-        * A DLIST for disabled fde's.
-        */
-       struct tevent_fd *disabled;
        /*
         * one or more events were deleted or disabled
         */
@@ -49,56 +40,26 @@ struct poll_event_context {
 
        /*
         * These two arrays are maintained together.
+        *
+        * The following is always true:
+        * num_fds <= num_fdes
+        *
+        * new 'fresh' elements are added at the end
+        * of the 'fdes' array and picked up later
+        * to the 'fds' array in poll_event_sync_arrays()
+        * before the poll() syscall.
         */
        struct pollfd *fds;
+       size_t num_fds;
        struct tevent_fd **fdes;
-       unsigned num_fds;
+       size_t num_fdes;
 
        /*
-        * Signal fd to wake the poll() thread
+        * use tevent_common_wakeup(ev) to wake the poll() thread
         */
-       int signal_fd;
-
-       /* information for exiting from the event loop */
-       int exit_code;
+       bool use_mt_mode;
 };
 
-static int poll_event_context_destructor(struct poll_event_context *poll_ev)
-{
-       struct tevent_fd *fd, *fn;
-
-       for (fd = poll_ev->fresh; fd; fd = fn) {
-               fn = fd->next;
-               fd->event_ctx = NULL;
-               DLIST_REMOVE(poll_ev->fresh, fd);
-       }
-
-       for (fd = poll_ev->disabled; fd; fd = fn) {
-               fn = fd->next;
-               fd->event_ctx = NULL;
-               DLIST_REMOVE(poll_ev->disabled, fd);
-       }
-
-       if (poll_ev->signal_fd == -1) {
-               /*
-                * Non-threaded, no signal pipe
-                */
-               return 0;
-       }
-
-       close(poll_ev->signal_fd);
-       poll_ev->signal_fd = -1;
-
-       if (poll_ev->num_fds == 0) {
-               return 0;
-       }
-       if (poll_ev->fds[0].fd != -1) {
-               close(poll_ev->fds[0].fd);
-               poll_ev->fds[0].fd = -1;
-       }
-       return 0;
-}
-
 /*
   create a poll_event_context structure.
 */
@@ -119,30 +80,13 @@ static int poll_event_context_init(struct tevent_context *ev)
                return -1;
        }
        poll_ev->ev = ev;
-       poll_ev->signal_fd = -1;
        ev->additional_data = poll_ev;
-       talloc_set_destructor(poll_ev, poll_event_context_destructor);
        return 0;
 }
 
-static bool set_nonblock(int fd)
-{
-       int val;
-
-       val = fcntl(fd, F_GETFL, 0);
-       if (val == -1) {
-               return false;
-       }
-       val |= O_NONBLOCK;
-
-       return (fcntl(fd, F_SETFL, val) != -1);
-}
-
 static int poll_event_context_init_mt(struct tevent_context *ev)
 {
        struct poll_event_context *poll_ev;
-       struct pollfd *pfd;
-       int fds[2];
        int ret;
 
        ret = poll_event_context_init(ev);
@@ -153,67 +97,22 @@ static int poll_event_context_init_mt(struct tevent_context *ev)
        poll_ev = talloc_get_type_abort(
                ev->additional_data, struct poll_event_context);
 
-       poll_ev->fds = talloc_zero(poll_ev, struct pollfd);
-       if (poll_ev->fds == NULL) {
-               return -1;
-       }
-
-       ret = pipe(fds);
-       if (ret == -1) {
-               return -1;
-       }
-
-       if (!set_nonblock(fds[0]) || !set_nonblock(fds[1])) {
-               close(fds[0]);
-               close(fds[1]);
-               return -1;
+       ret = tevent_common_wakeup_init(ev);
+       if (ret != 0) {
+               return ret;
        }
 
-       poll_ev->signal_fd = fds[1];
-
-       pfd = &poll_ev->fds[0];
-       pfd->fd = fds[0];
-       pfd->events = (POLLIN|POLLHUP);
-
-       poll_ev->num_fds = 1;
-
-       talloc_set_destructor(poll_ev, poll_event_context_destructor);
+       poll_ev->use_mt_mode = true;
 
        return 0;
 }
 
 static void poll_event_wake_pollthread(struct poll_event_context *poll_ev)
 {
-       char c;
-       ssize_t ret;
-
-       if (poll_ev->signal_fd == -1) {
-               return;
-       }
-       c = 0;
-       do {
-               ret = write(poll_ev->signal_fd, &c, sizeof(c));
-       } while ((ret == -1) && (errno == EINTR));
-}
-
-static void poll_event_drain_signal_fd(struct poll_event_context *poll_ev)
-{
-       char buf[16];
-       ssize_t ret;
-       int fd;
-
-       if (poll_ev->signal_fd == -1) {
+       if (!poll_ev->use_mt_mode) {
                return;
        }
-
-       if (poll_ev->num_fds < 1) {
-               return;
-       }
-       fd = poll_ev->fds[0].fd;
-
-       do {
-               ret = read(fd, buf, sizeof(buf));
-       } while (ret == sizeof(buf));
+       tevent_common_wakeup(poll_ev->ev);
 }
 
 /*
@@ -233,10 +132,6 @@ static int poll_event_fd_destructor(struct tevent_fd *fde)
                ev->additional_data, struct poll_event_context);
 
        if (del_idx == UINT64_MAX) {
-               struct tevent_fd **listp =
-                       (struct tevent_fd **)fde->additional_data;
-
-               DLIST_REMOVE((*listp), fde);
                goto done;
        }
 
@@ -266,24 +161,50 @@ static void poll_event_schedule_immediate(struct tevent_immediate *im,
   Private function called by "standard" backend fallback.
   Note this only allows fallback to "poll" backend, not "poll-mt".
 */
-_PRIVATE_ void tevent_poll_event_add_fd_internal(struct tevent_context *ev,
+_PRIVATE_ bool tevent_poll_event_add_fd_internal(struct tevent_context *ev,
                                                 struct tevent_fd *fde)
 {
        struct poll_event_context *poll_ev = talloc_get_type_abort(
                ev->additional_data, struct poll_event_context);
-       struct tevent_fd **listp;
+       uint64_t fde_idx = UINT64_MAX;
+       size_t num_fdes;
+
+       fde->additional_flags = UINT64_MAX;
+       talloc_set_destructor(fde, poll_event_fd_destructor);
 
-       if (fde->flags != 0) {
-               listp = &poll_ev->fresh;
-       } else {
-               listp = &poll_ev->disabled;
+       if (fde->flags == 0) {
+               /*
+                * Nothing more to do...
+                */
+               return true;
        }
 
-       fde->additional_flags   = UINT64_MAX;
-       fde->additional_data    = listp;
+       /*
+        * We need to add it to the end of the 'fdes' array.
+        */
+       num_fdes = poll_ev->num_fdes + 1;
+       if (num_fdes > talloc_array_length(poll_ev->fdes)) {
+               struct tevent_fd **tmp_fdes = NULL;
+               size_t array_length;
 
-       DLIST_ADD((*listp), fde);
-       talloc_set_destructor(fde, poll_event_fd_destructor);
+               array_length = (num_fdes + 15) & ~15; /* round up to 16 */
+
+               tmp_fdes = talloc_realloc(poll_ev,
+                                         poll_ev->fdes,
+                                         struct tevent_fd *,
+                                         array_length);
+               if (tmp_fdes == NULL) {
+                       return false;
+               }
+               poll_ev->fdes = tmp_fdes;
+       }
+
+       fde_idx = poll_ev->num_fdes;
+       fde->additional_flags = fde_idx;
+       poll_ev->fdes[fde_idx] = fde;
+       poll_ev->num_fdes++;
+
+       return true;
 }
 
 /*
@@ -301,27 +222,29 @@ static struct tevent_fd *poll_event_add_fd(struct tevent_context *ev,
        struct poll_event_context *poll_ev = talloc_get_type_abort(
                ev->additional_data, struct poll_event_context);
        struct tevent_fd *fde;
+       bool ok;
 
        if (fd < 0) {
                return NULL;
        }
 
-       fde = talloc(mem_ctx ? mem_ctx : ev, struct tevent_fd);
+       fde = tevent_common_add_fd(ev,
+                                  mem_ctx,
+                                  fd,
+                                  flags,
+                                  handler,
+                                  private_data,
+                                  handler_name,
+                                  location);
        if (fde == NULL) {
                return NULL;
        }
-       fde->event_ctx          = ev;
-       fde->fd                 = fd;
-       fde->flags              = flags;
-       fde->handler            = handler;
-       fde->close_fn           = NULL;
-       fde->private_data       = private_data;
-       fde->handler_name       = handler_name;
-       fde->location           = location;
-       fde->additional_flags   = UINT64_MAX;
-       fde->additional_data    = NULL;
-
-       tevent_poll_event_add_fd_internal(ev, fde);
+
+       ok = tevent_poll_event_add_fd_internal(ev, fde);
+       if (!ok) {
+               TALLOC_FREE(fde);
+               return NULL;
+       }
        poll_event_wake_pollthread(poll_ev);
 
        /*
@@ -344,19 +267,20 @@ static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
        if (ev == NULL) {
                return;
        }
+
+       if (fde->flags == flags) {
+               return;
+       }
+
        poll_ev = talloc_get_type_abort(
                ev->additional_data, struct poll_event_context);
 
        fde->flags = flags;
 
        if (idx == UINT64_MAX) {
-               struct tevent_fd **listp =
-                       (struct tevent_fd **)fde->additional_data;
-
                /*
                 * We move it between the fresh and disabled lists.
                 */
-               DLIST_REMOVE((*listp), fde);
                tevent_poll_event_add_fd_internal(ev, fde);
                poll_event_wake_pollthread(poll_ev);
                return;
@@ -369,8 +293,16 @@ static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
                 */
                poll_ev->fdes[idx] = NULL;
                poll_ev->deleted = true;
-               DLIST_REMOVE(ev->fd_events, fde);
-               tevent_poll_event_add_fd_internal(ev, fde);
+               fde->additional_flags = UINT64_MAX;
+               poll_event_wake_pollthread(poll_ev);
+               return;
+       }
+
+       if (idx >= poll_ev->num_fds) {
+               /*
+                * Not yet added to the
+                * poll_ev->fds array.
+                */
                poll_event_wake_pollthread(poll_ev);
                return;
        }
@@ -388,18 +320,18 @@ static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
        poll_event_wake_pollthread(poll_ev);
 }
 
-static bool poll_event_setup_fresh(struct tevent_context *ev,
+static bool poll_event_sync_arrays(struct tevent_context *ev,
                                   struct poll_event_context *poll_ev)
 {
-       struct tevent_fd *fde, *next;
-       unsigned num_fresh, num_fds;
+       size_t i;
+       size_t array_length;
 
        if (poll_ev->deleted) {
-               unsigned first_fd = (poll_ev->signal_fd != -1) ? 1 : 0;
-               unsigned i;
 
-               for (i=first_fd; i < poll_ev->num_fds;) {
-                       fde = poll_ev->fdes[i];
+               for (i=0; i < poll_ev->num_fds;) {
+                       struct tevent_fd *fde = poll_ev->fdes[i];
+                       size_t ci;
+
                        if (fde != NULL) {
                                i++;
                                continue;
@@ -410,61 +342,63 @@ static bool poll_event_setup_fresh(struct tevent_context *ev,
                         * from the arrays
                         */
                        poll_ev->num_fds -= 1;
-                       if (poll_ev->num_fds == i) {
-                               break;
-                       }
-                       poll_ev->fds[i] = poll_ev->fds[poll_ev->num_fds];
-                       poll_ev->fdes[i] = poll_ev->fdes[poll_ev->num_fds];
-                       if (poll_ev->fdes[i] != NULL) {
-                               poll_ev->fdes[i]->additional_flags = i;
+                       ci = poll_ev->num_fds;
+                       if (ci > i) {
+                               poll_ev->fds[i] = poll_ev->fds[ci];
+                               poll_ev->fdes[i] = poll_ev->fdes[ci];
+                               if (poll_ev->fdes[i] != NULL) {
+                                       poll_ev->fdes[i]->additional_flags = i;
+                               }
                        }
+                       poll_ev->fds[ci] = (struct pollfd) { .fd = -1 };
+                       poll_ev->fdes[ci] = NULL;
                }
+               poll_ev->deleted = false;
        }
-       poll_ev->deleted = false;
 
-       if (poll_ev->fresh == NULL) {
+       if (poll_ev->num_fds == poll_ev->num_fdes) {
                return true;
        }
 
-       num_fresh = 0;
-       for (fde = poll_ev->fresh; fde; fde = fde->next) {
-               num_fresh += 1;
-       }
-       num_fds = poll_ev->num_fds + num_fresh;
-
        /*
-        * We check the length of fdes here. It is the last one
-        * enlarged, so if the realloc for poll_fd->fdes fails,
-        * poll_fd->fds will have at least the size of poll_fd->fdes
+        * Recheck the size of both arrays and make sure
+        * poll_fd->fds array has at least the size of the
+        * in use poll_ev->fdes array.
         */
+       if (poll_ev->num_fdes > talloc_array_length(poll_ev->fds)) {
+               struct pollfd *tmp_fds = NULL;
 
-       if (num_fds >= talloc_array_length(poll_ev->fdes)) {
-               struct pollfd *tmp_fds;
-               struct tevent_fd **tmp_fdes;
-               unsigned array_length;
-
-               array_length = (num_fds + 15) & ~15; /* round up to 16 */
+               /*
+                * Make sure both allocated the same length.
+                */
+               array_length = talloc_array_length(poll_ev->fdes);
 
-               tmp_fds = talloc_realloc(
-                       poll_ev, poll_ev->fds, struct pollfd, array_length);
+               tmp_fds = talloc_realloc(poll_ev,
+                                        poll_ev->fds,
+                                        struct pollfd,
+                                        array_length);
                if (tmp_fds == NULL) {
                        return false;
                }
                poll_ev->fds = tmp_fds;
-
-               tmp_fdes = talloc_realloc(
-                       poll_ev, poll_ev->fdes, struct tevent_fd *,
-                       array_length);
-               if (tmp_fdes == NULL) {
-                       return false;
-               }
-               poll_ev->fdes = tmp_fdes;
        }
 
-       for (fde = poll_ev->fresh; fde; fde = next) {
-               struct pollfd *pfd;
+       /*
+        * Now setup the new elements.
+        */
+       for (i = poll_ev->num_fds; i < poll_ev->num_fdes; i++) {
+               struct tevent_fd *fde = poll_ev->fdes[i];
+               struct pollfd *pfd = &poll_ev->fds[poll_ev->num_fds];
 
-               pfd = &poll_ev->fds[poll_ev->num_fds];
+               if (fde == NULL) {
+                       continue;
+               }
+
+               if (i > poll_ev->num_fds) {
+                       poll_ev->fdes[poll_ev->num_fds] = fde;
+                       fde->additional_flags = poll_ev->num_fds;
+                       poll_ev->fdes[i] = NULL;
+               }
 
                pfd->fd = fde->fd;
                pfd->events = 0;
@@ -477,15 +411,41 @@ static bool poll_event_setup_fresh(struct tevent_context *ev,
                        pfd->events |= (POLLOUT);
                }
 
-               fde->additional_flags = poll_ev->num_fds;
-               poll_ev->fdes[poll_ev->num_fds] = fde;
+               poll_ev->num_fds += 1;
+       }
+       /* Both are in sync again */
+       poll_ev->num_fdes = poll_ev->num_fds;
+
+       /*
+        * Check if we should shrink the arrays
+        * But keep at least 16 elements.
+        */
 
-               next = fde->next;
-               DLIST_REMOVE(poll_ev->fresh, fde);
-               DLIST_ADD(ev->fd_events, fde);
+       array_length = (poll_ev->num_fds + 15) & ~15; /* round up to 16 */
+       array_length = MAX(array_length, 16);
+       if (array_length < talloc_array_length(poll_ev->fdes)) {
+               struct tevent_fd **tmp_fdes = NULL;
+               struct pollfd *tmp_fds = NULL;
 
-               poll_ev->num_fds += 1;
+               tmp_fdes = talloc_realloc(poll_ev,
+                                         poll_ev->fdes,
+                                         struct tevent_fd *,
+                                         array_length);
+               if (tmp_fdes == NULL) {
+                       return false;
+               }
+               poll_ev->fdes = tmp_fdes;
+
+               tmp_fds = talloc_realloc(poll_ev,
+                                        poll_ev->fds,
+                                        struct pollfd,
+                                        array_length);
+               if (tmp_fds == NULL) {
+                       return false;
+               }
+               poll_ev->fds = tmp_fds;
        }
+
        return true;
 }
 
@@ -501,7 +461,9 @@ static int poll_event_loop_poll(struct tevent_context *ev,
        int timeout = -1;
        int poll_errno;
        struct tevent_fd *fde = NULL;
+       struct tevent_fd *next = NULL;
        unsigned i;
+       bool ok;
 
        if (ev->signal_events && tevent_common_check_signal(ev)) {
                return 0;
@@ -512,9 +474,8 @@ static int poll_event_loop_poll(struct tevent_context *ev,
                timeout += (tvalp->tv_usec + 999) / 1000;
        }
 
-       poll_event_drain_signal_fd(poll_ev);
-
-       if (!poll_event_setup_fresh(ev, poll_ev)) {
+       ok = poll_event_sync_arrays(ev, poll_ev);
+       if (!ok) {
                return -1;
        }
 
@@ -545,11 +506,13 @@ static int poll_event_loop_poll(struct tevent_context *ev,
           which ones and call the handler, being careful to allow
           the handler to remove itself when called */
 
-       for (fde = ev->fd_events; fde; fde = fde->next) {
-               unsigned idx = fde->additional_flags;
+       for (fde = ev->fd_events; fde; fde = next) {
+               uint64_t idx = fde->additional_flags;
                struct pollfd *pfd;
                uint16_t flags = 0;
 
+               next = fde->next;
+
                if (idx == UINT64_MAX) {
                        continue;
                }
@@ -594,7 +557,14 @@ static int poll_event_loop_poll(struct tevent_context *ev,
                if (pfd->revents & POLLOUT) {
                        flags |= TEVENT_FD_WRITE;
                }
+               /*
+                * Note that fde->flags could be changed when using
+                * the poll_mt backend together with threads,
+                * that why we need to check pfd->revents and fde->flags
+                */
+               flags &= fde->flags;
                if (flags != 0) {
+                       DLIST_DEMOTE(ev->fd_events, fde);
                        fde->handler(ev, fde, flags, fde->private_data);
                        return 0;
                }
@@ -638,6 +608,10 @@ static int poll_event_loop_once(struct tevent_context *ev,
                return 0;
        }
 
+       if (ev->threaded_contexts != NULL) {
+               tevent_common_threaded_activate_immediate(ev);
+       }
+
        if (ev->immediate_events &&
            tevent_common_loop_immediate(ev)) {
                return 0;
@@ -651,47 +625,17 @@ static int poll_event_loop_once(struct tevent_context *ev,
        return poll_event_loop_poll(ev, &tval);
 }
 
-static int poll_event_loop_wait(struct tevent_context *ev,
-                               const char *location)
-{
-       struct poll_event_context *poll_ev = talloc_get_type_abort(
-               ev->additional_data, struct poll_event_context);
-
-       /*
-        * loop as long as we have events pending
-        */
-       while (ev->fd_events ||
-              ev->timer_events ||
-              ev->immediate_events ||
-              ev->signal_events ||
-              poll_ev->fresh ||
-              poll_ev->disabled) {
-               int ret;
-               ret = _tevent_loop_once(ev, location);
-               if (ret != 0) {
-                       tevent_debug(ev, TEVENT_DEBUG_FATAL,
-                                    "_tevent_loop_once() failed: %d - %s\n",
-                                    ret, strerror(errno));
-                       return ret;
-               }
-       }
-
-       tevent_debug(ev, TEVENT_DEBUG_WARNING,
-                    "poll_event_loop_wait() out of events\n");
-       return 0;
-}
-
 static const struct tevent_ops poll_event_ops = {
        .context_init           = poll_event_context_init,
        .add_fd                 = poll_event_add_fd,
        .set_fd_close_fn        = tevent_common_fd_set_close_fn,
        .get_fd_flags           = tevent_common_fd_get_flags,
        .set_fd_flags           = poll_event_set_fd_flags,
-       .add_timer              = tevent_common_add_timer,
+       .add_timer              = tevent_common_add_timer_v2,
        .schedule_immediate     = tevent_common_schedule_immediate,
        .add_signal             = tevent_common_add_signal,
        .loop_once              = poll_event_loop_once,
-       .loop_wait              = poll_event_loop_wait,
+       .loop_wait              = tevent_common_loop_wait,
 };
 
 _PRIVATE_ bool tevent_poll_init(void)
@@ -705,11 +649,11 @@ static const struct tevent_ops poll_event_mt_ops = {
        .set_fd_close_fn        = tevent_common_fd_set_close_fn,
        .get_fd_flags           = tevent_common_fd_get_flags,
        .set_fd_flags           = poll_event_set_fd_flags,
-       .add_timer              = tevent_common_add_timer,
+       .add_timer              = tevent_common_add_timer_v2,
        .schedule_immediate     = poll_event_schedule_immediate,
        .add_signal             = tevent_common_add_signal,
        .loop_once              = poll_event_loop_once,
-       .loop_wait              = poll_event_loop_wait,
+       .loop_wait              = tevent_common_loop_wait,
 };
 
 _PRIVATE_ bool tevent_poll_mt_init(void)