Revert "pthreadpool: add pthreadpool_tevent_job_cancel()"
[samba.git] / lib / pthreadpool / pthreadpool_tevent.c
index 493083406ab02139fed8bc05cb89577a3d1bb006..6999730f25501f13c43094da2ed7774b92b174ad 100644 (file)
@@ -18,6 +18,7 @@
  */
 
 #include "replace.h"
  */
 
 #include "replace.h"
+#include "system/filesys.h"
 #include "pthreadpool_tevent.h"
 #include "pthreadpool.h"
 #include "lib/util/tevent_unix.h"
 #include "pthreadpool_tevent.h"
 #include "pthreadpool.h"
 #include "lib/util/tevent_unix.h"
@@ -57,15 +58,21 @@ struct pthreadpool_tevent {
        struct pthreadpool *pool;
        struct pthreadpool_tevent_glue *glue_list;
 
        struct pthreadpool *pool;
        struct pthreadpool_tevent_glue *glue_list;
 
-       struct pthreadpool_tevent_job_state *jobs;
+       struct pthreadpool_tevent_job *jobs;
 };
 
 struct pthreadpool_tevent_job_state {
 };
 
 struct pthreadpool_tevent_job_state {
-       struct pthreadpool_tevent_job_state *prev, *next;
-       struct pthreadpool_tevent *pool;
        struct tevent_context *ev;
        struct tevent_context *ev;
-       struct tevent_immediate *im;
        struct tevent_req *req;
        struct tevent_req *req;
+       struct pthreadpool_tevent_job *job;
+};
+
+struct pthreadpool_tevent_job {
+       struct pthreadpool_tevent_job *prev, *next;
+
+       struct pthreadpool_tevent *pool;
+       struct pthreadpool_tevent_job_state *state;
+       struct tevent_immediate *im;
 
        void (*fn)(void *private_data);
        void *private_data;
 
        void (*fn)(void *private_data);
        void *private_data;
@@ -73,6 +80,8 @@ struct pthreadpool_tevent_job_state {
 
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
 
 
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
 
+static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);
+
 static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
 static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
@@ -102,22 +111,41 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
        return 0;
 }
 
        return 0;
 }
 
+size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
+{
+       if (pool->pool == NULL) {
+               return 0;
+       }
+
+       return pthreadpool_max_threads(pool->pool);
+}
+
+size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
+{
+       if (pool->pool == NULL) {
+               return 0;
+       }
+
+       return pthreadpool_queued_jobs(pool->pool);
+}
+
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 {
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 {
-       struct pthreadpool_tevent_job_state *state, *next;
+       struct pthreadpool_tevent_job *job = NULL;
+       struct pthreadpool_tevent_job *njob = NULL;
        struct pthreadpool_tevent_glue *glue = NULL;
        int ret;
 
        struct pthreadpool_tevent_glue *glue = NULL;
        int ret;
 
-       ret = pthreadpool_destroy(pool->pool);
+       ret = pthreadpool_stop(pool->pool);
        if (ret != 0) {
                return ret;
        }
        if (ret != 0) {
                return ret;
        }
-       pool->pool = NULL;
 
 
-       for (state = pool->jobs; state != NULL; state = next) {
-               next = state->next;
-               DLIST_REMOVE(pool->jobs, state);
-               state->pool = NULL;
+       for (job = pool->jobs; job != NULL; job = njob) {
+               njob = job->next;
+
+               /* The job this removes it from the list */
+               pthreadpool_tevent_job_orphan(job);
        }
 
        /*
        }
 
        /*
@@ -131,6 +159,12 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
        }
        pool->glue_list = NULL;
 
        }
        pool->glue_list = NULL;
 
+       ret = pthreadpool_destroy(pool->pool);
+       if (ret != 0) {
+               return ret;
+       }
+       pool->pool = NULL;
+
        return 0;
 }
 
        return 0;
 }
 
@@ -223,7 +257,7 @@ static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
        glue->ev_link = ev_link;
 
 #ifdef HAVE_PTHREAD
        glue->ev_link = ev_link;
 
 #ifdef HAVE_PTHREAD
-       glue->tctx = tevent_threaded_context_create(pool, ev);
+       glue->tctx = tevent_threaded_context_create(glue, ev);
        if (glue->tctx == NULL) {
                TALLOC_FREE(ev_link);
                TALLOC_FREE(glue);
        if (glue->tctx == NULL) {
                TALLOC_FREE(ev_link);
                TALLOC_FREE(glue);
@@ -240,27 +274,120 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data);
 
                                        struct tevent_immediate *im,
                                        void *private_data);
 
-static int pthreadpool_tevent_job_state_destructor(
-       struct pthreadpool_tevent_job_state *state)
+static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
 {
 {
-       if (state->pool == NULL) {
-               return 0;
+       /*
+        * We should never be called with state->state != NULL.
+        * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
+        * after detaching from the request state and pool list.
+        */
+       if (job->state != NULL) {
+               abort();
+       }
+
+       /*
+        * If the job is not finished (job->im still there)
+        * and it's still attached to the pool,
+        * we try to cancel it (before it was starts)
+        */
+       if (job->im != NULL && job->pool != NULL) {
+               size_t num;
+
+               num = pthreadpool_cancel_job(job->pool->pool, 0,
+                                            pthreadpool_tevent_job_fn,
+                                            job);
+               if (num != 0) {
+                       /*
+                        * It was not too late to cancel the request.
+                        *
+                        * We can remove job->im, as it will never be used.
+                        */
+                       TALLOC_FREE(job->im);
+               }
+       }
+
+       /*
+        * pthreadpool_tevent_job_orphan() already removed
+        * it from pool->jobs. And we don't need try
+        * pthreadpool_cancel_job() again.
+        */
+       job->pool = NULL;
+
+       if (job->im != NULL) {
+               /*
+                * state->im still there means, we need to wait for the
+                * immediate event to be triggered or just leak the memory.
+                */
+               return -1;
+       }
+
+       return 0;
+}
+
+static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
+{
+       /*
+        * We're the only function that sets
+        * job->state = NULL;
+        */
+       if (job->state == NULL) {
+               abort();
        }
 
        /*
        }
 
        /*
-        * We should never be called with state->req == NULL,
-        * state->pool must be cleared before the 2nd talloc_free().
+        * We need to reparent to a long term context.
+        * And detach from the request state.
+        * Maybe the destructor will keep the memory
+        * and leak it for now.
+        */
+       (void)talloc_reparent(job->state, NULL, job);
+       job->state->job = NULL;
+       job->state = NULL;
+
+       /*
+        * job->pool will only be set to NULL
+        * in the first destructur run.
         */
         */
-       if (state->req == NULL) {
+       if (job->pool == NULL) {
                abort();
        }
 
                abort();
        }
 
+       /*
+        * Dettach it from the pool.
+        *
+        * The job might still be running,
+        * so we keep job->pool.
+        * The destructor will set it to NULL
+        * after trying pthreadpool_cancel_job()
+        */
+       DLIST_REMOVE(job->pool->jobs, job);
+
+       TALLOC_FREE(job);
+}
+
+static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
+                                          enum tevent_req_state req_state)
+{
+       struct pthreadpool_tevent_job_state *state =
+               tevent_req_data(req,
+               struct pthreadpool_tevent_job_state);
+
+       if (state->job == NULL) {
+               /*
+                * The job request is not scheduled in the pool
+                * yet or anymore.
+                */
+               return;
+       }
+
        /*
         * We need to reparent to a long term context.
        /*
         * We need to reparent to a long term context.
+        * Maybe the destructor will keep the memory
+        * and leak it for now.
         */
         */
-       (void)talloc_reparent(state->req, NULL, state);
-       state->req = NULL;
-       return -1;
+       pthreadpool_tevent_job_orphan(state->job);
+       state->job = NULL; /* not needed but looks better */
+       return;
 }
 
 struct tevent_req *pthreadpool_tevent_job_send(
 }
 
 struct tevent_req *pthreadpool_tevent_job_send(
@@ -268,8 +395,9 @@ struct tevent_req *pthreadpool_tevent_job_send(
        struct pthreadpool_tevent *pool,
        void (*fn)(void *private_data), void *private_data)
 {
        struct pthreadpool_tevent *pool,
        void (*fn)(void *private_data), void *private_data)
 {
-       struct tevent_req *req;
-       struct pthreadpool_tevent_job_state *state;
+       struct tevent_req *req = NULL;
+       struct pthreadpool_tevent_job_state *state = NULL;
+       struct pthreadpool_tevent_job *job = NULL;
        int ret;
 
        req = tevent_req_create(mem_ctx, &state,
        int ret;
 
        req = tevent_req_create(mem_ctx, &state,
@@ -277,46 +405,58 @@ struct tevent_req *pthreadpool_tevent_job_send(
        if (req == NULL) {
                return NULL;
        }
        if (req == NULL) {
                return NULL;
        }
-       state->pool = pool;
        state->ev = ev;
        state->req = req;
        state->ev = ev;
        state->req = req;
-       state->fn = fn;
-       state->private_data = private_data;
 
 
-       state->im = tevent_create_immediate(state);
-       if (tevent_req_nomem(state->im, req)) {
+       tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);
+
+       if (pool == NULL) {
+               tevent_req_error(req, EINVAL);
+               return tevent_req_post(req, ev);
+       }
+       if (pool->pool == NULL) {
+               tevent_req_error(req, EINVAL);
                return tevent_req_post(req, ev);
        }
 
        ret = pthreadpool_tevent_register_ev(pool, ev);
                return tevent_req_post(req, ev);
        }
 
        ret = pthreadpool_tevent_register_ev(pool, ev);
-       if (ret != 0) {
-               tevent_req_error(req, errno);
+       if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }
 
                return tevent_req_post(req, ev);
        }
 
-       ret = pthreadpool_add_job(pool->pool, 0,
+       job = talloc_zero(state, struct pthreadpool_tevent_job);
+       if (tevent_req_nomem(job, req)) {
+               return tevent_req_post(req, ev);
+       }
+       job->pool = pool;
+       job->fn = fn;
+       job->private_data = private_data;
+       job->im = tevent_create_immediate(state->job);
+       if (tevent_req_nomem(job->im, req)) {
+               return tevent_req_post(req, ev);
+       }
+       talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
+       DLIST_ADD_END(job->pool->jobs, job);
+       job->state = state;
+       state->job = job;
+
+       ret = pthreadpool_add_job(job->pool->pool, 0,
                                  pthreadpool_tevent_job_fn,
                                  pthreadpool_tevent_job_fn,
-                                 state);
+                                 job);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }
 
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }
 
-       /*
-        * Once the job is scheduled, we need to protect
-        * our memory.
-        */
-       talloc_set_destructor(state, pthreadpool_tevent_job_state_destructor);
-
-       DLIST_ADD_END(pool->jobs, state);
-
        return req;
 }
 
 static void pthreadpool_tevent_job_fn(void *private_data)
 {
        return req;
 }
 
 static void pthreadpool_tevent_job_fn(void *private_data)
 {
-       struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
-               private_data, struct pthreadpool_tevent_job_state);
-       state->fn(state->private_data);
+       struct pthreadpool_tevent_job *job =
+               talloc_get_type_abort(private_data,
+               struct pthreadpool_tevent_job);
+
+       job->fn(job->private_data);
 }
 
 static int pthreadpool_tevent_job_signal(int jobid,
 }
 
 static int pthreadpool_tevent_job_signal(int jobid,
@@ -324,18 +464,20 @@ static int pthreadpool_tevent_job_signal(int jobid,
                                         void *job_private_data,
                                         void *private_data)
 {
                                         void *job_private_data,
                                         void *private_data)
 {
-       struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
-               job_private_data, struct pthreadpool_tevent_job_state);
+       struct pthreadpool_tevent_job *job =
+               talloc_get_type_abort(job_private_data,
+               struct pthreadpool_tevent_job);
+       struct pthreadpool_tevent_job_state *state = job->state;
        struct tevent_threaded_context *tctx = NULL;
        struct pthreadpool_tevent_glue *g = NULL;
 
        struct tevent_threaded_context *tctx = NULL;
        struct pthreadpool_tevent_glue *g = NULL;
 
-       if (state->pool == NULL) {
-               /* The pthreadpool_tevent is already gone */
+       if (state == NULL) {
+               /* Request already gone */
                return 0;
        }
 
 #ifdef HAVE_PTHREAD
                return 0;
        }
 
 #ifdef HAVE_PTHREAD
-       for (g = state->pool->glue_list; g != NULL; g = g->next) {
+       for (g = job->pool->glue_list; g != NULL; g = g->next) {
                if (g->ev == state->ev) {
                        tctx = g->tctx;
                        break;
                if (g->ev == state->ev) {
                        tctx = g->tctx;
                        break;
@@ -349,14 +491,14 @@ static int pthreadpool_tevent_job_signal(int jobid,
 
        if (tctx != NULL) {
                /* with HAVE_PTHREAD */
 
        if (tctx != NULL) {
                /* with HAVE_PTHREAD */
-               tevent_threaded_schedule_immediate(tctx, state->im,
+               tevent_threaded_schedule_immediate(tctx, job->im,
                                                   pthreadpool_tevent_job_done,
                                                   pthreadpool_tevent_job_done,
-                                                  state);
+                                                  job);
        } else {
                /* without HAVE_PTHREAD */
        } else {
                /* without HAVE_PTHREAD */
-               tevent_schedule_immediate(state->im, state->ev,
+               tevent_schedule_immediate(job->im, state->ev,
                                          pthreadpool_tevent_job_done,
                                          pthreadpool_tevent_job_done,
-                                         state);
+                                         job);
        }
 
        return 0;
        }
 
        return 0;
@@ -366,27 +508,23 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data)
 {
                                        struct tevent_immediate *im,
                                        void *private_data)
 {
-       struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
-               private_data, struct pthreadpool_tevent_job_state);
+       struct pthreadpool_tevent_job *job =
+               talloc_get_type_abort(private_data,
+               struct pthreadpool_tevent_job);
+       struct pthreadpool_tevent_job_state *state = job->state;
 
 
-       if (state->pool != NULL) {
-               DLIST_REMOVE(state->pool->jobs, state);
-               state->pool = NULL;
-       }
+       TALLOC_FREE(job->im);
 
 
-       if (state->req == NULL) {
-               /*
-                * There was a talloc_free() state->req
-                * while the job was pending,
-                * which mean we're reparented on a longterm
-                * talloc context.
-                *
-                * We just cleanup here...
-                */
-               talloc_free(state);
+       if (state == NULL) {
+               /* Request already gone */
+               TALLOC_FREE(job);
                return;
        }
 
                return;
        }
 
+       /*
+        * pthreadpool_tevent_job_cleanup()
+        * will destroy the job.
+        */
        tevent_req_done(state->req);
 }
 
        tevent_req_done(state->req);
 }