tevent: Add an internal function tevent_epoll_set_panic_fallback().
[sfrench/samba-autobuild/.git] / lib / tevent / tevent_poll.c
index 2e202aa7ec266fe9bc645a15dd603c9f57676992..5479f2f7cd67a1e1fd0e2ea9b0ed2442118d0d9b 100644 (file)
@@ -33,41 +33,182 @@ struct poll_event_context {
        /* a pointer back to the generic event_context */
        struct tevent_context *ev;
 
+       /*
+        * A DLIST for fresh fde's added by poll_event_add_fd but not
+        * picked up yet by poll_event_loop_once
+        */
+       struct tevent_fd *fresh;
+
        /*
         * These two arrays are maintained together.
         */
        struct pollfd *fds;
-       struct tevent_fd **fd_events;
-       uint64_t num_fds;
+       struct tevent_fd **fdes;
+       unsigned num_fds;
+
+       /*
+        * Signal fd to wake the poll() thread
+        */
+       int signal_fd;
 
        /* information for exiting from the event loop */
        int exit_code;
 };
 
+static int poll_event_context_destructor(struct poll_event_context *poll_ev)
+{
+       struct tevent_fd *fd, *fn;
+
+       for (fd = poll_ev->fresh; fd; fd = fn) {
+               fn = fd->next;
+               fd->event_ctx = NULL;
+               DLIST_REMOVE(poll_ev->fresh, fd);
+       }
+
+       if (poll_ev->signal_fd == -1) {
+               /*
+                * Non-threaded, no signal pipe
+                */
+               return 0;
+       }
+
+       close(poll_ev->signal_fd);
+       poll_ev->signal_fd = -1;
+
+       if (poll_ev->num_fds == 0) {
+               return 0;
+       }
+       if (poll_ev->fds[0].fd != -1) {
+               close(poll_ev->fds[0].fd);
+               poll_ev->fds[0].fd = -1;
+       }
+       return 0;
+}
+
 /*
-  create a select_event_context structure.
+  create a poll_event_context structure.
 */
 static int poll_event_context_init(struct tevent_context *ev)
 {
        struct poll_event_context *poll_ev;
 
+       /*
+        * we might be called during tevent_re_initialise()
+        * which means we need to free our old additional_data
+        * in order to detach old fd events from the
+        * poll_ev->fresh list
+        */
+       TALLOC_FREE(ev->additional_data);
+
        poll_ev = talloc_zero(ev, struct poll_event_context);
        if (poll_ev == NULL) {
                return -1;
        }
        poll_ev->ev = ev;
+       poll_ev->signal_fd = -1;
        ev->additional_data = poll_ev;
+       talloc_set_destructor(poll_ev, poll_event_context_destructor);
+       return 0;
+}
+
+static bool set_nonblock(int fd)
+{
+       int val;
+
+       val = fcntl(fd, F_GETFL, 0);
+       if (val == -1) {
+               return false;
+       }
+       val |= O_NONBLOCK;
+
+       return (fcntl(fd, F_SETFL, val) != -1);
+}
+
+static int poll_event_context_init_mt(struct tevent_context *ev)
+{
+       struct poll_event_context *poll_ev;
+       struct pollfd *pfd;
+       int fds[2];
+       int ret;
+
+       ret = poll_event_context_init(ev);
+       if (ret == -1) {
+               return ret;
+       }
+
+       poll_ev = talloc_get_type_abort(
+               ev->additional_data, struct poll_event_context);
+
+       poll_ev->fds = talloc_zero(poll_ev, struct pollfd);
+       if (poll_ev->fds == NULL) {
+               return -1;
+       }
+
+       ret = pipe(fds);
+       if (ret == -1) {
+               return -1;
+       }
+
+       if (!set_nonblock(fds[0]) || !set_nonblock(fds[1])) {
+               close(fds[0]);
+               close(fds[1]);
+               return -1;
+       }
+
+       poll_ev->signal_fd = fds[1];
+
+       pfd = &poll_ev->fds[0];
+       pfd->fd = fds[0];
+       pfd->events = (POLLIN|POLLHUP);
+
+       poll_ev->num_fds = 1;
+
+       talloc_set_destructor(poll_ev, poll_event_context_destructor);
+
        return 0;
 }
 
+static void poll_event_wake_pollthread(struct poll_event_context *poll_ev)
+{
+       char c;
+       ssize_t ret;
+
+       if (poll_ev->signal_fd == -1) {
+               return;
+       }
+       c = 0;
+       do {
+               ret = write(poll_ev->signal_fd, &c, sizeof(c));
+       } while ((ret == -1) && (errno == EINTR));
+}
+
+static void poll_event_drain_signal_fd(struct poll_event_context *poll_ev)
+{
+       char buf[16];
+       ssize_t ret;
+       int fd;
+
+       if (poll_ev->signal_fd == -1) {
+               return;
+       }
+
+       if (poll_ev->num_fds < 1) {
+               return;
+       }
+       fd = poll_ev->fds[0].fd;
+
+       do {
+               ret = read(fd, buf, sizeof(buf));
+       } while (ret == sizeof(buf));
+}
+
 /*
   destroy an fd_event
 */
 static int poll_event_fd_destructor(struct tevent_fd *fde)
 {
        struct tevent_context *ev = fde->event_ctx;
-       struct poll_event_context *poll_ev = NULL;
-       struct tevent_fd *moved_fde;
+       struct poll_event_context *poll_ev;
        uint64_t del_idx = fde->additional_flags;
 
        if (ev == NULL) {
@@ -77,16 +218,43 @@ static int poll_event_fd_destructor(struct tevent_fd *fde)
        poll_ev = talloc_get_type_abort(
                ev->additional_data, struct poll_event_context);
 
-       moved_fde = poll_ev->fd_events[poll_ev->num_fds-1];
-       poll_ev->fd_events[del_idx] = moved_fde;
-       poll_ev->fds[del_idx] = poll_ev->fds[poll_ev->num_fds-1];
-       moved_fde->additional_flags = del_idx;
+       poll_ev->fdes[del_idx] = NULL;
+       poll_event_wake_pollthread(poll_ev);
+done:
+       return tevent_common_fd_destructor(fde);
+}
+
+static int poll_fresh_fde_destructor(struct tevent_fd *fde)
+{
+       struct tevent_context *ev = fde->event_ctx;
+       struct poll_event_context *poll_ev;
+
+       if (ev == NULL) {
+               goto done;
+       }
+       poll_ev = talloc_get_type_abort(
+               ev->additional_data, struct poll_event_context);
 
-       poll_ev->num_fds -= 1;
+       DLIST_REMOVE(poll_ev->fresh, fde);
 done:
        return tevent_common_fd_destructor(fde);
 }
 
+static void poll_event_schedule_immediate(struct tevent_immediate *im,
+                                         struct tevent_context *ev,
+                                         tevent_immediate_handler_t handler,
+                                         void *private_data,
+                                         const char *handler_name,
+                                         const char *location)
+{
+       struct poll_event_context *poll_ev = talloc_get_type_abort(
+               ev->additional_data, struct poll_event_context);
+
+       tevent_common_schedule_immediate(im, ev, handler, private_data,
+                                        handler_name, location);
+       poll_event_wake_pollthread(poll_ev);
+}
+
 /*
   add a fd based event
   return NULL on failure (memory allocation error)
@@ -101,60 +269,35 @@ static struct tevent_fd *poll_event_add_fd(struct tevent_context *ev,
 {
        struct poll_event_context *poll_ev = talloc_get_type_abort(
                ev->additional_data, struct poll_event_context);
-       struct pollfd *pfd;
        struct tevent_fd *fde;
 
-       fde = tevent_common_add_fd(ev, mem_ctx, fd, flags,
-                                  handler, private_data,
-                                  handler_name, location);
-       if (fde == NULL) {
+       if (fd < 0) {
                return NULL;
        }
 
-       /* we allocate 16 slots to avoid a lot of reallocations */
-       if (talloc_array_length(poll_ev->fds) == poll_ev->num_fds) {
-               struct pollfd *tmp_fds;
-               struct tevent_fd **tmp_fd_events;
-               tmp_fds = talloc_realloc(
-                       poll_ev, poll_ev->fds, struct pollfd,
-                       poll_ev->num_fds + 16);
-               if (tmp_fds == NULL) {
-                       TALLOC_FREE(fde);
-                       return NULL;
-               }
-               poll_ev->fds = tmp_fds;
-
-               tmp_fd_events = talloc_realloc(
-                       poll_ev, poll_ev->fd_events, struct tevent_fd *,
-                       poll_ev->num_fds + 16);
-               if (tmp_fd_events == NULL) {
-                       TALLOC_FREE(fde);
-                       return NULL;
-               }
-               poll_ev->fd_events = tmp_fd_events;
-       }
-
-       pfd = &poll_ev->fds[poll_ev->num_fds];
-
-       pfd->fd = fd;
-
-       pfd->events = 0;
-       pfd->revents = 0;
-
-       if (flags & TEVENT_FD_READ) {
-               pfd->events |= (POLLIN|POLLHUP);
-       }
-       if (flags & TEVENT_FD_WRITE) {
-               pfd->events |= (POLLOUT);
+       fde = talloc(mem_ctx ? mem_ctx : ev, struct tevent_fd);
+       if (fde == NULL) {
+               return NULL;
        }
+       fde->event_ctx          = ev;
+       fde->fd                 = fd;
+       fde->flags              = flags;
+       fde->handler            = handler;
+       fde->close_fn           = NULL;
+       fde->private_data       = private_data;
+       fde->handler_name       = handler_name;
+       fde->location           = location;
+       fde->additional_flags   = UINT64_MAX;
+       fde->additional_data    = NULL;
+
+       DLIST_ADD(poll_ev->fresh, fde);
+       talloc_set_destructor(fde, poll_fresh_fde_destructor);
+       poll_event_wake_pollthread(poll_ev);
 
-       fde->additional_flags = poll_ev->num_fds;
-       poll_ev->fd_events[poll_ev->num_fds] = fde;
-
-       poll_ev->num_fds += 1;
-
-       talloc_set_destructor(fde, poll_event_fd_destructor);
-
+       /*
+        * poll_event_loop_poll will take care of the rest in
+        * poll_event_setup_fresh
+        */
        return fde;
 }
 
@@ -163,10 +306,29 @@ static struct tevent_fd *poll_event_add_fd(struct tevent_context *ev,
 */
 static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
 {
-       struct poll_event_context *poll_ev = talloc_get_type_abort(
-               fde->event_ctx->additional_data, struct poll_event_context);
+       struct tevent_context *ev = fde->event_ctx;
+       struct poll_event_context *poll_ev;
        uint64_t idx = fde->additional_flags;
-       uint16_t pollflags = 0;
+       uint16_t pollflags;
+
+       if (ev == NULL) {
+               return;
+       }
+       poll_ev = talloc_get_type_abort(
+               ev->additional_data, struct poll_event_context);
+
+       fde->flags = flags;
+
+       if (idx == UINT64_MAX) {
+               /*
+                * poll_event_setup_fresh not yet called after this fde was
+                * added. We don't have to do anything to transfer the changed
+                * flags to the array passed to poll(2)
+                */
+               return;
+       }
+
+       pollflags = 0;
 
        if (flags & TEVENT_FD_READ) {
                pollflags |= (POLLIN|POLLHUP);
@@ -174,10 +336,84 @@ static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
        if (flags & TEVENT_FD_WRITE) {
                pollflags |= (POLLOUT);
        }
-
        poll_ev->fds[idx].events = pollflags;
 
-       fde->flags = flags;
+       poll_event_wake_pollthread(poll_ev);
+}
+
+static bool poll_event_setup_fresh(struct tevent_context *ev,
+                                  struct poll_event_context *poll_ev)
+{
+       struct tevent_fd *fde, *next;
+       unsigned num_fresh, num_fds;
+
+       if (poll_ev->fresh == NULL) {
+               return true;
+       }
+
+       num_fresh = 0;
+       for (fde = poll_ev->fresh; fde; fde = fde->next) {
+               num_fresh += 1;
+       }
+       num_fds = poll_ev->num_fds + num_fresh;
+
+       /*
+        * We check the length of fdes here. It is the last one
+        * enlarged, so if the realloc for poll_fd->fdes fails,
+        * poll_fd->fds will have at least the size of poll_fd->fdes
+        */
+
+       if (num_fds >= talloc_array_length(poll_ev->fdes)) {
+               struct pollfd *tmp_fds;
+               struct tevent_fd **tmp_fdes;
+               unsigned array_length;
+
+               array_length = (num_fds + 15) & ~15; /* round up to 16 */
+
+               tmp_fds = talloc_realloc(
+                       poll_ev, poll_ev->fds, struct pollfd, array_length);
+               if (tmp_fds == NULL) {
+                       return false;
+               }
+               poll_ev->fds = tmp_fds;
+
+               tmp_fdes = talloc_realloc(
+                       poll_ev, poll_ev->fdes, struct tevent_fd *,
+                       array_length);
+               if (tmp_fdes == NULL) {
+                       return false;
+               }
+               poll_ev->fdes = tmp_fdes;
+       }
+
+       for (fde = poll_ev->fresh; fde; fde = next) {
+               struct pollfd *pfd;
+
+               pfd = &poll_ev->fds[poll_ev->num_fds];
+
+               pfd->fd = fde->fd;
+               pfd->events = 0;
+               pfd->revents = 0;
+
+               if (fde->flags & TEVENT_FD_READ) {
+                       pfd->events |= (POLLIN|POLLHUP);
+               }
+               if (fde->flags & TEVENT_FD_WRITE) {
+                       pfd->events |= (POLLOUT);
+               }
+
+               fde->additional_flags = poll_ev->num_fds;
+               poll_ev->fdes[poll_ev->num_fds] = fde;
+
+               next = fde->next;
+               DLIST_REMOVE(poll_ev->fresh, fde);
+               DLIST_ADD(ev->fd_events, fde);
+
+               talloc_set_destructor(fde, poll_event_fd_destructor);
+
+               poll_ev->num_fds += 1;
+       }
+       return true;
 }
 
 /*
@@ -188,9 +424,10 @@ static int poll_event_loop_poll(struct tevent_context *ev,
 {
        struct poll_event_context *poll_ev = talloc_get_type_abort(
                ev->additional_data, struct poll_event_context);
-       struct tevent_fd *fde;
        int pollrtn;
        int timeout = -1;
+       unsigned first_fd;
+       unsigned i;
 
        if (ev->signal_events && tevent_common_check_signal(ev)) {
                return 0;
@@ -201,25 +438,21 @@ static int poll_event_loop_poll(struct tevent_context *ev,
                timeout += (tvalp->tv_usec + 999) / 1000;
        }
 
+       poll_event_drain_signal_fd(poll_ev);
+
+       if (!poll_event_setup_fresh(ev, poll_ev)) {
+               return -1;
+       }
+
+       tevent_trace_point_callback(poll_ev->ev, TEVENT_TRACE_BEFORE_WAIT);
        pollrtn = poll(poll_ev->fds, poll_ev->num_fds, timeout);
+       tevent_trace_point_callback(poll_ev->ev, TEVENT_TRACE_AFTER_WAIT);
 
        if (pollrtn == -1 && errno == EINTR && ev->signal_events) {
                tevent_common_check_signal(ev);
                return 0;
        }
 
-       if (pollrtn == -1 && errno == EBADF) {
-               /* the socket is dead! this should never
-                  happen as the socket should have first been
-                  made readable and that should have removed
-                  the event, so this must be a bug. This is a
-                  fatal error. */
-               tevent_debug(ev, TEVENT_DEBUG_FATAL,
-                            "ERROR: EBADF on poll_event_loop_once\n");
-               poll_ev->exit_code = EBADF;
-               return -1;
-       }
-
        if (pollrtn == 0 && tvalp) {
                /* we don't care about a possible delay here */
                tevent_common_loop_timer_delay(ev);
@@ -233,16 +466,33 @@ static int poll_event_loop_poll(struct tevent_context *ev,
                return 0;
        }
 
+       first_fd = (poll_ev->signal_fd != -1) ? 1 : 0;
+
        /* at least one file descriptor is ready - check
           which ones and call the handler, being careful to allow
           the handler to remove itself when called */
 
-       for (fde = ev->fd_events; fde; fde = fde->next) {
+       for (i=first_fd; i<poll_ev->num_fds; i++) {
                struct pollfd *pfd;
-               uint64_t pfd_idx = fde->additional_flags;
+               struct tevent_fd *fde;
                uint16_t flags = 0;
 
-               pfd = &poll_ev->fds[pfd_idx];
+               fde = poll_ev->fdes[i];
+               if (fde == NULL) {
+                       /*
+                        * This fde was talloc_free()'ed. Delete it
+                        * from the arrays
+                        */
+                       poll_ev->num_fds -= 1;
+                       poll_ev->fds[i] = poll_ev->fds[poll_ev->num_fds];
+                       poll_ev->fdes[i] = poll_ev->fdes[poll_ev->num_fds];
+                       if (poll_ev->fdes[i] != NULL) {
+                               poll_ev->fdes[i]->additional_flags = i;
+                       }
+                       continue;
+               }
+
+               pfd = &poll_ev->fds[i];
 
                if (pfd->revents & (POLLHUP|POLLERR)) {
                        /* If we only wait for TEVENT_FD_WRITE, we
@@ -297,6 +547,35 @@ static int poll_event_loop_once(struct tevent_context *ev,
        return poll_event_loop_poll(ev, &tval);
 }
 
+static int poll_event_loop_wait(struct tevent_context *ev,
+                               const char *location)
+{
+       struct poll_event_context *poll_ev = talloc_get_type_abort(
+               ev->additional_data, struct poll_event_context);
+
+       /*
+        * loop as long as we have events pending
+        */
+       while (ev->fd_events ||
+              ev->timer_events ||
+              ev->immediate_events ||
+              ev->signal_events ||
+              poll_ev->fresh) {
+               int ret;
+               ret = _tevent_loop_once(ev, location);
+               if (ret != 0) {
+                       tevent_debug(ev, TEVENT_DEBUG_FATAL,
+                                    "_tevent_loop_once() failed: %d - %s\n",
+                                    ret, strerror(errno));
+                       return ret;
+               }
+       }
+
+       tevent_debug(ev, TEVENT_DEBUG_WARNING,
+                    "poll_event_loop_wait() out of events\n");
+       return 0;
+}
+
 static const struct tevent_ops poll_event_ops = {
        .context_init           = poll_event_context_init,
        .add_fd                 = poll_event_add_fd,
@@ -307,10 +586,28 @@ static const struct tevent_ops poll_event_ops = {
        .schedule_immediate     = tevent_common_schedule_immediate,
        .add_signal             = tevent_common_add_signal,
        .loop_once              = poll_event_loop_once,
-       .loop_wait              = tevent_common_loop_wait,
+       .loop_wait              = poll_event_loop_wait,
 };
 
 _PRIVATE_ bool tevent_poll_init(void)
 {
        return tevent_register_backend("poll", &poll_event_ops);
 }
+
+static const struct tevent_ops poll_event_mt_ops = {
+       .context_init           = poll_event_context_init_mt,
+       .add_fd                 = poll_event_add_fd,
+       .set_fd_close_fn        = tevent_common_fd_set_close_fn,
+       .get_fd_flags           = tevent_common_fd_get_flags,
+       .set_fd_flags           = poll_event_set_fd_flags,
+       .add_timer              = tevent_common_add_timer,
+       .schedule_immediate     = poll_event_schedule_immediate,
+       .add_signal             = tevent_common_add_signal,
+       .loop_once              = poll_event_loop_once,
+       .loop_wait              = poll_event_loop_wait,
+};
+
+_PRIVATE_ bool tevent_poll_mt_init(void)
+{
+       return tevent_register_backend("poll_mt", &poll_event_mt_ops);
+}