pthreadpool: split out pthreadpool_tevent_job from pthreadpool_tevent_job_state
authorStefan Metzmacher <metze@samba.org>
Thu, 21 Jun 2018 23:39:47 +0000 (01:39 +0200)
committerStefan Metzmacher <metze@samba.org>
Thu, 12 Jul 2018 12:25:19 +0000 (14:25 +0200)
This makes it much easier to handle orphaned jobs:
we either wait for the immediate tevent to trigger
or we just keep leaking the memory.

The next commits will improve this further.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
lib/pthreadpool/pthreadpool_tevent.c

index 0e890cb..7c8015d 100644 (file)
@@ -57,15 +57,21 @@ struct pthreadpool_tevent {
        struct pthreadpool *pool;
        struct pthreadpool_tevent_glue *glue_list;
 
-       struct pthreadpool_tevent_job_state *jobs;
+       struct pthreadpool_tevent_job *jobs;
 };
 
 struct pthreadpool_tevent_job_state {
-       struct pthreadpool_tevent_job_state *prev, *next;
-       struct pthreadpool_tevent *pool;
        struct tevent_context *ev;
-       struct tevent_immediate *im;
        struct tevent_req *req;
+       struct pthreadpool_tevent_job *job;
+};
+
+struct pthreadpool_tevent_job {
+       struct pthreadpool_tevent_job *prev, *next;
+
+       struct pthreadpool_tevent *pool;
+       struct pthreadpool_tevent_job_state *state;
+       struct tevent_immediate *im;
 
        void (*fn)(void *private_data);
        void *private_data;
@@ -73,6 +79,8 @@ struct pthreadpool_tevent_job_state {
 
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
 
+static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);
+
 static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
@@ -122,7 +130,8 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
 
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 {
-       struct pthreadpool_tevent_job_state *state, *next;
+       struct pthreadpool_tevent_job *job = NULL;
+       struct pthreadpool_tevent_job *njob = NULL;
        struct pthreadpool_tevent_glue *glue = NULL;
        int ret;
 
@@ -132,10 +141,11 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
        }
        pool->pool = NULL;
 
-       for (state = pool->jobs; state != NULL; state = next) {
-               next = state->next;
-               DLIST_REMOVE(pool->jobs, state);
-               state->pool = NULL;
+       for (job = pool->jobs; job != NULL; job = njob) {
+               njob = job->next;
+
+               /* This also removes the job from the list */
+               pthreadpool_tevent_job_orphan(job);
        }
 
        /*
@@ -258,27 +268,120 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data);
 
-static int pthreadpool_tevent_job_state_destructor(
-       struct pthreadpool_tevent_job_state *state)
+static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
 {
-       if (state->pool == NULL) {
-               return 0;
+       /*
+        * We should never be called with job->state != NULL.
+        * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
+        * after detaching from the request state and pool list.
+        */
+       if (job->state != NULL) {
+               abort();
+       }
+
+       /*
+        * If the job is not finished (job->im still there)
+        * and it's still attached to the pool,
+        * we try to cancel it (before it starts)
+        */
+       if (job->im != NULL && job->pool != NULL) {
+               size_t num;
+
+               num = pthreadpool_cancel_job(job->pool->pool, 0,
+                                            pthreadpool_tevent_job_fn,
+                                            job);
+               if (num != 0) {
+                       /*
+                        * It was not too late to cancel the request.
+                        *
+                        * We can remove job->im, as it will never be used.
+                        */
+                       TALLOC_FREE(job->im);
+               }
+       }
+
+       /*
+        * pthreadpool_tevent_job_orphan() already removed
+        * it from pool->jobs. And we don't need to try
+        * pthreadpool_cancel_job() again.
+        */
+       job->pool = NULL;
+
+       if (job->im != NULL) {
+               /*
+                * job->im still being there means we need to wait for the
+                * immediate event to be triggered or just leak the memory.
+                */
+               return -1;
+       }
+
+       return 0;
+}
+
+static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
+{
+       /*
+        * We're the only function that sets
+        * job->state = NULL;
+        */
+       if (job->state == NULL) {
+               abort();
        }
 
        /*
-        * We should never be called with state->req == NULL,
-        * state->pool must be cleared before the 2nd talloc_free().
+        * We need to reparent to a long term context.
+        * And detach from the request state.
+        * Maybe the destructor will keep the memory
+        * and leak it for now.
         */
-       if (state->req == NULL) {
+       (void)talloc_reparent(job->state, NULL, job);
+       job->state->job = NULL;
+       job->state = NULL;
+
+       /*
+        * job->pool will only be set to NULL
+        * in the first destructor run.
+        */
+       if (job->pool == NULL) {
                abort();
        }
 
+       /*
+        * Detach it from the pool.
+        *
+        * The job might still be running,
+        * so we keep job->pool.
+        * The destructor will set it to NULL
+        * after trying pthreadpool_cancel_job()
+        */
+       DLIST_REMOVE(job->pool->jobs, job);
+
+       TALLOC_FREE(job);
+}
+
+static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
+                                          enum tevent_req_state req_state)
+{
+       struct pthreadpool_tevent_job_state *state =
+               tevent_req_data(req,
+               struct pthreadpool_tevent_job_state);
+
+       if (state->job == NULL) {
+               /*
+                * The job request is not scheduled in the pool
+                * yet or anymore.
+                */
+               return;
+       }
+
        /*
         * We need to reparent to a long term context.
+        * Maybe the destructor will keep the memory
+        * and leak it for now.
         */
-       (void)talloc_reparent(state->req, NULL, state);
-       state->req = NULL;
-       return -1;
+       pthreadpool_tevent_job_orphan(state->job);
+       state->job = NULL; /* not needed but looks better */
+       return;
 }
 
 struct tevent_req *pthreadpool_tevent_job_send(
@@ -286,8 +389,9 @@ struct tevent_req *pthreadpool_tevent_job_send(
        struct pthreadpool_tevent *pool,
        void (*fn)(void *private_data), void *private_data)
 {
-       struct tevent_req *req;
-       struct pthreadpool_tevent_job_state *state;
+       struct tevent_req *req = NULL;
+       struct pthreadpool_tevent_job_state *state = NULL;
+       struct pthreadpool_tevent_job *job = NULL;
        int ret;
 
        req = tevent_req_create(mem_ctx, &state,
@@ -295,11 +399,10 @@ struct tevent_req *pthreadpool_tevent_job_send(
        if (req == NULL) {
                return NULL;
        }
-       state->pool = pool;
        state->ev = ev;
        state->req = req;
-       state->fn = fn;
-       state->private_data = private_data;
+
+       tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);
 
        if (pool == NULL) {
                tevent_req_error(req, EINVAL);
@@ -310,39 +413,44 @@ struct tevent_req *pthreadpool_tevent_job_send(
                return tevent_req_post(req, ev);
        }
 
-       state->im = tevent_create_immediate(state);
-       if (tevent_req_nomem(state->im, req)) {
+       ret = pthreadpool_tevent_register_ev(pool, ev);
+       if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }
 
-       ret = pthreadpool_tevent_register_ev(pool, ev);
-       if (tevent_req_error(req, ret)) {
+       job = talloc_zero(state, struct pthreadpool_tevent_job);
+       if (tevent_req_nomem(job, req)) {
                return tevent_req_post(req, ev);
        }
+       job->pool = pool;
+       job->fn = fn;
+       job->private_data = private_data;
+       job->im = tevent_create_immediate(state->job);
+       if (tevent_req_nomem(job->im, req)) {
+               return tevent_req_post(req, ev);
+       }
+       talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
+       DLIST_ADD_END(job->pool->jobs, job);
+       job->state = state;
+       state->job = job;
 
-       ret = pthreadpool_add_job(pool->pool, 0,
+       ret = pthreadpool_add_job(job->pool->pool, 0,
                                  pthreadpool_tevent_job_fn,
-                                 state);
+                                 job);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }
 
-       /*
-        * Once the job is scheduled, we need to protect
-        * our memory.
-        */
-       talloc_set_destructor(state, pthreadpool_tevent_job_state_destructor);
-
-       DLIST_ADD_END(pool->jobs, state);
-
        return req;
 }
 
 static void pthreadpool_tevent_job_fn(void *private_data)
 {
-       struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
-               private_data, struct pthreadpool_tevent_job_state);
-       state->fn(state->private_data);
+       struct pthreadpool_tevent_job *job =
+               talloc_get_type_abort(private_data,
+               struct pthreadpool_tevent_job);
+
+       job->fn(job->private_data);
 }
 
 static int pthreadpool_tevent_job_signal(int jobid,
@@ -350,18 +458,20 @@ static int pthreadpool_tevent_job_signal(int jobid,
                                         void *job_private_data,
                                         void *private_data)
 {
-       struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
-               job_private_data, struct pthreadpool_tevent_job_state);
+       struct pthreadpool_tevent_job *job =
+               talloc_get_type_abort(job_private_data,
+               struct pthreadpool_tevent_job);
+       struct pthreadpool_tevent_job_state *state = job->state;
        struct tevent_threaded_context *tctx = NULL;
        struct pthreadpool_tevent_glue *g = NULL;
 
-       if (state->pool == NULL) {
-               /* The pthreadpool_tevent is already gone */
+       if (state == NULL) {
+               /* Request already gone */
                return 0;
        }
 
 #ifdef HAVE_PTHREAD
-       for (g = state->pool->glue_list; g != NULL; g = g->next) {
+       for (g = job->pool->glue_list; g != NULL; g = g->next) {
                if (g->ev == state->ev) {
                        tctx = g->tctx;
                        break;
@@ -375,14 +485,14 @@ static int pthreadpool_tevent_job_signal(int jobid,
 
        if (tctx != NULL) {
                /* with HAVE_PTHREAD */
-               tevent_threaded_schedule_immediate(tctx, state->im,
+               tevent_threaded_schedule_immediate(tctx, job->im,
                                                   pthreadpool_tevent_job_done,
-                                                  state);
+                                                  job);
        } else {
                /* without HAVE_PTHREAD */
-               tevent_schedule_immediate(state->im, state->ev,
+               tevent_schedule_immediate(job->im, state->ev,
                                          pthreadpool_tevent_job_done,
-                                         state);
+                                         job);
        }
 
        return 0;
@@ -392,27 +502,23 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data)
 {
-       struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
-               private_data, struct pthreadpool_tevent_job_state);
+       struct pthreadpool_tevent_job *job =
+               talloc_get_type_abort(private_data,
+               struct pthreadpool_tevent_job);
+       struct pthreadpool_tevent_job_state *state = job->state;
 
-       if (state->pool != NULL) {
-               DLIST_REMOVE(state->pool->jobs, state);
-               state->pool = NULL;
-       }
+       TALLOC_FREE(job->im);
 
-       if (state->req == NULL) {
-               /*
-                * There was a talloc_free() state->req
-                * while the job was pending,
-                * which mean we're reparented on a longterm
-                * talloc context.
-                *
-                * We just cleanup here...
-                */
-               talloc_free(state);
+       if (state == NULL) {
+               /* Request already gone */
+               TALLOC_FREE(job);
                return;
        }
 
+       /*
+        * pthreadpool_tevent_job_cleanup()
+        * will destroy the job.
+        */
        tevent_req_done(state->req);
 }