tevent_poll: Decouple poll_ev->fds handling from adding/removing fds
authorVolker Lendecke <vl@samba.org>
Sun, 29 Jul 2012 11:05:36 +0000 (13:05 +0200)
committerStefan Metzmacher <metze@samba.org>
Thu, 16 Aug 2012 18:48:32 +0000 (20:48 +0200)
Step 1 in a python backend for multiple threads

Signed-off-by: Stefan Metzmacher <metze@samba.org>
lib/tevent/tevent_poll.c

index 7ae3c42188de1aec1b8e70d26cb9c8bbf9683f42..2639143998926978cc3b9015adb5184c6cfd4f68 100644 (file)
@@ -33,12 +33,17 @@ struct poll_event_context {
        /* a pointer back to the generic event_context */
        struct tevent_context *ev;
 
+       /*
+        * A DLIST for fresh fde's
+        */
+       struct tevent_fd *fresh;
+
        /*
         * These two arrays are maintained together.
         */
        struct pollfd *fds;
-       struct tevent_fd **fd_events;
-       uint64_t num_fds;
+       struct tevent_fd **fdes;
+       unsigned num_fds;
 
        /* information for exiting from the event loop */
        int exit_code;
@@ -67,7 +72,6 @@ static int poll_event_fd_destructor(struct tevent_fd *fde)
 {
        struct tevent_context *ev = fde->event_ctx;
        struct poll_event_context *poll_ev = NULL;
-       struct tevent_fd *moved_fde;
        uint64_t del_idx = fde->additional_flags;
 
        if (ev == NULL) {
@@ -77,16 +81,19 @@ static int poll_event_fd_destructor(struct tevent_fd *fde)
        poll_ev = talloc_get_type_abort(
                ev->additional_data, struct poll_event_context);
 
-       moved_fde = poll_ev->fd_events[poll_ev->num_fds-1];
-       poll_ev->fd_events[del_idx] = moved_fde;
-       poll_ev->fds[del_idx] = poll_ev->fds[poll_ev->num_fds-1];
-       moved_fde->additional_flags = del_idx;
-
-       poll_ev->num_fds -= 1;
+       poll_ev->fdes[del_idx] = NULL;
 done:
        return tevent_common_fd_destructor(fde);
 }
 
+static int poll_fresh_fde_destructor(struct tevent_fd *fde)
+{
+       struct poll_event_context *poll_ev = talloc_get_type_abort(
+               fde->event_ctx->additional_data, struct poll_event_context);
+       DLIST_REMOVE(poll_ev->fresh, fde);
+       return 0;
+}
+
 /*
   add a fd based event
   return NULL on failure (memory allocation error)
@@ -101,60 +108,34 @@ static struct tevent_fd *poll_event_add_fd(struct tevent_context *ev,
 {
        struct poll_event_context *poll_ev = talloc_get_type_abort(
                ev->additional_data, struct poll_event_context);
-       struct pollfd *pfd;
        struct tevent_fd *fde;
 
-       fde = tevent_common_add_fd(ev, mem_ctx, fd, flags,
-                                  handler, private_data,
-                                  handler_name, location);
-       if (fde == NULL) {
+       if (fd < 0) {
                return NULL;
        }
 
-       /* we allocate 16 slots to avoid a lot of reallocations */
-       if (talloc_array_length(poll_ev->fds) == poll_ev->num_fds) {
-               struct pollfd *tmp_fds;
-               struct tevent_fd **tmp_fd_events;
-               tmp_fds = talloc_realloc(
-                       poll_ev, poll_ev->fds, struct pollfd,
-                       poll_ev->num_fds + 16);
-               if (tmp_fds == NULL) {
-                       TALLOC_FREE(fde);
-                       return NULL;
-               }
-               poll_ev->fds = tmp_fds;
-
-               tmp_fd_events = talloc_realloc(
-                       poll_ev, poll_ev->fd_events, struct tevent_fd *,
-                       poll_ev->num_fds + 16);
-               if (tmp_fd_events == NULL) {
-                       TALLOC_FREE(fde);
-                       return NULL;
-               }
-               poll_ev->fd_events = tmp_fd_events;
-       }
-
-       pfd = &poll_ev->fds[poll_ev->num_fds];
-
-       pfd->fd = fd;
-
-       pfd->events = 0;
-       pfd->revents = 0;
-
-       if (flags & TEVENT_FD_READ) {
-               pfd->events |= (POLLIN|POLLHUP);
-       }
-       if (flags & TEVENT_FD_WRITE) {
-               pfd->events |= (POLLOUT);
+       fde = talloc(mem_ctx ? mem_ctx : ev, struct tevent_fd);
+       if (fde == NULL) {
+               return NULL;
        }
+       fde->event_ctx          = ev;
+       fde->fd                 = fd;
+       fde->flags              = flags;
+       fde->handler            = handler;
+       fde->close_fn           = NULL;
+       fde->private_data       = private_data;
+       fde->handler_name       = handler_name;
+       fde->location           = location;
+       fde->additional_flags   = 0;
+       fde->additional_data    = NULL;
+
+       DLIST_ADD(poll_ev->fresh, fde);
+       talloc_set_destructor(fde, poll_fresh_fde_destructor);
 
-       fde->additional_flags = poll_ev->num_fds;
-       poll_ev->fd_events[poll_ev->num_fds] = fde;
-
-       poll_ev->num_fds += 1;
-
-       talloc_set_destructor(fde, poll_event_fd_destructor);
-
+       /*
+        * poll_event_loop_poll will take care of the rest in
+        * poll_event_setup_fresh
+        */
        return fde;
 }
 
@@ -180,6 +161,81 @@ static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
        fde->flags = flags;
 }
 
+static bool poll_event_setup_fresh(struct tevent_context *ev,
+                                  struct poll_event_context *poll_ev)
+{
+       struct tevent_fd *fde, *next;
+       unsigned num_fresh, num_fds;
+
+       if (poll_ev->fresh == NULL) {
+               return true;
+       }
+
+       num_fresh = 0;
+       for (fde = poll_ev->fresh; fde; fde = fde->next) {
+               num_fresh += 1;
+       }
+       num_fds = poll_ev->num_fds + num_fresh;
+
+       /*
+        * We check the length of fdes here. It is the last one
+        * enlarged, so if the realloc for poll_fd->fdes fails,
+        * poll_fd->fds will have at least the size of poll_fd->fdes
+        */
+
+       if (num_fds >= talloc_array_length(poll_ev->fdes)) {
+               struct pollfd *tmp_fds;
+               struct tevent_fd **tmp_fdes;
+               unsigned array_length;
+
+               array_length = (num_fds + 15) & ~15; /* round up to 16 */
+
+               tmp_fds = talloc_realloc(
+                       poll_ev, poll_ev->fds, struct pollfd, array_length);
+               if (tmp_fds == NULL) {
+                       return false;
+               }
+               poll_ev->fds = tmp_fds;
+
+               tmp_fdes = talloc_realloc(
+                       poll_ev, poll_ev->fdes, struct tevent_fd *,
+                       array_length);
+               if (tmp_fdes == NULL) {
+                       return false;
+               }
+               poll_ev->fdes = tmp_fdes;
+       }
+
+       for (fde = poll_ev->fresh; fde; fde = next) {
+               struct pollfd *pfd;
+
+               pfd = &poll_ev->fds[poll_ev->num_fds];
+
+               pfd->fd = fde->fd;
+               pfd->events = 0;
+               pfd->revents = 0;
+
+               if (fde->flags & TEVENT_FD_READ) {
+                       pfd->events |= (POLLIN|POLLHUP);
+               }
+               if (fde->flags & TEVENT_FD_WRITE) {
+                       pfd->events |= (POLLOUT);
+               }
+
+               fde->additional_flags = poll_ev->num_fds;
+               poll_ev->fdes[poll_ev->num_fds] = fde;
+
+               next = fde->next;
+               DLIST_REMOVE(poll_ev->fresh, fde);
+               DLIST_ADD(ev->fd_events, fde);
+
+               talloc_set_destructor(fde, poll_event_fd_destructor);
+
+               poll_ev->num_fds += 1;
+       }
+       return true;
+}
+
 /*
   event loop handling using poll()
 */
@@ -188,9 +244,9 @@ static int poll_event_loop_poll(struct tevent_context *ev,
 {
        struct poll_event_context *poll_ev = talloc_get_type_abort(
                ev->additional_data, struct poll_event_context);
-       struct tevent_fd *fde;
        int pollrtn;
        int timeout = -1;
+       unsigned i;
 
        if (ev->signal_events && tevent_common_check_signal(ev)) {
                return 0;
@@ -201,6 +257,10 @@ static int poll_event_loop_poll(struct tevent_context *ev,
                timeout += (tvalp->tv_usec + 999) / 1000;
        }
 
+       if (!poll_event_setup_fresh(ev, poll_ev)) {
+               return -1;
+       }
+
        tevent_trace_point_callback(poll_ev->ev, TEVENT_TRACE_BEFORE_WAIT);
        pollrtn = poll(poll_ev->fds, poll_ev->num_fds, timeout);
        tevent_trace_point_callback(poll_ev->ev, TEVENT_TRACE_AFTER_WAIT);
@@ -210,18 +270,6 @@ static int poll_event_loop_poll(struct tevent_context *ev,
                return 0;
        }
 
-       if (pollrtn == -1 && errno == EBADF) {
-               /* the socket is dead! this should never
-                  happen as the socket should have first been
-                  made readable and that should have removed
-                  the event, so this must be a bug. This is a
-                  fatal error. */
-               tevent_debug(ev, TEVENT_DEBUG_FATAL,
-                            "ERROR: EBADF on poll_event_loop_once\n");
-               poll_ev->exit_code = EBADF;
-               return -1;
-       }
-
        if (pollrtn == 0 && tvalp) {
                /* we don't care about a possible delay here */
                tevent_common_loop_timer_delay(ev);
@@ -239,12 +287,27 @@ static int poll_event_loop_poll(struct tevent_context *ev,
           which ones and call the handler, being careful to allow
           the handler to remove itself when called */
 
-       for (fde = ev->fd_events; fde; fde = fde->next) {
+       for (i=0; i<poll_ev->num_fds; i++) {
                struct pollfd *pfd;
-               uint64_t pfd_idx = fde->additional_flags;
+               struct tevent_fd *fde;
                uint16_t flags = 0;
 
-               pfd = &poll_ev->fds[pfd_idx];
+               fde = poll_ev->fdes[i];
+               if (fde == NULL) {
+                       /*
+                        * This fde was talloc_free()'ed. Delete it
+                        * from the arrays
+                        */
+                       poll_ev->num_fds -= 1;
+                       poll_ev->fds[i] = poll_ev->fds[poll_ev->num_fds];
+                       poll_ev->fdes[i] = poll_ev->fdes[poll_ev->num_fds];
+                       if (poll_ev->fdes[i] != NULL) {
+                               poll_ev->fdes[i]->additional_flags = i;
+                       }
+                       continue;
+               }
+
+               pfd = &poll_ev->fds[i];
 
                if (pfd->revents & (POLLHUP|POLLERR)) {
                        /* If we only wait for TEVENT_FD_WRITE, we