Revert "pthreadpool: add pthreadpool_tevent_[current_job_]per_thread_cwd()"
[samba.git] lib/pthreadpool/pthreadpool_tevent.c
/*
 * Unix SMB/CIFS implementation.
 * threadpool implementation based on pthreads
 * Copyright (C) Volker Lendecke 2009,2011
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "replace.h"
#include "system/select.h"
#include "system/threads.h"
#include "system/filesys.h"
#include "pthreadpool_tevent.h"
#include "pthreadpool.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/dlinklist.h"
#include "lib/util/attr.h"

/*
 * We try to give some hints to helgrind/drd
 *
 * Note that ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
 * takes a memory address range that is ignored by helgrind/drd;
 * the 'description' argument is just ignored...
 *
 * Note that ANNOTATE_HAPPENS_*(unique_uintptr)
 * just takes a DWORD/(void *) as a unique key
 * for the barrier.
 */
#ifdef HAVE_VALGRIND_HELGRIND_H
#include <valgrind/helgrind.h>
#endif
#ifndef ANNOTATE_BENIGN_RACE_SIZED
#define ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
#endif
#ifndef ANNOTATE_HAPPENS_BEFORE
#define ANNOTATE_HAPPENS_BEFORE(unique_uintptr)
#endif
#ifndef ANNOTATE_HAPPENS_AFTER
#define ANNOTATE_HAPPENS_AFTER(unique_uintptr)
#endif
#ifndef ANNOTATE_HAPPENS_BEFORE_FORGET_ALL
#define ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(unique_uintptr)
#endif

#define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_BENIGN_RACE_SIZED(&__j->needs_fence, \
                                   sizeof(__j->needs_fence), \
                                   "race by design, protected by fence"); \
} while(0);

#ifdef WITH_PTHREADPOOL
/*
 * configure checked we have pthread and atomic_thread_fence() available
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { \
        atomic_thread_fence(__order); \
} while(0)
#else
/*
 * we're using lib/pthreadpool/pthreadpool_sync.c ...
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { } while(0)
#ifndef HAVE___THREAD
#define __thread
#endif
#endif

#define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_HAPPENS_BEFORE(&__job->needs_fence); \
        __PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
        ANNOTATE_HAPPENS_AFTER(&__job->needs_fence); \
} while(0);

#define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&__job->needs_fence); \
} while(0);

struct pthreadpool_tevent_job_state;

/*
 * We need one pthreadpool_tevent_glue object per unique combination of tevent
 * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
 * contexts in a pthreadpool_tevent.
 */
struct pthreadpool_tevent_glue {
        struct pthreadpool_tevent_glue *prev, *next;
        struct pthreadpool_tevent *pool; /* back-pointer to owning object. */
        /* Tuple we are keeping track of in this list. */
        struct tevent_context *ev;
        struct tevent_threaded_context *tctx;
        /* Pointer to link object owned by *ev. */
        struct pthreadpool_tevent_glue_ev_link *ev_link;
        /* active jobs */
        struct pthreadpool_tevent_job_state *states;
};

/*
 * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
 * tevent context from our list of active event contexts if the event context
 * is destroyed.
 * This structure is talloc()'ed from the struct tevent_context *, and is a
 * back-pointer allowing the related struct pthreadpool_tevent_glue object
 * to be removed from the struct pthreadpool_tevent glue list if the owning
 * tevent_context is talloc_free()'ed.
 */
struct pthreadpool_tevent_glue_ev_link {
        struct pthreadpool_tevent_glue *glue;
};

struct pthreadpool_tevent {
        struct pthreadpool *pool;
        struct pthreadpool_tevent_glue *glue_list;

        struct pthreadpool_tevent_job *jobs;
};

struct pthreadpool_tevent_job_state {
        struct pthreadpool_tevent_job_state *prev, *next;
        struct pthreadpool_tevent_glue *glue;
        struct tevent_context *ev;
        struct tevent_req *req;
        struct pthreadpool_tevent_job *job;
};

struct pthreadpool_tevent_job {
        struct pthreadpool_tevent_job *prev, *next;

        struct pthreadpool_tevent *pool;
        struct pthreadpool_tevent_job_state *state;
        struct tevent_immediate *im;

        void (*fn)(void *private_data);
        void *private_data;

        /*
         * Coordination between threads
         *
         * There is only one side writing each element,
         * either the main process or the job thread.
         *
         * The coordination is done by a full memory
         * barrier using atomic_thread_fence(memory_order_seq_cst)
         * wrapped in PTHREAD_TEVENT_JOB_THREAD_FENCE()
         */
        struct {
                /*
                 * 'maycancel'
                 * set when tevent_req_cancel() is called.
                 * (only written by main thread!)
                 */
                bool maycancel;

                /*
                 * 'orphaned'
                 * set when talloc_free is called on the job request,
                 * tevent_context or pthreadpool_tevent.
                 * (only written by main thread!)
                 */
                bool orphaned;

                /*
                 * 'started'
                 * set when the job is picked up by a worker thread
                 * (only written by job thread!)
                 */
                bool started;

                /*
                 * 'executed'
                 * set once the job function returned.
                 * (only written by job thread!)
                 */
                bool executed;

                /*
                 * 'finished'
                 * set when pthreadpool_tevent_job_signal() is entered
                 * (only written by job thread!)
                 */
                bool finished;

                /*
                 * 'dropped'
                 * set when pthreadpool_tevent_job_signal() leaves with
                 * orphaned already set.
                 * (only written by job thread!)
                 */
                bool dropped;

                /*
                 * 'signaled'
                 * set when pthreadpool_tevent_job_signal() leaves normally
                 * and the immediate event was scheduled.
                 * (only written by job thread!)
                 */
                bool signaled;
        } needs_fence;
};
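
/*
 * Illustration only (not part of the original code): the one-writer rule
 * above combines with PTHREAD_TEVENT_JOB_THREAD_FENCE() into a simple
 * publish/observe pattern. A hypothetical writer/reader pair would look
 * like this sketch:
 *
 *   // job thread: publish a flag it owns
 *   job->needs_fence.executed = true;
 *   PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *
 *   // main thread: issue its own fence, then observe the flag
 *   PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *   if (job->needs_fence.executed) {
 *           ...
 *   }
 *
 * The full (seq_cst) fence on both sides is what the code relies on to
 * order these plain bool stores and loads; the annotation above marks
 * the remaining data race as benign by design.
 */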

static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);

static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);

static struct pthreadpool_tevent_job *orphaned_jobs;

void pthreadpool_tevent_cleanup_orphaned_jobs(void)
{
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;

        for (job = orphaned_jobs; job != NULL; job = njob) {
                njob = job->next;

                /*
                 * The job destructor keeps the job alive
                 * (and in the list) or removes it from the list.
                 */
                TALLOC_FREE(job);
        }
}

static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
                                         void *private_data);

int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
                            struct pthreadpool_tevent **presult)
{
        struct pthreadpool_tevent *pool;
        int ret;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
        if (pool == NULL) {
                return ENOMEM;
        }

        ret = pthreadpool_init(max_threads, &pool->pool,
                               pthreadpool_tevent_job_signal, pool);
        if (ret != 0) {
                TALLOC_FREE(pool);
                return ret;
        }

        talloc_set_destructor(pool, pthreadpool_tevent_destructor);

        *presult = pool;
        return 0;
}

size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
{
        if (pool->pool == NULL) {
                return 0;
        }

        return pthreadpool_max_threads(pool->pool);
}

size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
{
        if (pool->pool == NULL) {
                return 0;
        }

        return pthreadpool_queued_jobs(pool->pool);
}

static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
{
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;
        struct pthreadpool_tevent_glue *glue = NULL;
        int ret;

        ret = pthreadpool_stop(pool->pool);
        if (ret != 0) {
                return ret;
        }

        for (job = pool->jobs; job != NULL; job = njob) {
                njob = job->next;

                /* pthreadpool_tevent_job_orphan() removes the job from the list */
                pthreadpool_tevent_job_orphan(job);
        }

        /*
         * Delete all the registered
         * tevent_context/tevent_threaded_context
         * pairs.
         */
        for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
                /* The glue destructor removes it from the list */
                TALLOC_FREE(glue);
        }
        pool->glue_list = NULL;

        ret = pthreadpool_destroy(pool->pool);
        if (ret != 0) {
                return ret;
        }
        pool->pool = NULL;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        return 0;
}

static int pthreadpool_tevent_glue_destructor(
        struct pthreadpool_tevent_glue *glue)
{
        struct pthreadpool_tevent_job_state *state = NULL;
        struct pthreadpool_tevent_job_state *nstate = NULL;

        for (state = glue->states; state != NULL; state = nstate) {
                nstate = state->next;

                /* pthreadpool_tevent_job_orphan() removes the state from the list */
                pthreadpool_tevent_job_orphan(state->job);
        }

        if (glue->pool->glue_list != NULL) {
                DLIST_REMOVE(glue->pool->glue_list, glue);
        }

        /* Ensure the ev_link destructor knows we're gone */
        glue->ev_link->glue = NULL;

        TALLOC_FREE(glue->ev_link);
        TALLOC_FREE(glue->tctx);

        return 0;
}

/*
 * Destructor called either explicitly from
 * pthreadpool_tevent_glue_destructor(), or indirectly
 * when owning tevent_context is destroyed.
 *
 * When called from pthreadpool_tevent_glue_destructor()
 * ev_link->glue is already NULL, so this does nothing.
 *
 * When called from talloc_free() of the owning
 * tevent_context we must ensure we also remove the
 * linked glue object from the list inside
 * struct pthreadpool_tevent.
 */
static int pthreadpool_tevent_glue_link_destructor(
        struct pthreadpool_tevent_glue_ev_link *ev_link)
{
        TALLOC_FREE(ev_link->glue);
        return 0;
}

static int pthreadpool_tevent_register_ev(
                                struct pthreadpool_tevent *pool,
                                struct pthreadpool_tevent_job_state *state)
{
        struct tevent_context *ev = state->ev;
        struct pthreadpool_tevent_glue *glue = NULL;
        struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;

        /*
         * See if this tevent_context was already registered by
         * searching the glue object list. If so we have nothing
         * to do here - we already have a tevent_context/tevent_threaded_context
         * pair.
         */
        for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
                if (glue->ev == state->ev) {
                        state->glue = glue;
                        DLIST_ADD_END(glue->states, state);
                        return 0;
                }
        }

        /*
         * Event context not yet registered - create a new glue
         * object containing a tevent_context/tevent_threaded_context
         * pair and put it on the list to remember this registration.
         * We also need a link object to ensure the event context
         * can't go away without us knowing about it.
         */
        glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
        if (glue == NULL) {
                return ENOMEM;
        }
        *glue = (struct pthreadpool_tevent_glue) {
                .pool = pool,
                .ev = ev,
        };
        talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);

        /*
         * Now allocate the link object to the event context. Note this
         * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
         * context is freed we are able to cleanup the glue object
         * in the link object destructor.
         */

        ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
        if (ev_link == NULL) {
                TALLOC_FREE(glue);
                return ENOMEM;
        }
        ev_link->glue = glue;
        talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);

        glue->ev_link = ev_link;

#ifdef HAVE_PTHREAD
        glue->tctx = tevent_threaded_context_create(glue, ev);
        if (glue->tctx == NULL) {
                TALLOC_FREE(ev_link);
                TALLOC_FREE(glue);
                return ENOMEM;
        }
#endif

        state->glue = glue;
        DLIST_ADD_END(glue->states, state);

        DLIST_ADD(pool->glue_list, glue);
        return 0;
}

static void pthreadpool_tevent_job_fn(void *private_data);
static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data);
static bool pthreadpool_tevent_job_cancel(struct tevent_req *req);

static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
{
        /*
         * We should never be called with needs_fence.orphaned == false.
         * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
         * after detaching from the request state, glue and pool list.
         */
        if (!job->needs_fence.orphaned) {
                abort();
        }

        /*
         * If the job is not finished (job->im still there)
         * and it's still attached to the pool,
         * we try to cancel it (before it has started).
         */
        if (job->im != NULL && job->pool != NULL) {
                size_t num;

                num = pthreadpool_cancel_job(job->pool->pool, 0,
                                             pthreadpool_tevent_job_fn,
                                             job);
                if (num != 0) {
                        /*
                         * It was not too late to cancel the request.
                         *
                         * We can remove job->im, as it will never be used.
                         */
                        TALLOC_FREE(job->im);
                }
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.dropped) {
                /*
                 * The signal function saw job->needs_fence.orphaned
                 * before it started the signaling via the immediate
                 * event. So we'll never get triggered and can
                 * remove job->im and let the whole job go...
                 */
                TALLOC_FREE(job->im);
        }

        /*
         * TODO?: We could further improve this by adjusting
         * tevent_threaded_schedule_immediate_destructor()
         * and allow TALLOC_FREE() during its time
         * in the main_ev->scheduled_immediates list.
         *
         * PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
         * if (job->needs_fence.signaled) {
         *       *
         *       * The signal function is completed;
         *       * in the future we may be allowed
         *       * to call TALLOC_FREE(job->im).
         *       *
         *      TALLOC_FREE(job->im);
         * }
         */

        /*
         * pthreadpool_tevent_job_orphan() already removed
         * it from pool->jobs. And we don't need to try
         * pthreadpool_cancel_job() again.
         */
        job->pool = NULL;

        if (job->im != NULL) {
                /*
                 * job->im still being there means we need to wait for the
                 * immediate event to be triggered or just leak the memory.
                 *
                 * Move it to the orphaned list, if it's not already there.
                 */
                return -1;
        }

        /*
         * Finally remove from the orphaned_jobs list
         * and let talloc destroy us.
         */
        DLIST_REMOVE(orphaned_jobs, job);

        PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(job);
        return 0;
}

static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
{
        job->needs_fence.orphaned = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        /*
         * We're the only function that sets
         * job->state = NULL;
         */
        if (job->state == NULL) {
                abort();
        }

        /*
         * Once we have marked the request as 'orphaned'
         * we spin/loop if it's already marked
         * as 'finished' (which means that
         * pthreadpool_tevent_job_signal() was entered).
         * If it saw 'orphaned' it will exit after setting
         * 'dropped', otherwise it dereferences
         * job->state->glue->{tctx,ev} until it exits
         * after setting 'signaled'.
         *
         * We need to close this potential gap before
         * we can set job->state = NULL.
         *
         * This is some kind of spinlock, but with
         * 1 millisecond sleeps in between, in order
         * to give the thread more cpu time to finish.
         */
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        while (job->needs_fence.finished) {
                if (job->needs_fence.dropped) {
                        break;
                }
                if (job->needs_fence.signaled) {
                        break;
                }
                poll(NULL, 0, 1);
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        }

        /*
         * Once the gap is closed, we can remove
         * the glue link.
         */
        DLIST_REMOVE(job->state->glue->states, job->state);
        job->state->glue = NULL;

        /*
         * We need to reparent the job to a long-term context
         * and detach it from the request state.
         * Maybe the destructor will keep the memory
         * and leak it for now.
         */
        (void)talloc_reparent(job->state, NULL, job);
        job->state->job = NULL;
        job->state = NULL;

        /*
         * job->pool will only be set to NULL
         * in the first destructor run.
         */
        if (job->pool == NULL) {
                abort();
        }

        /*
         * Detach it from the pool.
         *
         * The job might still be running,
         * so we keep job->pool.
         * The destructor will set it to NULL
         * after trying pthreadpool_cancel_job().
         */
        DLIST_REMOVE(job->pool->jobs, job);

        /*
         * Add it to the list of orphaned jobs,
         * which may be cleaned up later.
         *
         * The destructor removes it from the list
         * when possible, or it denies the free
         * and keeps it in the list.
         */
        DLIST_ADD_END(orphaned_jobs, job);
        TALLOC_FREE(job);
}

static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
                                           enum tevent_req_state req_state)
{
        struct pthreadpool_tevent_job_state *state =
                tevent_req_data(req,
                struct pthreadpool_tevent_job_state);

        if (state->job == NULL) {
                /*
                 * The job is not scheduled in the pool yet,
                 * or not anymore.
                 */
                if (state->glue != NULL) {
                        DLIST_REMOVE(state->glue->states, state);
                        state->glue = NULL;
                }
                return;
        }

        /*
         * We need to reparent the job to a long-term context.
         * Maybe the destructor will keep the memory
         * and leak it for now.
         */
        pthreadpool_tevent_job_orphan(state->job);
        state->job = NULL; /* not needed but looks better */
        return;
}

struct tevent_req *pthreadpool_tevent_job_send(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        struct pthreadpool_tevent *pool,
        void (*fn)(void *private_data), void *private_data)
{
        struct tevent_req *req = NULL;
        struct pthreadpool_tevent_job_state *state = NULL;
        struct pthreadpool_tevent_job *job = NULL;
        int ret;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        req = tevent_req_create(mem_ctx, &state,
                                struct pthreadpool_tevent_job_state);
        if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
        state->req = req;

        tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);

        if (pool == NULL) {
                tevent_req_error(req, EINVAL);
                return tevent_req_post(req, ev);
        }
        if (pool->pool == NULL) {
                tevent_req_error(req, EINVAL);
                return tevent_req_post(req, ev);
        }

        ret = pthreadpool_tevent_register_ev(pool, state);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }

        job = talloc_zero(state, struct pthreadpool_tevent_job);
        if (tevent_req_nomem(job, req)) {
                return tevent_req_post(req, ev);
        }
        job->pool = pool;
        job->fn = fn;
        job->private_data = private_data;
        job->im = tevent_create_immediate(state->job);
        if (tevent_req_nomem(job->im, req)) {
                return tevent_req_post(req, ev);
        }
        PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
        talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
        DLIST_ADD_END(job->pool->jobs, job);
        job->state = state;
        state->job = job;

        ret = pthreadpool_add_job(job->pool->pool, 0,
                                  pthreadpool_tevent_job_fn,
                                  job);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }

        tevent_req_set_cancel_fn(req, pthreadpool_tevent_job_cancel);
        return req;
}

static __thread struct pthreadpool_tevent_job *current_job;

bool pthreadpool_tevent_current_job_canceled(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        return current_job->needs_fence.maycancel;
}

bool pthreadpool_tevent_current_job_orphaned(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        return current_job->needs_fence.orphaned;
}

bool pthreadpool_tevent_current_job_continue(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        if (current_job->needs_fence.maycancel) {
                return false;
        }
        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        if (current_job->needs_fence.orphaned) {
                return false;
        }

        return true;
}
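
/*
 * Illustration only (not part of the original code): a long-running job
 * function can use pthreadpool_tevent_current_job_continue() to stop
 * early once its request has been cancelled or orphaned. A minimal
 * sketch (the my_chunk_state type and process_one_chunk() helper are
 * made up for the example):
 *
 *   static void my_job_fn(void *private_data)
 *   {
 *           struct my_chunk_state *cs = private_data;
 *           size_t i;
 *
 *           for (i = 0; i < cs->num_chunks; i++) {
 *                   if (!pthreadpool_tevent_current_job_continue()) {
 *                           return;
 *                   }
 *                   process_one_chunk(cs, i);
 *           }
 *   }
 */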

static void pthreadpool_tevent_job_fn(void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);

        current_job = job;
        job->needs_fence.started = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        job->fn(job->private_data);

        job->needs_fence.executed = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        current_job = NULL;
}

static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
                                         void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(job_private_data,
                struct pthreadpool_tevent_job);

        job->needs_fence.finished = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.orphaned) {
                /* Request already gone */
                job->needs_fence.dropped = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                return 0;
        }

        /*
         * state and state->glue are valid,
         * see the job->needs_fence.finished
         * "spinlock" loop in
         * pthreadpool_tevent_job_orphan()
         */
        if (job->state->glue->tctx != NULL) {
                /* with HAVE_PTHREAD */
                tevent_threaded_schedule_immediate(job->state->glue->tctx,
                                                   job->im,
                                                   pthreadpool_tevent_job_done,
                                                   job);
        } else {
                /* without HAVE_PTHREAD */
                tevent_schedule_immediate(job->im,
                                          job->state->glue->ev,
                                          pthreadpool_tevent_job_done,
                                          job);
        }

        job->needs_fence.signaled = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        return 0;
}

static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);
        struct pthreadpool_tevent_job_state *state = job->state;

        TALLOC_FREE(job->im);

        if (state == NULL) {
                /* Request already gone */
                TALLOC_FREE(job);
                return;
        }

        /*
         * pthreadpool_tevent_job_cleanup()
         * (called by tevent_req_done() or
         * tevent_req_error()) will destroy the job.
         */

        if (job->needs_fence.executed) {
                tevent_req_done(state->req);
                return;
        }

        tevent_req_error(state->req, ENOEXEC);
        return;
}

static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
{
        struct pthreadpool_tevent_job_state *state =
                tevent_req_data(req,
                struct pthreadpool_tevent_job_state);
        struct pthreadpool_tevent_job *job = state->job;
        size_t num;

        if (job == NULL) {
                return false;
        }

        job->needs_fence.maycancel = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.started) {
                /*
                 * It was too late to cancel the request.
                 *
                 * The job still has the chance to look
                 * at pthreadpool_tevent_current_job_canceled()
                 * or pthreadpool_tevent_current_job_continue()
                 */
                return false;
        }

        num = pthreadpool_cancel_job(job->pool->pool, 0,
                                     pthreadpool_tevent_job_fn,
                                     job);
        if (num == 0) {
                /*
                 * It was too late to cancel the request.
                 */
                return false;
        }

        /*
         * It was not too late to cancel the request.
         *
         * We can remove job->im, as it will never be used.
         */
        TALLOC_FREE(job->im);

        /*
         * pthreadpool_tevent_job_cleanup()
         * will destroy the job.
         */
        tevent_req_defer_callback(req, state->ev);
        tevent_req_error(req, ECANCELED);
        return true;
}

int pthreadpool_tevent_job_recv(struct tevent_req *req)
{
        return tevent_req_simple_recv_unix(req);
}
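
/*
 * Illustration only (not part of the original code): a minimal caller of
 * the API above, using the usual tevent_req send/recv pattern. The job
 * function my_compute_fn() and the run_one_job() wrapper are made up for
 * the example, and error handling is reduced to the bare minimum.
 *
 *   static void my_compute_fn(void *private_data)
 *   {
 *           // runs in a worker thread; only do thread-safe work here
 *   }
 *
 *   static int run_one_job(TALLOC_CTX *mem_ctx)
 *   {
 *           struct tevent_context *ev = tevent_context_init(mem_ctx);
 *           struct pthreadpool_tevent *pool = NULL;
 *           struct tevent_req *req = NULL;
 *           int ret;
 *
 *           if (ev == NULL) {
 *                   return ENOMEM;
 *           }
 *
 *           ret = pthreadpool_tevent_init(mem_ctx, 4, &pool);
 *           if (ret != 0) {
 *                   return ret;
 *           }
 *
 *           req = pthreadpool_tevent_job_send(mem_ctx, ev, pool,
 *                                             my_compute_fn, NULL);
 *           if (req == NULL) {
 *                   return ENOMEM;
 *           }
 *
 *           // wait for pthreadpool_tevent_job_done() to fire via 'ev'
 *           if (!tevent_req_poll(req, ev)) {
 *                   return EIO;
 *           }
 *
 *           return pthreadpool_tevent_job_recv(req);
 *   }
 */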