pthreadpool: maintain a list of job_states on each pthreadpool_tevent_glue
authorStefan Metzmacher <metze@samba.org>
Fri, 22 Jun 2018 15:14:31 +0000 (17:14 +0200)
committerRalph Boehme <slow@samba.org>
Tue, 24 Jul 2018 15:38:27 +0000 (17:38 +0200)
We should avoid traversing a linked list within a thread without holding
a mutex!

Using a mutex would be very tricky as we'll likely deadlock with
the mutexes at the raw pthreadpool layer.

So we use somekind of spinlock using atomic_thread_fence in order to
protect the access to job->state->glue->{tctx,ev} in
pthreadpool_tevent_job_signal().

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
lib/pthreadpool/pthreadpool_tevent.c

index 821d13b02362433b5f2f806b61e74ec2baf13fc3..3b502a7cc5a3cdb8080a8afc97946452ada796cf 100644 (file)
@@ -18,6 +18,7 @@
  */
 
 #include "replace.h"
  */
 
 #include "replace.h"
+#include "system/select.h"
 #include "system/threads.h"
 #include "pthreadpool_tevent.h"
 #include "pthreadpool.h"
 #include "system/threads.h"
 #include "pthreadpool_tevent.h"
 #include "pthreadpool.h"
@@ -104,6 +105,8 @@ struct pthreadpool_tevent_glue {
        struct tevent_threaded_context *tctx;
        /* Pointer to link object owned by *ev. */
        struct pthreadpool_tevent_glue_ev_link *ev_link;
        struct tevent_threaded_context *tctx;
        /* Pointer to link object owned by *ev. */
        struct pthreadpool_tevent_glue_ev_link *ev_link;
+       /* active jobs */
+       struct pthreadpool_tevent_job_state *states;
 };
 
 /*
 };
 
 /*
@@ -127,6 +130,8 @@ struct pthreadpool_tevent {
 };
 
 struct pthreadpool_tevent_job_state {
 };
 
 struct pthreadpool_tevent_job_state {
+       struct pthreadpool_tevent_job_state *prev, *next;
+       struct pthreadpool_tevent_glue *glue;
        struct tevent_context *ev;
        struct tevent_req *req;
        struct pthreadpool_tevent_job *job;
        struct tevent_context *ev;
        struct tevent_req *req;
        struct pthreadpool_tevent_job *job;
@@ -322,6 +327,16 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 static int pthreadpool_tevent_glue_destructor(
        struct pthreadpool_tevent_glue *glue)
 {
 static int pthreadpool_tevent_glue_destructor(
        struct pthreadpool_tevent_glue *glue)
 {
+       struct pthreadpool_tevent_job_state *state = NULL;
+       struct pthreadpool_tevent_job_state *nstate = NULL;
+
+       for (state = glue->states; state != NULL; state = nstate) {
+               nstate = state->next;
+
+               /* The job this removes it from the list */
+               pthreadpool_tevent_job_orphan(state->job);
+       }
+
        if (glue->pool->glue_list != NULL) {
                DLIST_REMOVE(glue->pool->glue_list, glue);
        }
        if (glue->pool->glue_list != NULL) {
                DLIST_REMOVE(glue->pool->glue_list, glue);
        }
@@ -355,9 +370,11 @@ static int pthreadpool_tevent_glue_link_destructor(
        return 0;
 }
 
        return 0;
 }
 
-static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
-                                         struct tevent_context *ev)
+static int pthreadpool_tevent_register_ev(
+                               struct pthreadpool_tevent *pool,
+                               struct pthreadpool_tevent_job_state *state)
 {
 {
+       struct tevent_context *ev = state->ev;
        struct pthreadpool_tevent_glue *glue = NULL;
        struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
 
        struct pthreadpool_tevent_glue *glue = NULL;
        struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
 
@@ -368,7 +385,9 @@ static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
         * pair.
         */
        for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
         * pair.
         */
        for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
-               if (glue->ev == ev) {
+               if (glue->ev == state->ev) {
+                       state->glue = glue;
+                       DLIST_ADD_END(glue->states, state);
                        return 0;
                }
        }
                        return 0;
                }
        }
@@ -416,6 +435,9 @@ static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
        }
 #endif
 
        }
 #endif
 
+       state->glue = glue;
+       DLIST_ADD_END(glue->states, state);
+
        DLIST_ADD(pool->glue_list, glue);
        return 0;
 }
        DLIST_ADD(pool->glue_list, glue);
        return 0;
 }
@@ -431,7 +453,7 @@ static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
        /*
         * We should never be called with needs_fence.orphaned == false.
         * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
        /*
         * We should never be called with needs_fence.orphaned == false.
         * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
-        * after detaching from the request state and pool list.
+        * after detaching from the request state, glue and pool list.
         */
        if (!job->needs_fence.orphaned) {
                abort();
         */
        if (!job->needs_fence.orphaned) {
                abort();
@@ -509,6 +531,42 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
                abort();
        }
 
                abort();
        }
 
+       /*
+        * Once we marked the request as 'orphaned'
+        * we spin/loop if it's already marked
+        * as 'finished' (which means that
+        * pthreadpool_tevent_job_signal() was entered.
+        * If it saw 'orphaned' it will exit after setting
+        * 'dropped', otherwise it dereferences
+        * job->state->glue->{tctx,ev} until it exited
+        * after setting 'signaled'.
+        *
+        * We need to close this potential gab before
+        * we can set job->state = NULL.
+        *
+        * This is some kind of spinlock, but with
+        * 1 millisecond sleeps in between, in order
+        * to give the thread more cpu time to finish.
+        */
+       PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+       while (job->needs_fence.finished) {
+               if (job->needs_fence.dropped) {
+                       break;
+               }
+               if (job->needs_fence.signaled) {
+                       break;
+               }
+               poll(NULL, 0, 1);
+               PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+       }
+
+       /*
+        * Once the gab is closed, we can remove
+        * the glue link.
+        */
+       DLIST_REMOVE(job->state->glue->states, job->state);
+       job->state->glue = NULL;
+
        /*
         * We need to reparent to a long term context.
         * And detach from the request state.
        /*
         * We need to reparent to a long term context.
         * And detach from the request state.
@@ -561,6 +619,10 @@ static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
                 * The job request is not scheduled in the pool
                 * yet or anymore.
                 */
                 * The job request is not scheduled in the pool
                 * yet or anymore.
                 */
+               if (state->glue != NULL) {
+                       DLIST_REMOVE(state->glue->states, state);
+                       state->glue = NULL;
+               }
                return;
        }
 
                return;
        }
 
@@ -605,7 +667,7 @@ struct tevent_req *pthreadpool_tevent_job_send(
                return tevent_req_post(req, ev);
        }
 
                return tevent_req_post(req, ev);
        }
 
-       ret = pthreadpool_tevent_register_ev(pool, ev);
+       ret = pthreadpool_tevent_register_ev(pool, state);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }
@@ -718,9 +780,6 @@ static int pthreadpool_tevent_job_signal(int jobid,
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(job_private_data,
                struct pthreadpool_tevent_job);
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(job_private_data,
                struct pthreadpool_tevent_job);
-       struct pthreadpool_tevent_job_state *state = job->state;
-       struct tevent_threaded_context *tctx = NULL;
-       struct pthreadpool_tevent_glue *g = NULL;
 
        job->needs_fence.finished = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 
        job->needs_fence.finished = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
@@ -731,27 +790,22 @@ static int pthreadpool_tevent_job_signal(int jobid,
                return 0;
        }
 
                return 0;
        }
 
-#ifdef HAVE_PTHREAD
-       for (g = job->pool->glue_list; g != NULL; g = g->next) {
-               if (g->ev == state->ev) {
-                       tctx = g->tctx;
-                       break;
-               }
-       }
-
-       if (tctx == NULL) {
-               abort();
-       }
-#endif
-
-       if (tctx != NULL) {
+       /*
+        * state and state->glue are valid,
+        * see the job->needs_fence.finished
+        * "spinlock" loop in
+        * pthreadpool_tevent_job_orphan()
+        */
+       if (job->state->glue->tctx != NULL) {
                /* with HAVE_PTHREAD */
                /* with HAVE_PTHREAD */
-               tevent_threaded_schedule_immediate(tctx, job->im,
+               tevent_threaded_schedule_immediate(job->state->glue->tctx,
+                                                  job->im,
                                                   pthreadpool_tevent_job_done,
                                                   job);
        } else {
                /* without HAVE_PTHREAD */
                                                   pthreadpool_tevent_job_done,
                                                   job);
        } else {
                /* without HAVE_PTHREAD */
-               tevent_schedule_immediate(job->im, state->ev,
+               tevent_schedule_immediate(job->im,
+                                         job->state->glue->ev,
                                          pthreadpool_tevent_job_done,
                                          job);
        }
                                          pthreadpool_tevent_job_done,
                                          job);
        }