Revert "pthreadpool: implement pthreadpool_tevent_wrapper_create() infrastructure"
authorRalph Boehme <slow@samba.org>
Sun, 23 Dec 2018 08:42:53 +0000 (09:42 +0100)
committerStefan Metzmacher <metze@samba.org>
Fri, 11 Jan 2019 22:11:13 +0000 (23:11 +0100)
This reverts commit f9745d8b5234091c38e93ed57a255120b61f3ad7.

See the discussion in

https://lists.samba.org/archive/samba-technical/2018-December/131731.html

for the reasoning behind this revert.

Signed-off-by: Ralph Boehme <slow@samba.org>
Reviewed-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
lib/pthreadpool/pthreadpool_tevent.c
lib/pthreadpool/pthreadpool_tevent.h

index f88f82d..b0a757a 100644 (file)
@@ -104,8 +104,6 @@ struct pthreadpool_tevent_glue {
        /* Tuple we are keeping track of in this list. */
        struct tevent_context *ev;
        struct tevent_threaded_context *tctx;
-       /* recheck monitor fd event */
-       struct tevent_fd *fde;
        /* Pointer to link object owned by *ev. */
        struct pthreadpool_tevent_glue_ev_link *ev_link;
        /* active jobs */
@@ -125,33 +123,11 @@ struct pthreadpool_tevent_glue_ev_link {
        struct pthreadpool_tevent_glue *glue;
 };
 
-struct pthreadpool_tevent_wrapper {
-       struct pthreadpool_tevent *main_tp;
-       struct pthreadpool_tevent *wrap_tp;
-       const struct pthreadpool_tevent_wrapper_ops *ops;
-       void *private_state;
-       bool force_per_thread_cwd;
-};
-
 struct pthreadpool_tevent {
-       struct pthreadpool_tevent *prev, *next;
-
        struct pthreadpool *pool;
        struct pthreadpool_tevent_glue *glue_list;
 
        struct pthreadpool_tevent_job *jobs;
-
-       struct {
-               /*
-                * This is used on the main context
-                */
-               struct pthreadpool_tevent *list;
-
-               /*
-                * This is used on the wrapper context
-                */
-               struct pthreadpool_tevent_wrapper *ctx;
-       } wrapper;
 };
 
 struct pthreadpool_tevent_job_state {
@@ -166,7 +142,6 @@ struct pthreadpool_tevent_job {
        struct pthreadpool_tevent_job *prev, *next;
 
        struct pthreadpool_tevent *pool;
-       struct pthreadpool_tevent_wrapper *wrapper;
        struct pthreadpool_tevent_job_state *state;
        struct tevent_immediate *im;
 
@@ -206,15 +181,6 @@ struct pthreadpool_tevent_job {
                 */
                bool started;
 
-               /*
-                * 'wrapper'
-                * set before calling the wrapper before_job() or
-                * after_job() hooks.
-                * unset again check the hook finished.
-                * (only written by job thread!)
-                */
-               bool wrapper;
-
                /*
                 * 'executed'
                 * set once the job function returned.
@@ -244,18 +210,6 @@ struct pthreadpool_tevent_job {
                 * (only written by job thread!)
                 */
                bool signaled;
-
-               /*
-                * 'exit_thread'
-                * maybe set during pthreadpool_tevent_job_fn()
-                * if some wrapper related code generated an error
-                * and the environment isn't safe anymore.
-                *
-                * In such a case pthreadpool_tevent_job_signal()
-                * will pick this up and therminate the current
-                * worker thread by returning -1.
-                */
-               bool exit_thread; /* only written/read by job thread! */
        } needs_fence;
 
        bool per_thread_cwd;
@@ -314,22 +268,8 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
        return 0;
 }
 
-static struct pthreadpool_tevent *pthreadpool_tevent_unwrap(
-       struct pthreadpool_tevent *pool)
-{
-       struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
-
-       if (wrapper != NULL) {
-               return wrapper->main_tp;
-       }
-
-       return pool;
-}
-
 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
 {
-       pool = pthreadpool_tevent_unwrap(pool);
-
        if (pool->pool == NULL) {
                return 0;
        }
@@ -339,8 +279,6 @@ size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
 
 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
 {
-       pool = pthreadpool_tevent_unwrap(pool);
-
        if (pool->pool == NULL) {
                return 0;
        }
@@ -350,14 +288,6 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
 
 bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool)
 {
-       struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
-
-       if (wrapper != NULL && wrapper->force_per_thread_cwd) {
-               return true;
-       }
-
-       pool = pthreadpool_tevent_unwrap(pool);
-
        if (pool->pool == NULL) {
                return false;
        }
@@ -369,94 +299,21 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 {
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;
-       struct pthreadpool_tevent *wrap_tp = NULL;
-       struct pthreadpool_tevent *nwrap_tp = NULL;
        struct pthreadpool_tevent_glue *glue = NULL;
        int ret;
 
-       if (pool->wrapper.ctx != NULL) {
-               struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
-
-               pool->wrapper.ctx = NULL;
-               pool = wrapper->main_tp;
-
-               DLIST_REMOVE(pool->wrapper.list, wrapper->wrap_tp);
-
-               for (job = pool->jobs; job != NULL; job = njob) {
-                       njob = job->next;
-
-                       if (job->wrapper != wrapper) {
-                               continue;
-                       }
-
-                       /*
-                        * This removes the job from the list
-                        *
-                        * Note that it waits in case
-                        * the wrapper hooks are currently
-                        * executing on the job.
-                        */
-                       pthreadpool_tevent_job_orphan(job);
-               }
-
-               /*
-                * At this point we're sure that no job
-                * still references the pthreadpool_tevent_wrapper
-                * structure, so we can free it.
-                */
-               TALLOC_FREE(wrapper);
-
-               pthreadpool_tevent_cleanup_orphaned_jobs();
-               return 0;
-       }
-
-       if (pool->pool == NULL) {
-               /*
-                * A dangling wrapper without main_tp.
-                */
-               return 0;
-       }
-
        ret = pthreadpool_stop(pool->pool);
        if (ret != 0) {
                return ret;
        }
 
-       /*
-        * orphan all jobs (including wrapper jobs)
-        */
        for (job = pool->jobs; job != NULL; job = njob) {
                njob = job->next;
 
-               /*
-                * The job this removes it from the list
-                *
-                * Note that it waits in case
-                * the wrapper hooks are currently
-                * executing on the job (thread).
-                */
+               /* The job this removes it from the list */
                pthreadpool_tevent_job_orphan(job);
        }
 
-       /*
-        * cleanup all existing wrappers, remember we just orphaned
-        * all jobs (including the once of the wrappers).
-        *
-        * So we just mark as broken, so that
-        * pthreadpool_tevent_job_send() won't accept new jobs.
-        */
-       for (wrap_tp = pool->wrapper.list; wrap_tp != NULL; wrap_tp = nwrap_tp) {
-               nwrap_tp = wrap_tp->next;
-
-               /*
-                * Just mark them as broken, so that we can't
-                * get more jobs.
-                */
-               TALLOC_FREE(wrap_tp->wrapper.ctx);
-
-               DLIST_REMOVE(pool->wrapper.list, wrap_tp);
-       }
-
        /*
         * Delete all the registered
         * tevent_context/tevent_threaded_context
@@ -479,93 +336,12 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
        return 0;
 }
 
-struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
-                               struct pthreadpool_tevent *main_tp,
-                               TALLOC_CTX *mem_ctx,
-                               const struct pthreadpool_tevent_wrapper_ops *ops,
-                               void *pstate,
-                               size_t psize,
-                               const char *type,
-                               const char *location)
-{
-       void **ppstate = (void **)pstate;
-       struct pthreadpool_tevent *wrap_tp = NULL;
-       struct pthreadpool_tevent_wrapper *wrapper = NULL;
-
-       pthreadpool_tevent_cleanup_orphaned_jobs();
-
-       if (main_tp->wrapper.ctx != NULL) {
-               /*
-                * stacking of wrappers is not supported
-                */
-               errno = EINVAL;
-               return NULL;
-       }
-
-       if (main_tp->pool == NULL) {
-               /*
-                * The pool is no longer valid,
-                * most likely it was a wrapper context
-                * where the main pool was destroyed.
-                */
-               errno = EINVAL;
-               return NULL;
-       }
-
-       wrap_tp = talloc_zero(mem_ctx, struct pthreadpool_tevent);
-       if (wrap_tp == NULL) {
-               return NULL;
-       }
-
-       wrapper = talloc_zero(wrap_tp, struct pthreadpool_tevent_wrapper);
-       if (wrapper == NULL) {
-               TALLOC_FREE(wrap_tp);
-               return NULL;
-       }
-       wrapper->main_tp = main_tp;
-       wrapper->wrap_tp = wrap_tp;
-       wrapper->ops = ops;
-       wrapper->private_state = talloc_zero_size(wrapper, psize);
-       if (wrapper->private_state == NULL) {
-               TALLOC_FREE(wrap_tp);
-               return NULL;
-       }
-       talloc_set_name_const(wrapper->private_state, type);
-
-       wrap_tp->wrapper.ctx = wrapper;
-
-       DLIST_ADD_END(main_tp->wrapper.list, wrap_tp);
-
-       talloc_set_destructor(wrap_tp, pthreadpool_tevent_destructor);
-
-       *ppstate = wrapper->private_state;
-       return wrap_tp;
-}
-
-void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool,
-                                            const void *private_state)
-{
-       struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
-
-       if (wrapper == NULL) {
-               abort();
-       }
-
-       if (wrapper->private_state != private_state) {
-               abort();
-       }
-
-       wrapper->force_per_thread_cwd = true;
-}
-
 static int pthreadpool_tevent_glue_destructor(
        struct pthreadpool_tevent_glue *glue)
 {
        struct pthreadpool_tevent_job_state *state = NULL;
        struct pthreadpool_tevent_job_state *nstate = NULL;
 
-       TALLOC_FREE(glue->fde);
-
        for (state = glue->states; state != NULL; state = nstate) {
                nstate = state->next;
 
@@ -606,59 +382,6 @@ static int pthreadpool_tevent_glue_link_destructor(
        return 0;
 }
 
-static void pthreadpool_tevent_glue_monitor(struct tevent_context *ev,
-                                           struct tevent_fd *fde,
-                                           uint16_t flags,
-                                           void *private_data)
-{
-       struct pthreadpool_tevent_glue *glue =
-               talloc_get_type_abort(private_data,
-               struct pthreadpool_tevent_glue);
-       struct pthreadpool_tevent_job *job = NULL;
-       struct pthreadpool_tevent_job *njob = NULL;
-       int ret = -1;
-
-       ret = pthreadpool_restart_check_monitor_drain(glue->pool->pool);
-       if (ret != 0) {
-               TALLOC_FREE(glue->fde);
-       }
-
-       ret = pthreadpool_restart_check(glue->pool->pool);
-       if (ret == 0) {
-               /*
-                * success...
-                */
-               goto done;
-       }
-
-       /*
-        * There's a problem and the pool
-        * has not a single thread available
-        * for pending jobs, so we can only
-        * stop the jobs and return an error.
-        * This is similar to a failure from
-        * pthreadpool_add_job().
-        */
-       for (job = glue->pool->jobs; job != NULL; job = njob) {
-               njob = job->next;
-
-               tevent_req_defer_callback(job->state->req,
-                                         job->state->ev);
-               tevent_req_error(job->state->req, ret);
-       }
-
-done:
-       if (glue->states == NULL) {
-               /*
-                * If the glue doesn't have any pending jobs
-                * we remove the glue.
-                *
-                * In order to remove the fd event.
-                */
-               TALLOC_FREE(glue);
-       }
-}
-
 static int pthreadpool_tevent_register_ev(
                                struct pthreadpool_tevent *pool,
                                struct pthreadpool_tevent_job_state *state)
@@ -666,7 +389,6 @@ static int pthreadpool_tevent_register_ev(
        struct tevent_context *ev = state->ev;
        struct pthreadpool_tevent_glue *glue = NULL;
        struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
-       int monitor_fd = -1;
 
        /*
         * See if this tevent_context was already registered by
@@ -699,28 +421,6 @@ static int pthreadpool_tevent_register_ev(
        };
        talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
 
-       monitor_fd = pthreadpool_restart_check_monitor_fd(pool->pool);
-       if (monitor_fd == -1 && errno != ENOSYS) {
-               int saved_errno = errno;
-               TALLOC_FREE(glue);
-               return saved_errno;
-       }
-
-       if (monitor_fd != -1) {
-               glue->fde = tevent_add_fd(ev,
-                                         glue,
-                                         monitor_fd,
-                                         TEVENT_FD_READ,
-                                         pthreadpool_tevent_glue_monitor,
-                                         glue);
-               if (glue->fde == NULL) {
-                       close(monitor_fd);
-                       TALLOC_FREE(glue);
-                       return ENOMEM;
-               }
-               tevent_fd_set_auto_close(glue->fde);
-       }
-
        /*
         * Now allocate the link object to the event context. Note this
         * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
@@ -860,24 +560,6 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
                abort();
        }
 
-       /*
-        * Once we marked the request as 'orphaned'
-        * we spin/loop if 'wrapper' is marked as active.
-        *
-        * We need to wait until the wrapper hook finished
-        * before we can set job->wrapper = NULL.
-        *
-        * This is some kind of spinlock, but with
-        * 1 millisecond sleeps in between, in order
-        * to give the thread more cpu time to finish.
-        */
-       PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-       while (job->needs_fence.wrapper) {
-               poll(NULL, 0, 1);
-               PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-       }
-       job->wrapper = NULL;
-
        /*
         * Once we marked the request as 'orphaned'
         * we spin/loop if it's already marked
@@ -992,14 +674,9 @@ struct tevent_req *pthreadpool_tevent_job_send(
        struct pthreadpool_tevent_job_state *state = NULL;
        struct pthreadpool_tevent_job *job = NULL;
        int ret;
-       struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
 
        pthreadpool_tevent_cleanup_orphaned_jobs();
 
-       if (wrapper != NULL) {
-               pool = wrapper->main_tp;
-       }
-
        req = tevent_req_create(mem_ctx, &state,
                                struct pthreadpool_tevent_job_state);
        if (req == NULL) {
@@ -1029,7 +706,6 @@ struct tevent_req *pthreadpool_tevent_job_send(
                return tevent_req_post(req, ev);
        }
        job->pool = pool;
-       job->wrapper = wrapper;
        job->fn = fn;
        job->private_data = private_data;
        job->im = tevent_create_immediate(state->job);
@@ -1128,73 +804,15 @@ static void pthreadpool_tevent_job_fn(void *private_data)
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);
-       struct pthreadpool_tevent_wrapper *wrapper = NULL;
 
        current_job = job;
        job->needs_fence.started = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-       if (job->needs_fence.orphaned) {
-               current_job = NULL;
-               return;
-       }
-
-       wrapper = job->wrapper;
-       if (wrapper != NULL) {
-               bool ok;
-
-               job->needs_fence.wrapper = true;
-               PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-               if (job->needs_fence.orphaned) {
-                       job->needs_fence.wrapper = false;
-                       PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-                       current_job = NULL;
-                       return;
-               }
-               ok = wrapper->ops->before_job(wrapper->wrap_tp,
-                                             wrapper->private_state,
-                                             wrapper->main_tp,
-                                             __location__);
-               job->needs_fence.wrapper = false;
-               PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-               if (!ok) {
-                       job->needs_fence.exit_thread = true;
-                       PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-                       current_job = NULL;
-                       return;
-               }
-       }
 
        job->fn(job->private_data);
 
        job->needs_fence.executed = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-
-       if (wrapper != NULL) {
-               bool ok;
-
-               job->needs_fence.wrapper = true;
-               PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-               if (job->needs_fence.orphaned) {
-                       job->needs_fence.wrapper = false;
-                       job->needs_fence.exit_thread = true;
-                       PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-                       current_job = NULL;
-                       return;
-               }
-               ok = wrapper->ops->after_job(wrapper->wrap_tp,
-                                            wrapper->private_state,
-                                            wrapper->main_tp,
-                                            __location__);
-               job->needs_fence.wrapper = false;
-               PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-               if (!ok) {
-                       job->needs_fence.exit_thread = true;
-                       PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-                       current_job = NULL;
-                       return;
-               }
-       }
-
        current_job = NULL;
 }
 
@@ -1213,15 +831,6 @@ static int pthreadpool_tevent_job_signal(int jobid,
                /* Request already gone */
                job->needs_fence.dropped = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-               if (job->needs_fence.exit_thread) {
-                       /*
-                        * A problem with the wrapper the current job/worker
-                        * thread needs to terminate.
-                        *
-                        * The pthreadpool_tevent is already gone.
-                        */
-                       return -1;
-               }
                return 0;
        }
 
@@ -1247,15 +856,6 @@ static int pthreadpool_tevent_job_signal(int jobid,
 
        job->needs_fence.signaled = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-       if (job->needs_fence.exit_thread) {
-               /*
-                * A problem with the wrapper the current job/worker
-                * thread needs to terminate.
-                *
-                * The pthreadpool_tevent is already gone.
-                */
-               return -1;
-       }
        return 0;
 }
 
index 6c939fc..ff2ab7c 100644 (file)
@@ -29,38 +29,6 @@ struct pthreadpool_tevent;
 int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
                            struct pthreadpool_tevent **presult);
 
-struct pthreadpool_tevent_wrapper_ops {
-       const char *name;
-
-       bool (*before_job)(struct pthreadpool_tevent *wrap_tp,
-                          void *private_state,
-                          struct pthreadpool_tevent *main_tp,
-                          const char *location);
-       bool (*after_job)(struct pthreadpool_tevent *wrap_tp,
-                         void *private_state,
-                         struct pthreadpool_tevent *main_tp,
-                         const char *location);
-};
-
-struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
-                               struct pthreadpool_tevent *main_tp,
-                               TALLOC_CTX *mem_ctx,
-                               const struct pthreadpool_tevent_wrapper_ops *ops,
-                               void *pstate,
-                               size_t psize,
-                               const char *type,
-                               const char *location);
-#define pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, state, type) \
-       _pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, \
-                                      state, sizeof(type), #type, __location__)
-
-/*
- * this can only be called directly after
- * pthreadpool_tevent_wrapper_create()
- */
-void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool,
-                                            const void *private_state);
-
 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool);
 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool);
 bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool);