pthreadpool: add helgrind magic to PTHREAD_TEVENT_JOB_THREAD_FENCE_*()
lib/pthreadpool/pthreadpool_tevent.c (samba.git)
1 /*
2  * Unix SMB/CIFS implementation.
3  * threadpool implementation based on pthreads
4  * Copyright (C) Volker Lendecke 2009,2011
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/threads.h"
22 #include "pthreadpool_tevent.h"
23 #include "pthreadpool.h"
24 #include "lib/util/tevent_unix.h"
25 #include "lib/util/dlinklist.h"
26 #include "lib/util/attr.h"
27
28 /*
29  * We try to give some hints to helgrind/drd
30  *
31  * Note that ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
32  * takes a memory address range that is ignored by helgrind/drd,
33  * while the 'description' argument is just ignored...
34  *
35  *
36  * Note that ANNOTATE_HAPPENS_*(unique_uintptr)
37  * just takes a DWORD/(void *) as unique key
38  * for the barrier.
39  */
40 #ifdef HAVE_VALGRIND_HELGRIND_H
41 #include <valgrind/helgrind.h>
42 #endif
43 #ifndef ANNOTATE_BENIGN_RACE_SIZED
44 #define ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
45 #endif
46 #ifndef ANNOTATE_HAPPENS_BEFORE
47 #define ANNOTATE_HAPPENS_BEFORE(unique_uintptr)
48 #endif
49 #ifndef ANNOTATE_HAPPENS_AFTER
50 #define ANNOTATE_HAPPENS_AFTER(unique_uintptr)
51 #endif
52 #ifndef ANNOTATE_HAPPENS_BEFORE_FORGET_ALL
53 #define ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(unique_uintptr)
54 #endif
55
56 #define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
57         _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
58         ANNOTATE_BENIGN_RACE_SIZED(&__j->needs_fence, \
59                                    sizeof(__j->needs_fence), \
60                                    "race by design, protected by fence"); \
61 } while(0)
62
63 #ifdef WITH_PTHREADPOOL
64 /*
65  * configure checked we have pthread and atomic_thread_fence() available
66  */
67 #define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { \
68         atomic_thread_fence(__order); \
69 } while(0)
70 #else
71 /*
72  * we're using lib/pthreadpool/pthreadpool_sync.c ...
73  */
74 #define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { } while(0)
75 #ifndef HAVE___THREAD
76 #define __thread
77 #endif
78 #endif
79
80 #define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
81         _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
82         ANNOTATE_HAPPENS_BEFORE(&__job->needs_fence); \
83         __PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
84         ANNOTATE_HAPPENS_AFTER(&__job->needs_fence); \
85 } while(0)
86
87 #define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
88         _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
89         ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&__job->needs_fence); \
90 } while(0)
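/*
 * Illustrative note (not compiled): with the valgrind headers available,
 * PTHREAD_TEVENT_JOB_THREAD_FENCE(job) expands to roughly
 *
 *   ANNOTATE_HAPPENS_BEFORE(&job->needs_fence);
 *   atomic_thread_fence(memory_order_seq_cst);
 *   ANNOTATE_HAPPENS_AFTER(&job->needs_fence);
 *
 * The BENIGN_RACE annotation from the _INIT() macro tells helgrind/drd
 * not to report the by-design races on the needs_fence booleans, while
 * the BEFORE/AFTER pair (keyed on &job->needs_fence) records the
 * ordering that the real fence provides.
 */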
91
92 struct pthreadpool_tevent_job_state;
93
94 /*
95  * We need one pthreadpool_tevent_glue object per unique combination of tevent
96  * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
97  * contexts in a pthreadpool_tevent.
98  */
99 struct pthreadpool_tevent_glue {
100         struct pthreadpool_tevent_glue *prev, *next;
101         struct pthreadpool_tevent *pool; /* back-pointer to owning object. */
102         /* Tuple we are keeping track of in this list. */
103         struct tevent_context *ev;
104         struct tevent_threaded_context *tctx;
105         /* Pointer to link object owned by *ev. */
106         struct pthreadpool_tevent_glue_ev_link *ev_link;
107 };
108
109 /*
110  * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
111  * tevent context from our list of active event contexts if the event context
112  * is destroyed.
113  * This structure is talloc()'ed from the struct tevent_context *, and is a
114  * back-pointer allowing the related struct pthreadpool_tevent_glue object
115  * to be removed from the struct pthreadpool_tevent glue list if the owning
116  * tevent_context is talloc_free()'ed.
117  */
118 struct pthreadpool_tevent_glue_ev_link {
119         struct pthreadpool_tevent_glue *glue;
120 };
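/*
 * Rough ownership picture (illustrative only):
 *
 *   pool (pthreadpool_tevent)              ev (tevent_context)
 *     `-> glue (talloc child of pool)        `-> ev_link (talloc child of ev)
 *            glue->ev_link  ------------------------^
 *            ^------------------------------  ev_link->glue
 *
 * Freeing either talloc parent runs the matching destructor, which
 * clears the cross pointer and frees the other object, so neither side
 * is left dangling.
 */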
121
122 struct pthreadpool_tevent {
123         struct pthreadpool *pool;
124         struct pthreadpool_tevent_glue *glue_list;
125
126         struct pthreadpool_tevent_job *jobs;
127 };
128
129 struct pthreadpool_tevent_job_state {
130         struct tevent_context *ev;
131         struct tevent_req *req;
132         struct pthreadpool_tevent_job *job;
133 };
134
135 struct pthreadpool_tevent_job {
136         struct pthreadpool_tevent_job *prev, *next;
137
138         struct pthreadpool_tevent *pool;
139         struct pthreadpool_tevent_job_state *state;
140         struct tevent_immediate *im;
141
142         void (*fn)(void *private_data);
143         void *private_data;
144
145         /*
146          * Coordination between threads
147          *
148          * There is only one side writing each element,
149          * either the main process or the job thread.
150          *
151          * The coordination is done by a full memory
152          * barrier using atomic_thread_fence(memory_order_seq_cst)
153          * wrapped in PTHREAD_TEVENT_JOB_THREAD_FENCE()
154          */
155         struct {
156                 /*
157                  * 'maycancel'
158                  * set when tevent_req_cancel() is called.
159                  * (only written by main thread!)
160                  */
161                 bool maycancel;
162
163                 /*
164                  * 'orphaned'
165                  * set when talloc_free is called on the job request,
166                  * tevent_context or pthreadpool_tevent.
167                  * (only written by main thread!)
168                  */
169                 bool orphaned;
170
171                 /*
172                  * 'started'
173                  * set when the job is picked up by a worker thread
174                  * (only written by job thread!)
175                  */
176                 bool started;
177
178                 /*
179                  * 'executed'
180                  * set once the job function returned.
181                  * (only written by job thread!)
182                  */
183                 bool executed;
184
185                 /*
186                  * 'finished'
187                  * set when pthreadpool_tevent_job_signal() is entered
188                  * (only written by job thread!)
189                  */
190                 bool finished;
191
192                 /*
193                  * 'dropped'
194                  * set when pthreadpool_tevent_job_signal() leaves with
195                  * orphaned already set.
196                  * (only written by job thread!)
197                  */
198                 bool dropped;
199
200                 /*
201                  * 'signaled'
202                  * set when pthreadpool_tevent_job_signal() leaves normally
203                  * and the immediate event was scheduled.
204                  * (only written by job thread!)
205                  */
206                 bool signaled;
207         } needs_fence;
208 };
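/*
 * Illustrative sketch of the single-writer protocol described above
 * (this mirrors pthreadpool_tevent_job_fn() and
 * pthreadpool_tevent_job_cancel() further down):
 *
 *   // job thread (the only writer of 'started')
 *   job->needs_fence.started = true;
 *   PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *
 *   // main thread (reader)
 *   PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *   if (job->needs_fence.started) {
 *           // too late to cancel, the job is already running
 *   }
 */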
209
210 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
211
212 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);
213
214 static struct pthreadpool_tevent_job *orphaned_jobs;
215
216 void pthreadpool_tevent_cleanup_orphaned_jobs(void)
217 {
218         struct pthreadpool_tevent_job *job = NULL;
219         struct pthreadpool_tevent_job *njob = NULL;
220
221         for (job = orphaned_jobs; job != NULL; job = njob) {
222                 njob = job->next;
223
224                 /*
225                  * The job destructor keeps the job alive
226                  * (and in the list) or removes it from the list.
227                  */
228                 TALLOC_FREE(job);
229         }
230 }
231
232 static int pthreadpool_tevent_job_signal(int jobid,
233                                          void (*job_fn)(void *private_data),
234                                          void *job_private_data,
235                                          void *private_data);
236
237 int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
238                             struct pthreadpool_tevent **presult)
239 {
240         struct pthreadpool_tevent *pool;
241         int ret;
242
243         pthreadpool_tevent_cleanup_orphaned_jobs();
244
245         pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
246         if (pool == NULL) {
247                 return ENOMEM;
248         }
249
250         ret = pthreadpool_init(max_threads, &pool->pool,
251                                pthreadpool_tevent_job_signal, pool);
252         if (ret != 0) {
253                 TALLOC_FREE(pool);
254                 return ret;
255         }
256
257         talloc_set_destructor(pool, pthreadpool_tevent_destructor);
258
259         *presult = pool;
260         return 0;
261 }
262
263 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
264 {
265         if (pool->pool == NULL) {
266                 return 0;
267         }
268
269         return pthreadpool_max_threads(pool->pool);
270 }
271
272 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
273 {
274         if (pool->pool == NULL) {
275                 return 0;
276         }
277
278         return pthreadpool_queued_jobs(pool->pool);
279 }
280
281 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
282 {
283         struct pthreadpool_tevent_job *job = NULL;
284         struct pthreadpool_tevent_job *njob = NULL;
285         struct pthreadpool_tevent_glue *glue = NULL;
286         int ret;
287
288         ret = pthreadpool_stop(pool->pool);
289         if (ret != 0) {
290                 return ret;
291         }
292
293         for (job = pool->jobs; job != NULL; job = njob) {
294                 njob = job->next;
295
296                 /* pthreadpool_tevent_job_orphan() removes it from the list */
297                 pthreadpool_tevent_job_orphan(job);
298         }
299
300         /*
301          * Delete all the registered
302          * tevent_context/tevent_threaded_context
303          * pairs.
304          */
305         for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
306                 /* The glue destructor removes it from the list */
307                 TALLOC_FREE(glue);
308         }
309         pool->glue_list = NULL;
310
311         ret = pthreadpool_destroy(pool->pool);
312         if (ret != 0) {
313                 return ret;
314         }
315         pool->pool = NULL;
316
317         pthreadpool_tevent_cleanup_orphaned_jobs();
318
319         return 0;
320 }
321
322 static int pthreadpool_tevent_glue_destructor(
323         struct pthreadpool_tevent_glue *glue)
324 {
325         if (glue->pool->glue_list != NULL) {
326                 DLIST_REMOVE(glue->pool->glue_list, glue);
327         }
328
329         /* Ensure the ev_link destructor knows we're gone */
330         glue->ev_link->glue = NULL;
331
332         TALLOC_FREE(glue->ev_link);
333         TALLOC_FREE(glue->tctx);
334
335         return 0;
336 }
337
338 /*
339  * Destructor called either explicitly from
340  * pthreadpool_tevent_glue_destructor(), or indirectly
341  * when the owning tevent_context is destroyed.
342  *
343  * When called from pthreadpool_tevent_glue_destructor()
344  * ev_link->glue is already NULL, so this does nothing.
345  *
346  * When called from talloc_free() of the owning
347  * tevent_context we must ensure we also remove the
348  * linked glue object from the list inside
349  * struct pthreadpool_tevent.
350  */
351 static int pthreadpool_tevent_glue_link_destructor(
352         struct pthreadpool_tevent_glue_ev_link *ev_link)
353 {
354         TALLOC_FREE(ev_link->glue);
355         return 0;
356 }
357
358 static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
359                                           struct tevent_context *ev)
360 {
361         struct pthreadpool_tevent_glue *glue = NULL;
362         struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
363
364         /*
365          * See if this tevent_context was already registered by
366          * searching the glue object list. If so we have nothing
367          * to do here - we already have a tevent_context/tevent_threaded_context
368          * pair.
369          */
370         for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
371                 if (glue->ev == ev) {
372                         return 0;
373                 }
374         }
375
376         /*
377          * Event context not yet registered - create a new glue
378          * object containing a tevent_context/tevent_threaded_context
379          * pair and put it on the list to remember this registration.
380          * We also need a link object to ensure the event context
381          * can't go away without us knowing about it.
382          */
383         glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
384         if (glue == NULL) {
385                 return ENOMEM;
386         }
387         *glue = (struct pthreadpool_tevent_glue) {
388                 .pool = pool,
389                 .ev = ev,
390         };
391         talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
392
393         /*
394          * Now allocate the link object to the event context. Note this
395          * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
396          * context is freed we are able to cleanup the glue object
397          * in the link object destructor.
398          */
399
400         ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
401         if (ev_link == NULL) {
402                 TALLOC_FREE(glue);
403                 return ENOMEM;
404         }
405         ev_link->glue = glue;
406         talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);
407
408         glue->ev_link = ev_link;
409
410 #ifdef HAVE_PTHREAD
411         glue->tctx = tevent_threaded_context_create(glue, ev);
412         if (glue->tctx == NULL) {
413                 TALLOC_FREE(ev_link);
414                 TALLOC_FREE(glue);
415                 return ENOMEM;
416         }
417 #endif
418
419         DLIST_ADD(pool->glue_list, glue);
420         return 0;
421 }
422
423 static void pthreadpool_tevent_job_fn(void *private_data);
424 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
425                                         struct tevent_immediate *im,
426                                         void *private_data);
427 static bool pthreadpool_tevent_job_cancel(struct tevent_req *req);
428
429 static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
430 {
431         /*
432          * We should never be called with needs_fence.orphaned == false.
433          * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
434          * after detaching from the request state and pool list.
435          */
436         if (!job->needs_fence.orphaned) {
437                 abort();
438         }
439
440         /*
441          * If the job is not finished (job->im still there)
442          * and it's still attached to the pool,
443          * we try to cancel it (before it has started)
444          */
445         if (job->im != NULL && job->pool != NULL) {
446                 size_t num;
447
448                 num = pthreadpool_cancel_job(job->pool->pool, 0,
449                                              pthreadpool_tevent_job_fn,
450                                              job);
451                 if (num != 0) {
452                         /*
453                          * It was not too late to cancel the request.
454                          *
455                          * We can remove job->im, as it will never be used.
456                          */
457                         TALLOC_FREE(job->im);
458                 }
459         }
460
461         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
462         if (job->needs_fence.dropped) {
463                 /*
464                  * The signal function saw job->needs_fence.orphaned
465                  * before it started the signaling via the immediate
466                  * event. So we'll never get triggered and can
467                  * remove job->im and let the whole job go...
468                  */
469                 TALLOC_FREE(job->im);
470         }
471
472         /*
473          * pthreadpool_tevent_job_orphan() already removed
474          * it from pool->jobs. And we don't need to try
475          * pthreadpool_cancel_job() again.
476          */
477         job->pool = NULL;
478
479         if (job->im != NULL) {
480                 /*
481                  * job->im still there means we need to wait for the
482                  * immediate event to be triggered, or just leak the memory.
483                  *
484                  * Move it to the orphaned list, if it's not already there.
485                  */
486                 return -1;
487         }
488
489         /*
490          * Finally remove from the orphaned_jobs list
491          * and let talloc destroy us.
492          */
493         DLIST_REMOVE(orphaned_jobs, job);
494
495         PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(job);
496         return 0;
497 }
498
499 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
500 {
501         job->needs_fence.orphaned = true;
502         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
503
504         /*
505          * We're the only function that sets
506          * job->state = NULL;
507          */
508         if (job->state == NULL) {
509                 abort();
510         }
511
512         /*
513          * We need to reparent the job to a long-term context
514          * and detach it from the request state.
515          * The destructor may keep the memory
516          * and leak it for now.
517          */
518         (void)talloc_reparent(job->state, NULL, job);
519         job->state->job = NULL;
520         job->state = NULL;
521
522         /*
523          * job->pool will only be set to NULL
524          * in the first destructor run.
525          */
526         if (job->pool == NULL) {
527                 abort();
528         }
529
530         /*
531          * Detach it from the pool.
532          *
533          * The job might still be running,
534          * so we keep job->pool.
535          * The destructor will set it to NULL
536          * after trying pthreadpool_cancel_job()
537          */
538         DLIST_REMOVE(job->pool->jobs, job);
539
540         /*
541          * Add it to the list of orphaned jobs,
542          * which may be cleaned up later.
543          *
544          * The destructor removes it from the list
545          * when possible, or it denies the free
546          * and keeps it in the list.
547          */
548         DLIST_ADD_END(orphaned_jobs, job);
549         TALLOC_FREE(job);
550 }
551
552 static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
553                                            enum tevent_req_state req_state)
554 {
555         struct pthreadpool_tevent_job_state *state =
556                 tevent_req_data(req,
557                 struct pthreadpool_tevent_job_state);
558
559         if (state->job == NULL) {
560                 /*
561                  * The job is not yet, or no longer,
562                  * scheduled in the pool.
563                  */
564                 return;
565         }
566
567         /*
568          * We need to reparent the job to a long-term context.
569          * The destructor may keep the memory
570          * and leak it for now.
571          */
572         pthreadpool_tevent_job_orphan(state->job);
573         state->job = NULL; /* not needed but looks better */
574         return;
575 }
576
577 struct tevent_req *pthreadpool_tevent_job_send(
578         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
579         struct pthreadpool_tevent *pool,
580         void (*fn)(void *private_data), void *private_data)
581 {
582         struct tevent_req *req = NULL;
583         struct pthreadpool_tevent_job_state *state = NULL;
584         struct pthreadpool_tevent_job *job = NULL;
585         int ret;
586
587         pthreadpool_tevent_cleanup_orphaned_jobs();
588
589         req = tevent_req_create(mem_ctx, &state,
590                                 struct pthreadpool_tevent_job_state);
591         if (req == NULL) {
592                 return NULL;
593         }
594         state->ev = ev;
595         state->req = req;
596
597         tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);
598
599         if (pool == NULL) {
600                 tevent_req_error(req, EINVAL);
601                 return tevent_req_post(req, ev);
602         }
603         if (pool->pool == NULL) {
604                 tevent_req_error(req, EINVAL);
605                 return tevent_req_post(req, ev);
606         }
607
608         ret = pthreadpool_tevent_register_ev(pool, ev);
609         if (tevent_req_error(req, ret)) {
610                 return tevent_req_post(req, ev);
611         }
612
613         job = talloc_zero(state, struct pthreadpool_tevent_job);
614         if (tevent_req_nomem(job, req)) {
615                 return tevent_req_post(req, ev);
616         }
617         job->pool = pool;
618         job->fn = fn;
619         job->private_data = private_data;
620         job->im = tevent_create_immediate(state->job);
621         if (tevent_req_nomem(job->im, req)) {
622                 return tevent_req_post(req, ev);
623         }
624         PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
625         talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
626         DLIST_ADD_END(job->pool->jobs, job);
627         job->state = state;
628         state->job = job;
629
630         ret = pthreadpool_add_job(job->pool->pool, 0,
631                                   pthreadpool_tevent_job_fn,
632                                   job);
633         if (tevent_req_error(req, ret)) {
634                 return tevent_req_post(req, ev);
635         }
636
637         tevent_req_set_cancel_fn(req, pthreadpool_tevent_job_cancel);
638         return req;
639 }
640
641 static __thread struct pthreadpool_tevent_job *current_job;
642
643 bool pthreadpool_tevent_current_job_canceled(void)
644 {
645         if (current_job == NULL) {
646                 /*
647                  * Should only be called from within
648                  * the job function.
649                  */
650                 abort();
651                 return false;
652         }
653
654         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
655         return current_job->needs_fence.maycancel;
656 }
657
658 bool pthreadpool_tevent_current_job_orphaned(void)
659 {
660         if (current_job == NULL) {
661                 /*
662                  * Should only be called from within
663                  * the job function.
664                  */
665                 abort();
666                 return false;
667         }
668
669         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
670         return current_job->needs_fence.orphaned;
671 }
672
673 bool pthreadpool_tevent_current_job_continue(void)
674 {
675         if (current_job == NULL) {
676                 /*
677                  * Should only be called from within
678                  * the job function.
679                  */
680                 abort();
681                 return false;
682         }
683
684         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
685         if (current_job->needs_fence.maycancel) {
686                 return false;
687         }
688         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
689         if (current_job->needs_fence.orphaned) {
690                 return false;
691         }
692
693         return true;
694 }
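/*
 * Illustrative sketch only: a long-running job function can poll
 * pthreadpool_tevent_current_job_continue() between work items and stop
 * early once the request was cancelled or orphaned. example_job_fn(),
 * struct example_state and process_one_chunk() are hypothetical and not
 * part of this file.
 *
 *   static void example_job_fn(void *private_data)
 *   {
 *           struct example_state *state = private_data;
 *
 *           while (state->remaining > 0) {
 *                   if (!pthreadpool_tevent_current_job_continue()) {
 *                           return;
 *                   }
 *                   process_one_chunk(state);
 *                   state->remaining -= 1;
 *           }
 *   }
 */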
695
696 static void pthreadpool_tevent_job_fn(void *private_data)
697 {
698         struct pthreadpool_tevent_job *job =
699                 talloc_get_type_abort(private_data,
700                 struct pthreadpool_tevent_job);
701
702         current_job = job;
703         job->needs_fence.started = true;
704         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
705
706         job->fn(job->private_data);
707
708         job->needs_fence.executed = true;
709         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
710         current_job = NULL;
711 }
712
713 static int pthreadpool_tevent_job_signal(int jobid,
714                                          void (*job_fn)(void *private_data),
715                                          void *job_private_data,
716                                          void *private_data)
717 {
718         struct pthreadpool_tevent_job *job =
719                 talloc_get_type_abort(job_private_data,
720                 struct pthreadpool_tevent_job);
721         struct pthreadpool_tevent_job_state *state = job->state;
722         struct tevent_threaded_context *tctx = NULL;
723         struct pthreadpool_tevent_glue *g = NULL;
724
725         job->needs_fence.finished = true;
726         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
727         if (job->needs_fence.orphaned) {
728                 /* Request already gone */
729                 job->needs_fence.dropped = true;
730                 PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
731                 return 0;
732         }
733
734 #ifdef HAVE_PTHREAD
735         for (g = job->pool->glue_list; g != NULL; g = g->next) {
736                 if (g->ev == state->ev) {
737                         tctx = g->tctx;
738                         break;
739                 }
740         }
741
742         if (tctx == NULL) {
743                 abort();
744         }
745 #endif
746
747         if (tctx != NULL) {
748                 /* with HAVE_PTHREAD */
749                 tevent_threaded_schedule_immediate(tctx, job->im,
750                                                    pthreadpool_tevent_job_done,
751                                                    job);
752         } else {
753                 /* without HAVE_PTHREAD */
754                 tevent_schedule_immediate(job->im, state->ev,
755                                           pthreadpool_tevent_job_done,
756                                           job);
757         }
758
759         job->needs_fence.signaled = true;
760         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
761         return 0;
762 }
763
764 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
765                                         struct tevent_immediate *im,
766                                         void *private_data)
767 {
768         struct pthreadpool_tevent_job *job =
769                 talloc_get_type_abort(private_data,
770                 struct pthreadpool_tevent_job);
771         struct pthreadpool_tevent_job_state *state = job->state;
772
773         TALLOC_FREE(job->im);
774
775         if (state == NULL) {
776                 /* Request already gone */
777                 TALLOC_FREE(job);
778                 return;
779         }
780
781         /*
782          * pthreadpool_tevent_job_cleanup()
783          * (called by tevent_req_done() or
784          * tevent_req_error()) will destroy the job.
785          */
786
787         if (job->needs_fence.executed) {
788                 tevent_req_done(state->req);
789                 return;
790         }
791
792         tevent_req_error(state->req, ENOEXEC);
793         return;
794 }
795
796 static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
797 {
798         struct pthreadpool_tevent_job_state *state =
799                 tevent_req_data(req,
800                 struct pthreadpool_tevent_job_state);
801         struct pthreadpool_tevent_job *job = state->job;
802         size_t num;
803
804         if (job == NULL) {
805                 return false;
806         }
807
808         job->needs_fence.maycancel = true;
809         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
810         if (job->needs_fence.started) {
811                 /*
812                  * It was too late to cancel the request.
813                  *
814                  * The job still has the chance to look
815                  * at pthreadpool_tevent_current_job_canceled()
816                  * or pthreadpool_tevent_current_job_continue()
817                  */
818                 return false;
819         }
820
821         num = pthreadpool_cancel_job(job->pool->pool, 0,
822                                      pthreadpool_tevent_job_fn,
823                                      job);
824         if (num == 0) {
825                 /*
826                  * It was too late to cancel the request.
827                  */
828                 return false;
829         }
830
831         /*
832          * It was not too late to cancel the request.
833          *
834          * We can remove job->im, as it will never be used.
835          */
836         TALLOC_FREE(job->im);
837
838         /*
839          * pthreadpool_tevent_job_cleanup()
840          * will destroy the job.
841          */
842         tevent_req_defer_callback(req, state->ev);
843         tevent_req_error(req, ECANCELED);
844         return true;
845 }
846
847 int pthreadpool_tevent_job_recv(struct tevent_req *req)
848 {
849         return tevent_req_simple_recv_unix(req);
850 }
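/*
 * Minimal usage sketch (illustrative only, error handling trimmed).
 * Blocking on tevent_req_poll() is just for demonstration; real callers
 * typically use tevent_req_set_callback(). my_job_fn()/my_job_data are
 * caller-provided placeholders.
 *
 *   struct tevent_context *ev = tevent_context_init(mem_ctx);
 *   struct pthreadpool_tevent *pool = NULL;
 *   struct tevent_req *req = NULL;
 *   int ret;
 *
 *   ret = pthreadpool_tevent_init(mem_ctx, 4, &pool);   // 0 on success
 *
 *   req = pthreadpool_tevent_job_send(mem_ctx, ev, pool,
 *                                     my_job_fn, my_job_data);
 *
 *   tevent_req_poll(req, ev);              // wait for completion
 *   ret = pthreadpool_tevent_job_recv(req);
 *   TALLOC_FREE(req);
 */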