pthreadpool: maintain a list of job_states on each pthreadpool_tevent_glue
lib/pthreadpool/pthreadpool_tevent.c (samba.git)
/*
 * Unix SMB/CIFS implementation.
 * threadpool implementation based on pthreads
 * Copyright (C) Volker Lendecke 2009,2011
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "replace.h"
#include "system/select.h"
#include "system/threads.h"
#include "pthreadpool_tevent.h"
#include "pthreadpool.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/dlinklist.h"
#include "lib/util/attr.h"

/*
 * We try to give some hints to helgrind/drd.
 *
 * Note that ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
 * takes a memory address range that is ignored by helgrind/drd;
 * 'description' is just ignored.
 *
 * Note that ANNOTATE_HAPPENS_*(unique_uintptr)
 * just takes a DWORD/(void *) as a unique key
 * for the barrier.
 */
#ifdef HAVE_VALGRIND_HELGRIND_H
#include <valgrind/helgrind.h>
#endif
#ifndef ANNOTATE_BENIGN_RACE_SIZED
#define ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
#endif
#ifndef ANNOTATE_HAPPENS_BEFORE
#define ANNOTATE_HAPPENS_BEFORE(unique_uintptr)
#endif
#ifndef ANNOTATE_HAPPENS_AFTER
#define ANNOTATE_HAPPENS_AFTER(unique_uintptr)
#endif
#ifndef ANNOTATE_HAPPENS_BEFORE_FORGET_ALL
#define ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(unique_uintptr)
#endif

#define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_BENIGN_RACE_SIZED(&__j->needs_fence, \
                                   sizeof(__j->needs_fence), \
                                   "race by design, protected by fence"); \
} while(0);

#ifdef WITH_PTHREADPOOL
/*
 * configure checked we have pthread and atomic_thread_fence() available
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { \
        atomic_thread_fence(__order); \
} while(0)
#else
/*
 * we're using lib/pthreadpool/pthreadpool_sync.c ...
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { } while(0)
#ifndef HAVE___THREAD
#define __thread
#endif
#endif

#define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_HAPPENS_BEFORE(&__job->needs_fence); \
        __PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
        ANNOTATE_HAPPENS_AFTER(&__job->needs_fence); \
} while(0);

#define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&__job->needs_fence); \
} while(0);

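/*
 * Usage pattern for the fence macros in this file: exactly one side
 * (main thread or job thread) writes a given needs_fence flag and then
 * issues a full fence; the other side issues a fence before reading.
 * For example (see pthreadpool_tevent_job_fn() and
 * pthreadpool_tevent_job_cancel() below):
 *
 *      job->needs_fence.started = true;        (job thread)
 *      PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *      ...
 *      PTHREAD_TEVENT_JOB_THREAD_FENCE(job);   (main thread)
 *      if (job->needs_fence.started) { ... }
 */
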
struct pthreadpool_tevent_job_state;

/*
 * We need one pthreadpool_tevent_glue object per unique combination of tevent
 * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
 * contexts in a pthreadpool_tevent.
 */
struct pthreadpool_tevent_glue {
        struct pthreadpool_tevent_glue *prev, *next;
        struct pthreadpool_tevent *pool; /* back-pointer to owning object. */
        /* Tuple we are keeping track of in this list. */
        struct tevent_context *ev;
        struct tevent_threaded_context *tctx;
        /* Pointer to link object owned by *ev. */
        struct pthreadpool_tevent_glue_ev_link *ev_link;
        /* active jobs */
        struct pthreadpool_tevent_job_state *states;
};

/*
 * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
 * tevent context from our list of active event contexts if the event context
 * is destroyed.
 * This structure is talloc()'ed from the struct tevent_context *, and is a
 * back-pointer allowing the related struct pthreadpool_tevent_glue object
 * to be removed from the struct pthreadpool_tevent glue list if the owning
 * tevent_context is talloc_free()'ed.
 */
struct pthreadpool_tevent_glue_ev_link {
        struct pthreadpool_tevent_glue *glue;
};

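/*
 * The object handed out by pthreadpool_tevent_init(): it owns the raw
 * pthreadpool, the list of registered tevent glue objects and the list
 * of jobs currently attached to this pool.
 */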
struct pthreadpool_tevent {
        struct pthreadpool *pool;
        struct pthreadpool_tevent_glue *glue_list;

        struct pthreadpool_tevent_job *jobs;
};

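/*
 * Per-request state, allocated as the tevent_req data by
 * pthreadpool_tevent_job_send(). It is linked into glue->states so that
 * destroying the glue (i.e. the tevent context or the pool going away)
 * can orphan the job it points at.
 */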
struct pthreadpool_tevent_job_state {
        struct pthreadpool_tevent_job_state *prev, *next;
        struct pthreadpool_tevent_glue *glue;
        struct tevent_context *ev;
        struct tevent_req *req;
        struct pthreadpool_tevent_job *job;
};

struct pthreadpool_tevent_job {
        struct pthreadpool_tevent_job *prev, *next;

        struct pthreadpool_tevent *pool;
        struct pthreadpool_tevent_job_state *state;
        struct tevent_immediate *im;

        void (*fn)(void *private_data);
        void *private_data;

        /*
         * Coordination between threads
         *
         * There is only one side writing each element,
         * either the main thread or the job thread.
         *
         * The coordination is done by a full memory
         * barrier using atomic_thread_fence(memory_order_seq_cst)
         * wrapped in PTHREAD_TEVENT_JOB_THREAD_FENCE()
         */
        struct {
                /*
                 * 'maycancel'
                 * set when tevent_req_cancel() is called.
                 * (only written by main thread!)
                 */
                bool maycancel;

                /*
                 * 'orphaned'
                 * set when talloc_free is called on the job request,
                 * tevent_context or pthreadpool_tevent.
                 * (only written by main thread!)
                 */
                bool orphaned;

                /*
                 * 'started'
                 * set when the job is picked up by a worker thread
                 * (only written by job thread!)
                 */
                bool started;

                /*
                 * 'executed'
                 * set once the job function returned.
                 * (only written by job thread!)
                 */
                bool executed;

                /*
                 * 'finished'
                 * set when pthreadpool_tevent_job_signal() is entered
                 * (only written by job thread!)
                 */
                bool finished;

                /*
                 * 'dropped'
                 * set when pthreadpool_tevent_job_signal() leaves with
                 * orphaned already set.
                 * (only written by job thread!)
                 */
                bool dropped;

                /*
                 * 'signaled'
                 * set when pthreadpool_tevent_job_signal() leaves normally
                 * and the immediate event was scheduled.
                 * (only written by job thread!)
                 */
                bool signaled;
        } needs_fence;
};
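/*
 * Summary of the flag lifecycle above: the job thread sets started before
 * and executed after running fn(), then finished on entering
 * pthreadpool_tevent_job_signal(), followed by either dropped (the request
 * was already orphaned) or signaled (the immediate event was scheduled).
 * maycancel and orphaned are only ever set by the main thread.
 */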

static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);

static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);

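/*
 * Global list of jobs whose request, pool or tevent context is already
 * gone, but which could not be freed yet (for example because a worker
 * thread may still be running fn() or the immediate event is still
 * pending). pthreadpool_tevent_cleanup_orphaned_jobs() retries freeing
 * them.
 */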
static struct pthreadpool_tevent_job *orphaned_jobs;

void pthreadpool_tevent_cleanup_orphaned_jobs(void)
{
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;

        for (job = orphaned_jobs; job != NULL; job = njob) {
                njob = job->next;

                /*
                 * The job destructor keeps the job alive
                 * (and in the list) or removes it from the list.
                 */
                TALLOC_FREE(job);
        }
}

static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
                                         void *private_data);

int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
                            struct pthreadpool_tevent **presult)
{
        struct pthreadpool_tevent *pool;
        int ret;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
        if (pool == NULL) {
                return ENOMEM;
        }

        ret = pthreadpool_init(max_threads, &pool->pool,
                               pthreadpool_tevent_job_signal, pool);
        if (ret != 0) {
                TALLOC_FREE(pool);
                return ret;
        }

        talloc_set_destructor(pool, pthreadpool_tevent_destructor);

        *presult = pool;
        return 0;
}
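/*
 * Note on ownership: the returned pool hangs off mem_ctx; freeing it runs
 * pthreadpool_tevent_destructor() below, which stops the raw pthreadpool
 * and orphans any jobs that are still pending.
 */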

size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
{
        if (pool->pool == NULL) {
                return 0;
        }

        return pthreadpool_max_threads(pool->pool);
}

size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
{
        if (pool->pool == NULL) {
                return 0;
        }

        return pthreadpool_queued_jobs(pool->pool);
}

static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
{
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;
        struct pthreadpool_tevent_glue *glue = NULL;
        int ret;

        ret = pthreadpool_stop(pool->pool);
        if (ret != 0) {
                return ret;
        }

        for (job = pool->jobs; job != NULL; job = njob) {
                njob = job->next;

                /* pthreadpool_tevent_job_orphan() removes the job from the list */
                pthreadpool_tevent_job_orphan(job);
        }

        /*
         * Delete all the registered
         * tevent_context/tevent_threaded_context
         * pairs.
         */
        for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
                /* The glue destructor removes it from the list */
                TALLOC_FREE(glue);
        }
        pool->glue_list = NULL;

        ret = pthreadpool_destroy(pool->pool);
        if (ret != 0) {
                return ret;
        }
        pool->pool = NULL;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        return 0;
}

static int pthreadpool_tevent_glue_destructor(
        struct pthreadpool_tevent_glue *glue)
{
        struct pthreadpool_tevent_job_state *state = NULL;
        struct pthreadpool_tevent_job_state *nstate = NULL;

        for (state = glue->states; state != NULL; state = nstate) {
                nstate = state->next;

                /* pthreadpool_tevent_job_orphan() removes the state from the list */
                pthreadpool_tevent_job_orphan(state->job);
        }

        if (glue->pool->glue_list != NULL) {
                DLIST_REMOVE(glue->pool->glue_list, glue);
        }

        /* Ensure the ev_link destructor knows we're gone */
        glue->ev_link->glue = NULL;

        TALLOC_FREE(glue->ev_link);
        TALLOC_FREE(glue->tctx);

        return 0;
}

/*
 * Destructor called either explicitly from
 * pthreadpool_tevent_glue_destructor(), or indirectly
 * when the owning tevent_context is destroyed.
 *
 * When called from pthreadpool_tevent_glue_destructor()
 * ev_link->glue is already NULL, so this does nothing.
 *
 * When called from talloc_free() of the owning
 * tevent_context we must ensure we also remove the
 * linked glue object from the list inside
 * struct pthreadpool_tevent.
 */
static int pthreadpool_tevent_glue_link_destructor(
        struct pthreadpool_tevent_glue_ev_link *ev_link)
{
        TALLOC_FREE(ev_link->glue);
        return 0;
}

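/*
 * Make sure we have a glue object (a tevent_context/tevent_threaded_context
 * pair) for state->ev, creating one if this event context has not been used
 * with this pool before, and link state into glue->states.
 */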
static int pthreadpool_tevent_register_ev(
                                struct pthreadpool_tevent *pool,
                                struct pthreadpool_tevent_job_state *state)
{
        struct tevent_context *ev = state->ev;
        struct pthreadpool_tevent_glue *glue = NULL;
        struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;

        /*
         * See if this tevent_context was already registered by
         * searching the glue object list. If so we have nothing
         * to do here - we already have a tevent_context/tevent_threaded_context
         * pair.
         */
        for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
                if (glue->ev == state->ev) {
                        state->glue = glue;
                        DLIST_ADD_END(glue->states, state);
                        return 0;
                }
        }

        /*
         * Event context not yet registered - create a new glue
         * object containing a tevent_context/tevent_threaded_context
         * pair and put it on the list to remember this registration.
         * We also need a link object to ensure the event context
         * can't go away without us knowing about it.
         */
        glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
        if (glue == NULL) {
                return ENOMEM;
        }
        *glue = (struct pthreadpool_tevent_glue) {
                .pool = pool,
                .ev = ev,
        };
        talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);

        /*
         * Now allocate the link object to the event context. Note this
         * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
         * context is freed we are able to cleanup the glue object
         * in the link object destructor.
         */

        ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
        if (ev_link == NULL) {
                TALLOC_FREE(glue);
                return ENOMEM;
        }
        ev_link->glue = glue;
        talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);

        glue->ev_link = ev_link;

#ifdef HAVE_PTHREAD
        glue->tctx = tevent_threaded_context_create(glue, ev);
        if (glue->tctx == NULL) {
                TALLOC_FREE(ev_link);
                TALLOC_FREE(glue);
                return ENOMEM;
        }
#endif

        state->glue = glue;
        DLIST_ADD_END(glue->states, state);

        DLIST_ADD(pool->glue_list, glue);
        return 0;
}

static void pthreadpool_tevent_job_fn(void *private_data);
static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data);
static bool pthreadpool_tevent_job_cancel(struct tevent_req *req);

static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
{
        /*
         * We should never be called with needs_fence.orphaned == false.
         * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
         * after detaching from the request state, glue and pool list.
         */
        if (!job->needs_fence.orphaned) {
                abort();
        }

        /*
         * If the job is not finished (job->im still there)
         * and it's still attached to the pool,
         * we try to cancel it (before it has started).
         */
        if (job->im != NULL && job->pool != NULL) {
                size_t num;

                num = pthreadpool_cancel_job(job->pool->pool, 0,
                                             pthreadpool_tevent_job_fn,
                                             job);
                if (num != 0) {
                        /*
                         * It was not too late to cancel the request.
                         *
                         * We can remove job->im, as it will never be used.
                         */
                        TALLOC_FREE(job->im);
                }
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.dropped) {
                /*
                 * The signal function saw job->needs_fence.orphaned
                 * before it started the signaling via the immediate
                 * event. So we'll never get triggered and can
                 * remove job->im and let the whole job go...
                 */
                TALLOC_FREE(job->im);
        }

        /*
         * pthreadpool_tevent_job_orphan() already removed
         * it from pool->jobs. And we don't need to try
         * pthreadpool_cancel_job() again.
         */
        job->pool = NULL;

        if (job->im != NULL) {
                /*
                 * job->im still being there means we need to wait for the
                 * immediate event to be triggered or just leak the memory.
                 *
                 * Move it to the orphaned list, if it's not already there.
                 */
                return -1;
        }

        /*
         * Finally remove from the orphaned_jobs list
         * and let talloc destroy us.
         */
        DLIST_REMOVE(orphaned_jobs, job);

        PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(job);
        return 0;
}

static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
{
        job->needs_fence.orphaned = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        /*
         * We're the only function that sets
         * job->state = NULL;
         */
        if (job->state == NULL) {
                abort();
        }

        /*
         * Once we marked the request as 'orphaned'
         * we spin/loop if it's already marked
         * as 'finished' (which means that
         * pthreadpool_tevent_job_signal() was entered).
         * If it saw 'orphaned' it will exit after setting
         * 'dropped', otherwise it dereferences
         * job->state->glue->{tctx,ev} until it exits
         * after setting 'signaled'.
         *
         * We need to close this potential gap before
         * we can set job->state = NULL.
         *
         * This is some kind of spinlock, but with
         * 1 millisecond sleeps in between, in order
         * to give the thread more cpu time to finish.
         */
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        while (job->needs_fence.finished) {
                if (job->needs_fence.dropped) {
                        break;
                }
                if (job->needs_fence.signaled) {
                        break;
                }
                poll(NULL, 0, 1);
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        }

        /*
         * Once the gap is closed, we can remove
         * the glue link.
         */
        DLIST_REMOVE(job->state->glue->states, job->state);
        job->state->glue = NULL;

        /*
         * We need to reparent the job to a long term context
         * and detach it from the request state.
         * The destructor may keep the memory alive
         * (i.e. leak it) for now.
         */
        (void)talloc_reparent(job->state, NULL, job);
        job->state->job = NULL;
        job->state = NULL;

        /*
         * job->pool will only be set to NULL
         * in the first destructor run.
         */
        if (job->pool == NULL) {
                abort();
        }

        /*
         * Detach it from the pool.
         *
         * The job might still be running,
         * so we keep job->pool.
         * The destructor will set it to NULL
         * after trying pthreadpool_cancel_job()
         */
        DLIST_REMOVE(job->pool->jobs, job);

        /*
         * Add it to the list of orphaned jobs,
         * which may be cleaned up later.
         *
         * The destructor removes it from the list
         * when possible, or it denies the free
         * and keeps it in the list.
         */
        DLIST_ADD_END(orphaned_jobs, job);
        TALLOC_FREE(job);
}

static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
                                           enum tevent_req_state req_state)
{
        struct pthreadpool_tevent_job_state *state =
                tevent_req_data(req,
                struct pthreadpool_tevent_job_state);

        if (state->job == NULL) {
                /*
                 * The job request is not scheduled in the pool
                 * yet, or not anymore.
                 */
                if (state->glue != NULL) {
                        DLIST_REMOVE(state->glue->states, state);
                        state->glue = NULL;
                }
                return;
        }

        /*
         * pthreadpool_tevent_job_orphan() reparents the job
         * to a long term context; its destructor may keep
         * the memory alive (i.e. leak it) for now.
         */
        pthreadpool_tevent_job_orphan(state->job);
        state->job = NULL; /* not needed but looks better */
        return;
}

struct tevent_req *pthreadpool_tevent_job_send(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        struct pthreadpool_tevent *pool,
        void (*fn)(void *private_data), void *private_data)
{
        struct tevent_req *req = NULL;
        struct pthreadpool_tevent_job_state *state = NULL;
        struct pthreadpool_tevent_job *job = NULL;
        int ret;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        req = tevent_req_create(mem_ctx, &state,
                                struct pthreadpool_tevent_job_state);
        if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
        state->req = req;

        tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);

        if (pool == NULL) {
                tevent_req_error(req, EINVAL);
                return tevent_req_post(req, ev);
        }
        if (pool->pool == NULL) {
                tevent_req_error(req, EINVAL);
                return tevent_req_post(req, ev);
        }

        ret = pthreadpool_tevent_register_ev(pool, state);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }

        job = talloc_zero(state, struct pthreadpool_tevent_job);
        if (tevent_req_nomem(job, req)) {
                return tevent_req_post(req, ev);
        }
        job->pool = pool;
        job->fn = fn;
        job->private_data = private_data;
        job->im = tevent_create_immediate(state->job);
        if (tevent_req_nomem(job->im, req)) {
                return tevent_req_post(req, ev);
        }
        PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
        talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
        DLIST_ADD_END(job->pool->jobs, job);
        job->state = state;
        state->job = job;

        ret = pthreadpool_add_job(job->pool->pool, 0,
                                  pthreadpool_tevent_job_fn,
                                  job);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }

        tevent_req_set_cancel_fn(req, pthreadpool_tevent_job_cancel);
        return req;
}

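/*
 * Pointer to the job the calling worker thread is currently executing;
 * only valid while job->fn runs in pthreadpool_tevent_job_fn() below and
 * consulted by the pthreadpool_tevent_current_job_*() helpers.
 */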
static __thread struct pthreadpool_tevent_job *current_job;

bool pthreadpool_tevent_current_job_canceled(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        return current_job->needs_fence.maycancel;
}

bool pthreadpool_tevent_current_job_orphaned(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        return current_job->needs_fence.orphaned;
}

bool pthreadpool_tevent_current_job_continue(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        if (current_job->needs_fence.maycancel) {
                return false;
        }
        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        if (current_job->needs_fence.orphaned) {
                return false;
        }

        return true;
}
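
/*
 * Illustrative sketch (hypothetical caller code, not part of this file):
 * a long running job function can poll
 * pthreadpool_tevent_current_job_continue() and return early once its
 * request was cancelled or orphaned:
 *
 *      static void crunch_chunks(void *private_data)
 *      {
 *              struct crunch_state *state = private_data;
 *              size_t i;
 *
 *              for (i = 0; i < state->num_chunks; i++) {
 *                      if (!pthreadpool_tevent_current_job_continue()) {
 *                              return;
 *                      }
 *                      crunch_one_chunk(&state->chunks[i]);
 *              }
 *      }
 */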

static void pthreadpool_tevent_job_fn(void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);

        current_job = job;
        job->needs_fence.started = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        job->fn(job->private_data);

        job->needs_fence.executed = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        current_job = NULL;
}

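/*
 * Called by the pthreadpool implementation after the job function has
 * returned (from the worker thread with the pthread based pool); it hands
 * completion over to the owning tevent context via an immediate event, or
 * drops it if the request is already orphaned.
 */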
static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
                                         void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(job_private_data,
                struct pthreadpool_tevent_job);

        job->needs_fence.finished = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.orphaned) {
                /* Request already gone */
                job->needs_fence.dropped = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                return 0;
        }

        /*
         * state and state->glue are valid,
         * see the job->needs_fence.finished
         * "spinlock" loop in
         * pthreadpool_tevent_job_orphan()
         */
        if (job->state->glue->tctx != NULL) {
                /* with HAVE_PTHREAD */
                tevent_threaded_schedule_immediate(job->state->glue->tctx,
                                                   job->im,
                                                   pthreadpool_tevent_job_done,
                                                   job);
        } else {
                /* without HAVE_PTHREAD */
                tevent_schedule_immediate(job->im,
                                          job->state->glue->ev,
                                          pthreadpool_tevent_job_done,
                                          job);
        }

        job->needs_fence.signaled = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        return 0;
}

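/*
 * Immediate event handler, running in the owning tevent context: report the
 * job's completion on the request, or clean up if the request is already
 * gone.
 */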
static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);
        struct pthreadpool_tevent_job_state *state = job->state;

        TALLOC_FREE(job->im);

        if (state == NULL) {
                /* Request already gone */
                TALLOC_FREE(job);
                return;
        }

        /*
         * pthreadpool_tevent_job_cleanup()
         * (called by tevent_req_done() or
         * tevent_req_error()) will destroy the job.
         */

        if (job->needs_fence.executed) {
                tevent_req_done(state->req);
                return;
        }

        tevent_req_error(state->req, ENOEXEC);
        return;
}

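/*
 * tevent_req cancel hook: set 'maycancel' so a running job function can
 * notice it via the helpers above; if the job has not started yet, try to
 * remove it from the pool's queue and fail the request with ECANCELED.
 */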
static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
{
        struct pthreadpool_tevent_job_state *state =
                tevent_req_data(req,
                struct pthreadpool_tevent_job_state);
        struct pthreadpool_tevent_job *job = state->job;
        size_t num;

        if (job == NULL) {
                return false;
        }

        job->needs_fence.maycancel = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.started) {
                /*
                 * It was too late to cancel the request.
                 *
                 * The job still has the chance to look
                 * at pthreadpool_tevent_current_job_canceled()
                 * or pthreadpool_tevent_current_job_continue()
                 */
                return false;
        }

        num = pthreadpool_cancel_job(job->pool->pool, 0,
                                     pthreadpool_tevent_job_fn,
                                     job);
        if (num == 0) {
                /*
                 * It was too late to cancel the request.
                 */
                return false;
        }

        /*
         * It was not too late to cancel the request.
         *
         * We can remove job->im, as it will never be used.
         */
        TALLOC_FREE(job->im);

        /*
         * pthreadpool_tevent_job_cleanup()
         * will destroy the job.
         */
        tevent_req_defer_callback(req, state->ev);
        tevent_req_error(req, ECANCELED);
        return true;
}

int pthreadpool_tevent_job_recv(struct tevent_req *req)
{
        return tevent_req_simple_recv_unix(req);
}
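
/*
 * Illustrative usage sketch (hypothetical caller code, not part of this
 * file; names like toy_state, toy_compute and toy_done are made up):
 *
 *      struct toy_state {
 *              int result;
 *      };
 *
 *      static void toy_compute(void *private_data)
 *      {
 *              struct toy_state *state = private_data;
 *
 *              state->result = 42;     (runs in a worker thread,
 *                                       must not touch tevent)
 *      }
 *
 *      static void toy_done(struct tevent_req *subreq)
 *      {
 *              int ret = pthreadpool_tevent_job_recv(subreq);
 *              TALLOC_FREE(subreq);
 *              ... ret is 0 on success or an errno value ...
 *      }
 *
 * With 'ev' and a pool from pthreadpool_tevent_init() at hand:
 *
 *      subreq = pthreadpool_tevent_job_send(state, ev, pool,
 *                                           toy_compute, state);
 *      tevent_req_set_callback(subreq, toy_done, req);
 */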