Revert "pthreadpool: maintain a list of job_states on each pthreadpool_tevent_glue"
[samba.git] / lib / pthreadpool / pthreadpool_tevent.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * threadpool implementation based on pthreads
4  * Copyright (C) Volker Lendecke 2009,2011
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/threads.h"
22 #include "system/filesys.h"
23 #include "pthreadpool_tevent.h"
24 #include "pthreadpool.h"
25 #include "lib/util/tevent_unix.h"
26 #include "lib/util/dlinklist.h"
27 #include "lib/util/attr.h"
28
29 /*
30  * We try to give some hints to helgrind/drd
31  *
32  * Note ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
33  * takes a memory address range that is ignored by helgrind/drd.
34  * 'description' is just ignored...
35  *
36  *
37  * Note that ANNOTATE_HAPPENS_*(unique_uintptr)
38  * just takes a DWORD/(void *) as unique key
39  * for the barrier.
40  */
41 #ifdef HAVE_VALGRIND_HELGRIND_H
42 #include <valgrind/helgrind.h>
43 #endif
44 #ifndef ANNOTATE_BENIGN_RACE_SIZED
45 #define ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
46 #endif
47 #ifndef ANNOTATE_HAPPENS_BEFORE
48 #define ANNOTATE_HAPPENS_BEFORE(unique_uintptr)
49 #endif
50 #ifndef ANNOTATE_HAPPENS_AFTER
51 #define ANNOTATE_HAPPENS_AFTER(unique_uintptr)
52 #endif
53 #ifndef ANNOTATE_HAPPENS_BEFORE_FORGET_ALL
54 #define ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(unique_uintptr)
55 #endif
56
57 #define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
58         _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
59         ANNOTATE_BENIGN_RACE_SIZED(&__j->needs_fence, \
60                                    sizeof(__j->needs_fence), \
61                                    "race by design, protected by fence"); \
62 } while(0);
63
64 #ifdef WITH_PTHREADPOOL
65 /*
66  * configure checked we have pthread and atomic_thread_fence() available
67  */
68 #define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { \
69         atomic_thread_fence(__order); \
70 } while(0)
71 #else
72 /*
73  * we're using lib/pthreadpool/pthreadpool_sync.c ...
74  */
75 #define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { } while(0)
76 #ifndef HAVE___THREAD
77 #define __thread
78 #endif
79 #endif
80
81 #define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
82         _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
83         ANNOTATE_HAPPENS_BEFORE(&__job->needs_fence); \
84         __PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
85         ANNOTATE_HAPPENS_AFTER(&__job->needs_fence); \
86 } while(0);
87
88 #define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
89         _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
90         ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&__job->needs_fence); \
91 } while(0);
92
93 struct pthreadpool_tevent_job_state;
94
95 /*
96  * We need one pthreadpool_tevent_glue object per unique combination of tevent
97  * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
98  * contexts in a pthreadpool_tevent.
99  */
100 struct pthreadpool_tevent_glue {
101         struct pthreadpool_tevent_glue *prev, *next;
102         struct pthreadpool_tevent *pool; /* back-pointer to owning object. */
103         /* Tuple we are keeping track of in this list. */
104         struct tevent_context *ev;
105         struct tevent_threaded_context *tctx;
106         /* Pointer to link object owned by *ev. */
107         struct pthreadpool_tevent_glue_ev_link *ev_link;
108 };
109
110 /*
111  * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
112  * tevent context from our list of active event contexts if the event context
113  * is destroyed.
114  * This structure is talloc()'ed from the struct tevent_context *, and is a
115  * back-pointer allowing the related struct pthreadpool_tevent_glue object
116  * to be removed from the struct pthreadpool_tevent glue list if the owning
117  * tevent_context is talloc_free()'ed.
118  */
119 struct pthreadpool_tevent_glue_ev_link {
120         struct pthreadpool_tevent_glue *glue;
121 };
122
123 struct pthreadpool_tevent {
124         struct pthreadpool *pool;
125         struct pthreadpool_tevent_glue *glue_list;
126
127         struct pthreadpool_tevent_job *jobs;
128 };
129
130 struct pthreadpool_tevent_job_state {
131         struct tevent_context *ev;
132         struct tevent_req *req;
133         struct pthreadpool_tevent_job *job;
134 };
135
136 struct pthreadpool_tevent_job {
137         struct pthreadpool_tevent_job *prev, *next;
138
139         struct pthreadpool_tevent *pool;
140         struct pthreadpool_tevent_job_state *state;
141         struct tevent_immediate *im;
142
143         void (*fn)(void *private_data);
144         void *private_data;
145
146         /*
147          * Coordination between threads
148          *
149          * There is only one side writing each element,
150          * either the main process or the job thread.
151          *
152          * The coordination is done by a full memory
153          * barrier using atomic_thread_fence(memory_order_seq_cst)
154          * wrapped in PTHREAD_TEVENT_JOB_THREAD_FENCE(). See the sketch below this struct.
155          */
156         struct {
157                 /*
158                  * 'maycancel'
159                  * set when tevent_req_cancel() is called.
160                  * (only written by main thread!)
161                  */
162                 bool maycancel;
163
164                 /*
165                  * 'orphaned'
166                  * set when talloc_free is called on the job request,
167                  * tevent_context or pthreadpool_tevent.
168                  * (only written by main thread!)
169                  */
170                 bool orphaned;
171
172                 /*
173                  * 'started'
174                  * set when the job is picked up by a worker thread
175                  * (only written by job thread!)
176                  */
177                 bool started;
178
179                 /*
180                  * 'executed'
181                  * set once the job function returned.
182                  * (only written by job thread!)
183                  */
184                 bool executed;
185
186                 /*
187                  * 'finished'
188                  * set when pthreadpool_tevent_job_signal() is entered
189                  * (only written by job thread!)
190                  */
191                 bool finished;
192
193                 /*
194                  * 'dropped'
195                  * set when pthreadpool_tevent_job_signal() leaves with
196                  * orphaned already set.
197                  * (only written by job thread!)
198                  */
199                 bool dropped;
200
201                 /*
202                  * 'signaled'
203                  * set when pthreadpool_tevent_job_signal() leaves normally
204                  * and the immediate event was scheduled.
205                  * (only written by job thread!)
206                  */
207                 bool signaled;
208         } needs_fence;
209 };
210
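/*
 * Illustrative sketch (editorial addition, not part of the upstream file)
 * of how the needs_fence flags pair up with the fence macro. Each flag has
 * exactly one writer; the only ordering guarantee between the job thread
 * and the main thread is the full barrier:
 *
 *	// job thread, see pthreadpool_tevent_job_fn()
 *	job->needs_fence.started = true;
 *	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *
 *	// main thread, see pthreadpool_tevent_job_cancel()
 *	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *	if (job->needs_fence.started) {
 *		// too late to cancel, a worker already picked up the job
 *	}
 */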
211 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
212
213 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);
214
215 static struct pthreadpool_tevent_job *orphaned_jobs;
216
217 void pthreadpool_tevent_cleanup_orphaned_jobs(void)
218 {
219         struct pthreadpool_tevent_job *job = NULL;
220         struct pthreadpool_tevent_job *njob = NULL;
221
222         for (job = orphaned_jobs; job != NULL; job = njob) {
223                 njob = job->next;
224
225                 /*
226                  * The job destructor keeps the job alive
227                  * (and in the list) or removes it from the list.
228                  */
229                 TALLOC_FREE(job);
230         }
231 }
232
233 static int pthreadpool_tevent_job_signal(int jobid,
234                                          void (*job_fn)(void *private_data),
235                                          void *job_private_data,
236                                          void *private_data);
237
238 int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
239                             struct pthreadpool_tevent **presult)
240 {
241         struct pthreadpool_tevent *pool;
242         int ret;
243
244         pthreadpool_tevent_cleanup_orphaned_jobs();
245
246         pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
247         if (pool == NULL) {
248                 return ENOMEM;
249         }
250
251         ret = pthreadpool_init(max_threads, &pool->pool,
252                                pthreadpool_tevent_job_signal, pool);
253         if (ret != 0) {
254                 TALLOC_FREE(pool);
255                 return ret;
256         }
257
258         talloc_set_destructor(pool, pthreadpool_tevent_destructor);
259
260         *presult = pool;
261         return 0;
262 }
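/*
 * Illustrative caller sketch (editorial addition, not part of the upstream
 * file; mem_ctx and the thread count are assumptions):
 *
 *	struct pthreadpool_tevent *pool = NULL;
 *	int ret;
 *
 *	ret = pthreadpool_tevent_init(mem_ctx, 4, &pool);
 *	if (ret != 0) {
 *		return ret;	// errno-style value, e.g. ENOMEM
 *	}
 *	// ... schedule work with pthreadpool_tevent_job_send() ...
 *	TALLOC_FREE(pool);	// runs pthreadpool_tevent_destructor()
 */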
263
264 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
265 {
266         if (pool->pool == NULL) {
267                 return 0;
268         }
269
270         return pthreadpool_max_threads(pool->pool);
271 }
272
273 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
274 {
275         if (pool->pool == NULL) {
276                 return 0;
277         }
278
279         return pthreadpool_queued_jobs(pool->pool);
280 }
281
282 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
283 {
284         struct pthreadpool_tevent_job *job = NULL;
285         struct pthreadpool_tevent_job *njob = NULL;
286         struct pthreadpool_tevent_glue *glue = NULL;
287         int ret;
288
289         ret = pthreadpool_stop(pool->pool);
290         if (ret != 0) {
291                 return ret;
292         }
293
294         for (job = pool->jobs; job != NULL; job = njob) {
295                 njob = job->next;
296
297                 /* pthreadpool_tevent_job_orphan() removes the job from the list */
298                 pthreadpool_tevent_job_orphan(job);
299         }
300
301         /*
302          * Delete all the registered
303          * tevent_context/tevent_threaded_context
304          * pairs.
305          */
306         for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
307                 /* The glue destructor removes it from the list */
308                 TALLOC_FREE(glue);
309         }
310         pool->glue_list = NULL;
311
312         ret = pthreadpool_destroy(pool->pool);
313         if (ret != 0) {
314                 return ret;
315         }
316         pool->pool = NULL;
317
318         pthreadpool_tevent_cleanup_orphaned_jobs();
319
320         return 0;
321 }
322
323 static int pthreadpool_tevent_glue_destructor(
324         struct pthreadpool_tevent_glue *glue)
325 {
326         if (glue->pool->glue_list != NULL) {
327                 DLIST_REMOVE(glue->pool->glue_list, glue);
328         }
329
330         /* Ensure the ev_link destructor knows we're gone */
331         glue->ev_link->glue = NULL;
332
333         TALLOC_FREE(glue->ev_link);
334         TALLOC_FREE(glue->tctx);
335
336         return 0;
337 }
338
339 /*
340  * Destructor called either explicitly from
341  * pthreadpool_tevent_glue_destructor(), or indirectly
342  * when the owning tevent_context is destroyed.
343  *
344  * When called from pthreadpool_tevent_glue_destructor()
345  * ev_link->glue is already NULL, so this does nothing.
346  *
347  * When called from talloc_free() of the owning
348  * tevent_context we must ensure we also remove the
349  * linked glue object from the list inside
350  * struct pthreadpool_tevent.
351  */
352 static int pthreadpool_tevent_glue_link_destructor(
353         struct pthreadpool_tevent_glue_ev_link *ev_link)
354 {
355         TALLOC_FREE(ev_link->glue);
356         return 0;
357 }
358
359 static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
360                                           struct tevent_context *ev)
361 {
362         struct pthreadpool_tevent_glue *glue = NULL;
363         struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
364
365         /*
366          * See if this tevent_context was already registered by
367          * searching the glue object list. If so we have nothing
368          * to do here - we already have a tevent_context/tevent_threaded_context
369          * pair.
370          */
371         for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
372                 if (glue->ev == ev) {
373                         return 0;
374                 }
375         }
376
377         /*
378          * Event context not yet registered - create a new glue
379          * object containing a tevent_context/tevent_threaded_context
380          * pair and put it on the list to remember this registration.
381          * We also need a link object to ensure the event context
382          * can't go away without us knowing about it.
383          */
384         glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
385         if (glue == NULL) {
386                 return ENOMEM;
387         }
388         *glue = (struct pthreadpool_tevent_glue) {
389                 .pool = pool,
390                 .ev = ev,
391         };
392         talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
393
394         /*
395          * Now allocate the link object to the event context. Note this
396          * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
397          * context is freed we are able to cleanup the glue object
398          * in the link object destructor.
399          */
400
401         ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
402         if (ev_link == NULL) {
403                 TALLOC_FREE(glue);
404                 return ENOMEM;
405         }
406         ev_link->glue = glue;
407         talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);
408
409         glue->ev_link = ev_link;
410
411 #ifdef HAVE_PTHREAD
412         glue->tctx = tevent_threaded_context_create(glue, ev);
413         if (glue->tctx == NULL) {
414                 TALLOC_FREE(ev_link);
415                 TALLOC_FREE(glue);
416                 return ENOMEM;
417         }
418 #endif
419
420         DLIST_ADD(pool->glue_list, glue);
421         return 0;
422 }
423
424 static void pthreadpool_tevent_job_fn(void *private_data);
425 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
426                                         struct tevent_immediate *im,
427                                         void *private_data);
428 static bool pthreadpool_tevent_job_cancel(struct tevent_req *req);
429
430 static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
431 {
432         /*
433          * We should never be called with needs_fence.orphaned == false.
434          * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
435          * after detaching from the request state and pool list.
436          */
437         if (!job->needs_fence.orphaned) {
438                 abort();
439         }
440
441         /*
442          * If the job is not finished (job->im still there)
443          * and it's still attached to the pool,
444          * we try to cancel it (before it has started)
445          */
446         if (job->im != NULL && job->pool != NULL) {
447                 size_t num;
448
449                 num = pthreadpool_cancel_job(job->pool->pool, 0,
450                                              pthreadpool_tevent_job_fn,
451                                              job);
452                 if (num != 0) {
453                         /*
454                          * It was not too late to cancel the request.
455                          *
456                          * We can remove job->im, as it will never be used.
457                          */
458                         TALLOC_FREE(job->im);
459                 }
460         }
461
462         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
463         if (job->needs_fence.dropped) {
464                 /*
465                  * The signal function saw job->needs_fence.orphaned
466                  * before it started the signaling via the immediate
467          * event. So we'll never get triggered and can
468                  * remove job->im and let the whole job go...
469                  */
470                 TALLOC_FREE(job->im);
471         }
472
473         /*
474          * pthreadpool_tevent_job_orphan() already removed
475          * it from pool->jobs. And we don't need to try
476          * pthreadpool_cancel_job() again.
477          */
478         job->pool = NULL;
479
480         if (job->im != NULL) {
481                 /*
482          * job->im still there means we need to wait for the
483                  * immediate event to be triggered or just leak the memory.
484                  *
485                  * Move it to the orphaned list, if it's not already there.
486                  */
487                 return -1;
488         }
489
490         /*
491          * Finally remove from the orphaned_jobs list
492          * and let talloc destroy us.
493          */
494         DLIST_REMOVE(orphaned_jobs, job);
495
496         PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(job);
497         return 0;
498 }
499
500 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
501 {
502         job->needs_fence.orphaned = true;
503         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
504
505         /*
506          * We're the only function that sets
507          * job->state = NULL;
508          */
509         if (job->state == NULL) {
510                 abort();
511         }
512
513         /*
514          * We need to reparent the job to a long-term context
515          * and detach it from the request state.
516          * The destructor may keep the memory
517          * and leak it for now.
518          */
519         (void)talloc_reparent(job->state, NULL, job);
520         job->state->job = NULL;
521         job->state = NULL;
522
523         /*
524          * job->pool will only be set to NULL
525          * in the first destructor run.
526          */
527         if (job->pool == NULL) {
528                 abort();
529         }
530
531         /*
532          * Detach it from the pool.
533          *
534          * The job might still be running,
535          * so we keep job->pool.
536          * The destructor will set it to NULL
537          * after trying pthreadpool_cancel_job()
538          */
539         DLIST_REMOVE(job->pool->jobs, job);
540
541         /*
542          * Add it to the list of orphaned jobs,
543          * which may be cleaned up later.
544          *
545          * The destructor removes it from the list
546          * when possible, or it denies the free
547          * and keeps it in the list.
548          */
549         DLIST_ADD_END(orphaned_jobs, job);
550         TALLOC_FREE(job);
551 }
552
553 static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
554                                            enum tevent_req_state req_state)
555 {
556         struct pthreadpool_tevent_job_state *state =
557                 tevent_req_data(req,
558                 struct pthreadpool_tevent_job_state);
559
560         if (state->job == NULL) {
561                 /*
562                  * The job is not yet scheduled in the pool,
563                  * or it is no longer scheduled.
564                  */
565                 return;
566         }
567
568         /*
569          * We need to reparent the job to a long-term context.
570          * The destructor may keep the memory
571          * and leak it for now.
572          */
573         pthreadpool_tevent_job_orphan(state->job);
574         state->job = NULL; /* not needed but looks better */
575         return;
576 }
577
578 struct tevent_req *pthreadpool_tevent_job_send(
579         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
580         struct pthreadpool_tevent *pool,
581         void (*fn)(void *private_data), void *private_data)
582 {
583         struct tevent_req *req = NULL;
584         struct pthreadpool_tevent_job_state *state = NULL;
585         struct pthreadpool_tevent_job *job = NULL;
586         int ret;
587
588         pthreadpool_tevent_cleanup_orphaned_jobs();
589
590         req = tevent_req_create(mem_ctx, &state,
591                                 struct pthreadpool_tevent_job_state);
592         if (req == NULL) {
593                 return NULL;
594         }
595         state->ev = ev;
596         state->req = req;
597
598         tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);
599
600         if (pool == NULL) {
601                 tevent_req_error(req, EINVAL);
602                 return tevent_req_post(req, ev);
603         }
604         if (pool->pool == NULL) {
605                 tevent_req_error(req, EINVAL);
606                 return tevent_req_post(req, ev);
607         }
608
609         ret = pthreadpool_tevent_register_ev(pool, ev);
610         if (tevent_req_error(req, ret)) {
611                 return tevent_req_post(req, ev);
612         }
613
614         job = talloc_zero(state, struct pthreadpool_tevent_job);
615         if (tevent_req_nomem(job, req)) {
616                 return tevent_req_post(req, ev);
617         }
618         job->pool = pool;
619         job->fn = fn;
620         job->private_data = private_data;
621         job->im = tevent_create_immediate(state->job);
622         if (tevent_req_nomem(job->im, req)) {
623                 return tevent_req_post(req, ev);
624         }
625         PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
626         talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
627         DLIST_ADD_END(job->pool->jobs, job);
628         job->state = state;
629         state->job = job;
630
631         ret = pthreadpool_add_job(job->pool->pool, 0,
632                                   pthreadpool_tevent_job_fn,
633                                   job);
634         if (tevent_req_error(req, ret)) {
635                 return tevent_req_post(req, ev);
636         }
637
638         tevent_req_set_cancel_fn(req, pthreadpool_tevent_job_cancel);
639         return req;
640 }
641
642 static __thread struct pthreadpool_tevent_job *current_job;
643
644 bool pthreadpool_tevent_current_job_canceled(void)
645 {
646         if (current_job == NULL) {
647                 /*
648                  * Should only be called from within
649                  * the job function.
650                  */
651                 abort();
652                 return false;
653         }
654
655         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
656         return current_job->needs_fence.maycancel;
657 }
658
659 bool pthreadpool_tevent_current_job_orphaned(void)
660 {
661         if (current_job == NULL) {
662                 /*
663                  * Should only be called from within
664                  * the job function.
665                  */
666                 abort();
667                 return false;
668         }
669
670         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
671         return current_job->needs_fence.orphaned;
672 }
673
674 bool pthreadpool_tevent_current_job_continue(void)
675 {
676         if (current_job == NULL) {
677                 /*
678                  * Should only be called from within
679                  * the job function.
680                  */
681                 abort();
682                 return false;
683         }
684
685         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
686         if (current_job->needs_fence.maycancel) {
687                 return false;
688         }
689         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
690         if (current_job->needs_fence.orphaned) {
691                 return false;
692         }
693
694         return true;
695 }
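/*
 * Illustrative job function sketch (editorial addition, not part of the
 * upstream file; the example_* names are hypothetical). A long-running job
 * can poll pthreadpool_tevent_current_job_continue() between chunks of work
 * and bail out early once its request was cancelled or orphaned:
 *
 *	static void example_job_fn(void *private_data)
 *	{
 *		struct example_state *state = private_data;	// hypothetical
 *
 *		while (example_more_work(state)) {		// hypothetical
 *			if (!pthreadpool_tevent_current_job_continue()) {
 *				return;
 *			}
 *			example_do_chunk(state);		// hypothetical
 *		}
 *	}
 */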
696
697 static void pthreadpool_tevent_job_fn(void *private_data)
698 {
699         struct pthreadpool_tevent_job *job =
700                 talloc_get_type_abort(private_data,
701                 struct pthreadpool_tevent_job);
702
703         current_job = job;
704         job->needs_fence.started = true;
705         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
706
707         job->fn(job->private_data);
708
709         job->needs_fence.executed = true;
710         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
711         current_job = NULL;
712 }
713
714 static int pthreadpool_tevent_job_signal(int jobid,
715                                          void (*job_fn)(void *private_data),
716                                          void *job_private_data,
717                                          void *private_data)
718 {
719         struct pthreadpool_tevent_job *job =
720                 talloc_get_type_abort(job_private_data,
721                 struct pthreadpool_tevent_job);
722         struct pthreadpool_tevent_job_state *state = job->state;
723         struct tevent_threaded_context *tctx = NULL;
724         struct pthreadpool_tevent_glue *g = NULL;
725
726         job->needs_fence.finished = true;
727         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
728         if (job->needs_fence.orphaned) {
729                 /* Request already gone */
730                 job->needs_fence.dropped = true;
731                 PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
732                 return 0;
733         }
734
735 #ifdef HAVE_PTHREAD
736         for (g = job->pool->glue_list; g != NULL; g = g->next) {
737                 if (g->ev == state->ev) {
738                         tctx = g->tctx;
739                         break;
740                 }
741         }
742
743         if (tctx == NULL) {
744                 abort();
745         }
746 #endif
747
748         if (tctx != NULL) {
749                 /* with HAVE_PTHREAD */
750                 tevent_threaded_schedule_immediate(tctx, job->im,
751                                                    pthreadpool_tevent_job_done,
752                                                    job);
753         } else {
754                 /* without HAVE_PTHREAD */
755                 tevent_schedule_immediate(job->im, state->ev,
756                                           pthreadpool_tevent_job_done,
757                                           job);
758         }
759
760         job->needs_fence.signaled = true;
761         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
762         return 0;
763 }
764
765 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
766                                         struct tevent_immediate *im,
767                                         void *private_data)
768 {
769         struct pthreadpool_tevent_job *job =
770                 talloc_get_type_abort(private_data,
771                 struct pthreadpool_tevent_job);
772         struct pthreadpool_tevent_job_state *state = job->state;
773
774         TALLOC_FREE(job->im);
775
776         if (state == NULL) {
777                 /* Request already gone */
778                 TALLOC_FREE(job);
779                 return;
780         }
781
782         /*
783          * pthreadpool_tevent_job_cleanup()
784          * (called by tevent_req_done() or
785          * tevent_req_error()) will destroy the job.
786          */
787
788         if (job->needs_fence.executed) {
789                 tevent_req_done(state->req);
790                 return;
791         }
792
793         tevent_req_error(state->req, ENOEXEC);
794         return;
795 }
796
797 static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
798 {
799         struct pthreadpool_tevent_job_state *state =
800                 tevent_req_data(req,
801                 struct pthreadpool_tevent_job_state);
802         struct pthreadpool_tevent_job *job = state->job;
803         size_t num;
804
805         if (job == NULL) {
806                 return false;
807         }
808
809         job->needs_fence.maycancel = true;
810         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
811         if (job->needs_fence.started) {
812                 /*
813                  * It was too late to cancel the request.
814                  *
815                  * The job still has the chance to look
816                  * at pthreadpool_tevent_current_job_canceled()
817                  * or pthreadpool_tevent_current_job_continue()
818                  */
819                 return false;
820         }
821
822         num = pthreadpool_cancel_job(job->pool->pool, 0,
823                                      pthreadpool_tevent_job_fn,
824                                      job);
825         if (num == 0) {
826                 /*
827                  * It was too late to cancel the request.
828                  */
829                 return false;
830         }
831
832         /*
833          * It was not too late to cancel the request.
834          *
835          * We can remove job->im, as it will never be used.
836          */
837         TALLOC_FREE(job->im);
838
839         /*
840          * pthreadpool_tevent_job_cleanup()
841          * will destroy the job.
842          */
843         tevent_req_defer_callback(req, state->ev);
844         tevent_req_error(req, ECANCELED);
845         return true;
846 }
847
848 int pthreadpool_tevent_job_recv(struct tevent_req *req)
849 {
850         return tevent_req_simple_recv_unix(req);
851 }
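/*
 * Illustrative end-to-end usage sketch (editorial addition, not part of the
 * upstream file; the example_* names are hypothetical). A caller schedules a
 * job, gets a tevent_req back and collects the result in its callback:
 *
 *	static void example_done(struct tevent_req *req)
 *	{
 *		int ret = pthreadpool_tevent_job_recv(req);
 *		TALLOC_FREE(req);
 *		if (ret != 0) {
 *			// ENOEXEC means the job function never ran
 *		}
 *	}
 *
 *	static bool example_trigger(TALLOC_CTX *mem_ctx,
 *				    struct tevent_context *ev,
 *				    struct pthreadpool_tevent *pool,
 *				    void *job_private_data)
 *	{
 *		struct tevent_req *req = NULL;
 *
 *		req = pthreadpool_tevent_job_send(mem_ctx, ev, pool,
 *						  example_job_fn,
 *						  job_private_data);
 *		if (req == NULL) {
 *			return false;	// out of memory
 *		}
 *		tevent_req_set_callback(req, example_done, NULL);
 *		return true;
 *	}
 */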