Revert "pthreadpool: add a comment about a further optimization in pthreadpool_tevent...
samba.git: lib/pthreadpool/pthreadpool_tevent.c
/*
 * Unix SMB/CIFS implementation.
 * threadpool implementation based on pthreads
 * Copyright (C) Volker Lendecke 2009,2011
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "replace.h"
#include "system/select.h"
#include "system/threads.h"
#include "system/filesys.h"
#include "pthreadpool_tevent.h"
#include "pthreadpool.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/dlinklist.h"
#include "lib/util/attr.h"

/*
 * We try to give some hints to helgrind/drd.
 *
 * Note that ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
 * takes a memory address range that is ignored by helgrind/drd;
 * 'description' is just ignored...
 *
 * Note that ANNOTATE_HAPPENS_*(unique_uintptr)
 * just takes a DWORD/(void *) as a unique key
 * for the barrier.
 */
#ifdef HAVE_VALGRIND_HELGRIND_H
#include <valgrind/helgrind.h>
#endif
#ifndef ANNOTATE_BENIGN_RACE_SIZED
#define ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
#endif
#ifndef ANNOTATE_HAPPENS_BEFORE
#define ANNOTATE_HAPPENS_BEFORE(unique_uintptr)
#endif
#ifndef ANNOTATE_HAPPENS_AFTER
#define ANNOTATE_HAPPENS_AFTER(unique_uintptr)
#endif
#ifndef ANNOTATE_HAPPENS_BEFORE_FORGET_ALL
#define ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(unique_uintptr)
#endif

#define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_BENIGN_RACE_SIZED(&__j->needs_fence, \
                                   sizeof(__j->needs_fence), \
                                   "race by design, protected by fence"); \
} while(0);

#ifdef WITH_PTHREADPOOL
/*
 * configure checked we have pthread and atomic_thread_fence() available
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { \
        atomic_thread_fence(__order); \
} while(0)
#else
/*
 * we're using lib/pthreadpool/pthreadpool_sync.c ...
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { } while(0)
#ifndef HAVE___THREAD
#define __thread
#endif
#endif

#define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_HAPPENS_BEFORE(&__job->needs_fence); \
        __PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
        ANNOTATE_HAPPENS_AFTER(&__job->needs_fence); \
} while(0);

#define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&__job->needs_fence); \
} while(0);

struct pthreadpool_tevent_job_state;

/*
 * We need one pthreadpool_tevent_glue object per unique combination of tevent
 * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
 * contexts in a pthreadpool_tevent.
 */
struct pthreadpool_tevent_glue {
        struct pthreadpool_tevent_glue *prev, *next;
        struct pthreadpool_tevent *pool; /* back-pointer to owning object. */
        /* Tuple we are keeping track of in this list. */
        struct tevent_context *ev;
        struct tevent_threaded_context *tctx;
        /* Pointer to link object owned by *ev. */
        struct pthreadpool_tevent_glue_ev_link *ev_link;
        /* active jobs */
        struct pthreadpool_tevent_job_state *states;
};

/*
 * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
 * tevent context from our list of active event contexts if the event context
 * is destroyed.
 * This structure is talloc()'ed from the struct tevent_context *, and is a
 * back-pointer allowing the related struct pthreadpool_tevent_glue object
 * to be removed from the struct pthreadpool_tevent glue list if the owning
 * tevent_context is talloc_free()'ed.
 */
struct pthreadpool_tevent_glue_ev_link {
        struct pthreadpool_tevent_glue *glue;
};

struct pthreadpool_tevent {
        struct pthreadpool *pool;
        struct pthreadpool_tevent_glue *glue_list;

        struct pthreadpool_tevent_job *jobs;
};

struct pthreadpool_tevent_job_state {
        struct pthreadpool_tevent_job_state *prev, *next;
        struct pthreadpool_tevent_glue *glue;
        struct tevent_context *ev;
        struct tevent_req *req;
        struct pthreadpool_tevent_job *job;
};

struct pthreadpool_tevent_job {
        struct pthreadpool_tevent_job *prev, *next;

        struct pthreadpool_tevent *pool;
        struct pthreadpool_tevent_job_state *state;
        struct tevent_immediate *im;

        void (*fn)(void *private_data);
        void *private_data;

        /*
         * Coordination between threads
         *
         * There is only one side writing each element,
         * either the main process or the job thread.
         *
         * The coordination is done by a full memory
         * barrier using atomic_thread_fence(memory_order_seq_cst)
         * wrapped in PTHREAD_TEVENT_JOB_THREAD_FENCE()
         */
        struct {
                /*
                 * 'maycancel'
                 * set when tevent_req_cancel() is called.
                 * (only written by main thread!)
                 */
                bool maycancel;

                /*
                 * 'orphaned'
                 * set when talloc_free is called on the job request,
                 * tevent_context or pthreadpool_tevent.
                 * (only written by main thread!)
                 */
                bool orphaned;

                /*
                 * 'started'
                 * set when the job is picked up by a worker thread
                 * (only written by job thread!)
                 */
                bool started;

                /*
                 * 'executed'
                 * set once the job function returned.
                 * (only written by job thread!)
                 */
                bool executed;

                /*
                 * 'finished'
                 * set when pthreadpool_tevent_job_signal() is entered
                 * (only written by job thread!)
                 */
                bool finished;

                /*
                 * 'dropped'
                 * set when pthreadpool_tevent_job_signal() leaves with
                 * orphaned already set.
                 * (only written by job thread!)
                 */
                bool dropped;

                /*
                 * 'signaled'
                 * set when pthreadpool_tevent_job_signal() leaves normally
                 * and the immediate event was scheduled.
                 * (only written by job thread!)
                 */
                bool signaled;
        } needs_fence;
};
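
/*
 * For illustration only: each needs_fence flag has exactly one writing
 * side, and both sides bracket their accesses with the full fence,
 * e.g.
 *
 *   writer (job thread):
 *       job->needs_fence.finished = true;
 *       PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *
 *   reader (main thread):
 *       PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *       if (job->needs_fence.finished) { ... }
 *
 * The seq_cst fence is what makes these single-writer flags safe to
 * read from the other thread without a mutex.
 */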

static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);

static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);

static struct pthreadpool_tevent_job *orphaned_jobs;

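/*
 * Walk the global list of orphaned jobs and TALLOC_FREE() each one.
 * The job destructor either really frees the job or refuses the free
 * and keeps it in the list (e.g. while its immediate event may still
 * be triggered), see pthreadpool_tevent_job_destructor().
 */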
void pthreadpool_tevent_cleanup_orphaned_jobs(void)
{
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;

        for (job = orphaned_jobs; job != NULL; job = njob) {
                njob = job->next;

                /*
                 * The job destructor keeps the job alive
                 * (and in the list) or removes it from the list.
                 */
                TALLOC_FREE(job);
        }
}

static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
                                         void *private_data);

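/*
 * Create a pthreadpool_tevent that wraps a raw pthreadpool with up to
 * max_threads worker threads. The result is talloc'ed off mem_ctx and
 * cleaned up via pthreadpool_tevent_destructor().
 */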
int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
                            struct pthreadpool_tevent **presult)
{
        struct pthreadpool_tevent *pool;
        int ret;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
        if (pool == NULL) {
                return ENOMEM;
        }

        ret = pthreadpool_init(max_threads, &pool->pool,
                               pthreadpool_tevent_job_signal, pool);
        if (ret != 0) {
                TALLOC_FREE(pool);
                return ret;
        }

        talloc_set_destructor(pool, pthreadpool_tevent_destructor);

        *presult = pool;
        return 0;
}

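/*
 * The following two helpers expose counters of the underlying raw
 * pool; both return 0 once the raw pool has been destroyed.
 */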
size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
{
        if (pool->pool == NULL) {
                return 0;
        }

        return pthreadpool_max_threads(pool->pool);
}

size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
{
        if (pool->pool == NULL) {
                return 0;
        }

        return pthreadpool_queued_jobs(pool->pool);
}

static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
{
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;
        struct pthreadpool_tevent_glue *glue = NULL;
        int ret;

        ret = pthreadpool_stop(pool->pool);
        if (ret != 0) {
                return ret;
        }

        for (job = pool->jobs; job != NULL; job = njob) {
                njob = job->next;

                /* Orphaning the job removes it from the list */
                pthreadpool_tevent_job_orphan(job);
        }

        /*
         * Delete all the registered
         * tevent_context/tevent_threaded_context
         * pairs.
         */
        for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
                /* The glue destructor removes it from the list */
                TALLOC_FREE(glue);
        }
        pool->glue_list = NULL;

        ret = pthreadpool_destroy(pool->pool);
        if (ret != 0) {
                return ret;
        }
        pool->pool = NULL;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        return 0;
}

static int pthreadpool_tevent_glue_destructor(
        struct pthreadpool_tevent_glue *glue)
{
        struct pthreadpool_tevent_job_state *state = NULL;
        struct pthreadpool_tevent_job_state *nstate = NULL;

        for (state = glue->states; state != NULL; state = nstate) {
                nstate = state->next;

                /* Orphaning the job removes it from the list */
                pthreadpool_tevent_job_orphan(state->job);
        }

        if (glue->pool->glue_list != NULL) {
                DLIST_REMOVE(glue->pool->glue_list, glue);
        }

        /* Ensure the ev_link destructor knows we're gone */
        glue->ev_link->glue = NULL;

        TALLOC_FREE(glue->ev_link);
        TALLOC_FREE(glue->tctx);

        return 0;
}

/*
 * Destructor called either explicitly from
 * pthreadpool_tevent_glue_destructor(), or indirectly
 * when the owning tevent_context is destroyed.
 *
 * When called from pthreadpool_tevent_glue_destructor()
 * ev_link->glue is already NULL, so this does nothing.
 *
 * When called from talloc_free() of the owning
 * tevent_context we must ensure we also remove the
 * linked glue object from the list inside
 * struct pthreadpool_tevent.
 */
static int pthreadpool_tevent_glue_link_destructor(
        struct pthreadpool_tevent_glue_ev_link *ev_link)
{
        TALLOC_FREE(ev_link->glue);
        return 0;
}

static int pthreadpool_tevent_register_ev(
                                struct pthreadpool_tevent *pool,
                                struct pthreadpool_tevent_job_state *state)
{
        struct tevent_context *ev = state->ev;
        struct pthreadpool_tevent_glue *glue = NULL;
        struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;

        /*
         * See if this tevent_context was already registered by
         * searching the glue object list. If so we have nothing
         * to do here - we already have a tevent_context/tevent_threaded_context
         * pair.
         */
        for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
                if (glue->ev == state->ev) {
                        state->glue = glue;
                        DLIST_ADD_END(glue->states, state);
                        return 0;
                }
        }

        /*
         * Event context not yet registered - create a new glue
         * object containing a tevent_context/tevent_threaded_context
         * pair and put it on the list to remember this registration.
         * We also need a link object to ensure the event context
         * can't go away without us knowing about it.
         */
        glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
        if (glue == NULL) {
                return ENOMEM;
        }
        *glue = (struct pthreadpool_tevent_glue) {
                .pool = pool,
                .ev = ev,
        };
        talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);

        /*
         * Now allocate the link object to the event context. Note this
         * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
         * context is freed we are able to cleanup the glue object
         * in the link object destructor.
         */

        ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
        if (ev_link == NULL) {
                TALLOC_FREE(glue);
                return ENOMEM;
        }
        ev_link->glue = glue;
        talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);

        glue->ev_link = ev_link;

#ifdef HAVE_PTHREAD
        glue->tctx = tevent_threaded_context_create(glue, ev);
        if (glue->tctx == NULL) {
                TALLOC_FREE(ev_link);
                TALLOC_FREE(glue);
                return ENOMEM;
        }
#endif

        state->glue = glue;
        DLIST_ADD_END(glue->states, state);

        DLIST_ADD(pool->glue_list, glue);
        return 0;
}

static void pthreadpool_tevent_job_fn(void *private_data);
static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data);
static bool pthreadpool_tevent_job_cancel(struct tevent_req *req);

static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
{
        /*
         * We should never be called with needs_fence.orphaned == false.
         * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
         * after detaching from the request state, glue and pool list.
         */
        if (!job->needs_fence.orphaned) {
                abort();
        }

        /*
         * If the job is not finished (job->im still there)
         * and it's still attached to the pool,
         * we try to cancel it (before it has started).
         */
        if (job->im != NULL && job->pool != NULL) {
                size_t num;

                num = pthreadpool_cancel_job(job->pool->pool, 0,
                                             pthreadpool_tevent_job_fn,
                                             job);
                if (num != 0) {
                        /*
                         * It was not too late to cancel the request.
                         *
                         * We can remove job->im, as it will never be used.
                         */
                        TALLOC_FREE(job->im);
                }
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.dropped) {
                /*
                 * The signal function saw job->needs_fence.orphaned
                 * before it started the signaling via the immediate
                 * event. So we'll never get triggered and can
                 * remove job->im and let the whole job go...
                 */
                TALLOC_FREE(job->im);
        }

        /*
         * pthreadpool_tevent_job_orphan() already removed
         * it from pool->jobs. And we don't need to try
         * pthreadpool_cancel_job() again.
         */
        job->pool = NULL;

        if (job->im != NULL) {
                /*
                 * job->im still being there means we need to wait for the
                 * immediate event to be triggered or just leak the memory.
                 *
                 * Move it to the orphaned list, if it's not already there.
                 */
                return -1;
        }

        /*
         * Finally remove from the orphaned_jobs list
         * and let talloc destroy us.
         */
        DLIST_REMOVE(orphaned_jobs, job);

        PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(job);
        return 0;
}

static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
{
        job->needs_fence.orphaned = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        /*
         * We're the only function that sets
         * job->state = NULL;
         */
        if (job->state == NULL) {
                abort();
        }

        /*
         * Once we marked the request as 'orphaned'
         * we spin/loop if it's already marked
         * as 'finished' (which means that
         * pthreadpool_tevent_job_signal() was entered).
         * If it saw 'orphaned' it will exit after setting
         * 'dropped', otherwise it dereferences
         * job->state->glue->{tctx,ev} until it exits
         * after setting 'signaled'.
         *
         * We need to close this potential gap before
         * we can set job->state = NULL.
         *
         * This is some kind of spinlock, but with
         * 1 millisecond sleeps in between, in order
         * to give the thread more cpu time to finish.
         */
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        while (job->needs_fence.finished) {
                if (job->needs_fence.dropped) {
                        break;
                }
                if (job->needs_fence.signaled) {
                        break;
                }
                poll(NULL, 0, 1);
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        }

        /*
         * Once the gap is closed, we can remove
         * the glue link.
         */
        DLIST_REMOVE(job->state->glue->states, job->state);
        job->state->glue = NULL;

        /*
         * We need to reparent to a long term context
         * and detach from the request state.
         * Maybe the destructor will keep the memory
         * and leak it for now.
         */
        (void)talloc_reparent(job->state, NULL, job);
        job->state->job = NULL;
        job->state = NULL;

        /*
         * job->pool will only be set to NULL
         * in the first destructor run.
         */
        if (job->pool == NULL) {
                abort();
        }

        /*
         * Detach it from the pool.
         *
         * The job might still be running,
         * so we keep job->pool.
         * The destructor will set it to NULL
         * after trying pthreadpool_cancel_job()
         */
        DLIST_REMOVE(job->pool->jobs, job);

        /*
         * Add it to the list of orphaned jobs,
         * which may be cleaned up later.
         *
         * The destructor removes it from the list
         * when possible or it denies the free
         * and keeps it in the list.
         */
        DLIST_ADD_END(orphaned_jobs, job);
        TALLOC_FREE(job);
}

static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
                                           enum tevent_req_state req_state)
{
        struct pthreadpool_tevent_job_state *state =
                tevent_req_data(req,
                struct pthreadpool_tevent_job_state);

        if (state->job == NULL) {
                /*
                 * The job request is not scheduled in the pool
                 * yet or anymore.
                 */
                if (state->glue != NULL) {
                        DLIST_REMOVE(state->glue->states, state);
                        state->glue = NULL;
                }
                return;
        }

        /*
         * We need to reparent to a long term context.
         * Maybe the destructor will keep the memory
         * and leak it for now.
         */
        pthreadpool_tevent_job_orphan(state->job);
        state->job = NULL; /* not needed but looks better */
        return;
}

struct tevent_req *pthreadpool_tevent_job_send(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        struct pthreadpool_tevent *pool,
        void (*fn)(void *private_data), void *private_data)
{
        struct tevent_req *req = NULL;
        struct pthreadpool_tevent_job_state *state = NULL;
        struct pthreadpool_tevent_job *job = NULL;
        int ret;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        req = tevent_req_create(mem_ctx, &state,
                                struct pthreadpool_tevent_job_state);
        if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
        state->req = req;

        tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);

        if (pool == NULL) {
                tevent_req_error(req, EINVAL);
                return tevent_req_post(req, ev);
        }
        if (pool->pool == NULL) {
                tevent_req_error(req, EINVAL);
                return tevent_req_post(req, ev);
        }

        ret = pthreadpool_tevent_register_ev(pool, state);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }

        job = talloc_zero(state, struct pthreadpool_tevent_job);
        if (tevent_req_nomem(job, req)) {
                return tevent_req_post(req, ev);
        }
        job->pool = pool;
        job->fn = fn;
        job->private_data = private_data;
        job->im = tevent_create_immediate(state->job);
        if (tevent_req_nomem(job->im, req)) {
                return tevent_req_post(req, ev);
        }
        PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
        talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
        DLIST_ADD_END(job->pool->jobs, job);
        job->state = state;
        state->job = job;

        ret = pthreadpool_add_job(job->pool->pool, 0,
                                  pthreadpool_tevent_job_fn,
                                  job);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }

        tevent_req_set_cancel_fn(req, pthreadpool_tevent_job_cancel);
        return req;
}

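/*
 * For illustration only, a caller typically drives a job like this
 * (my_worker() and my_done() are hypothetical caller-side functions):
 *
 *   req = pthreadpool_tevent_job_send(mem_ctx, ev, pool,
 *                                     my_worker, my_data);
 *   if (req == NULL) { ... handle ENOMEM ... }
 *   tevent_req_set_callback(req, my_done, my_data);
 *
 *   static void my_done(struct tevent_req *req)
 *   {
 *           int ret = pthreadpool_tevent_job_recv(req);
 *           TALLOC_FREE(req);
 *           ... ret is 0 on success or an errno, e.g. ECANCELED ...
 *   }
 */
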
static __thread struct pthreadpool_tevent_job *current_job;

bool pthreadpool_tevent_current_job_canceled(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        return current_job->needs_fence.maycancel;
}

bool pthreadpool_tevent_current_job_orphaned(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        return current_job->needs_fence.orphaned;
}

bool pthreadpool_tevent_current_job_continue(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        if (current_job->needs_fence.maycancel) {
                return false;
        }
        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        if (current_job->needs_fence.orphaned) {
                return false;
        }

        return true;
}

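/*
 * Worker thread entry point: publish the job via the thread-local
 * current_job pointer, mark it as started, run the caller's function
 * and mark it as executed.
 */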
static void pthreadpool_tevent_job_fn(void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);

        current_job = job;
        job->needs_fence.started = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        job->fn(job->private_data);

        job->needs_fence.executed = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        current_job = NULL;
}

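/*
 * Called by the raw pthreadpool (still from the worker thread) once
 * the job function has returned: either schedule the immediate event
 * into the owning tevent context, or mark the job as dropped if the
 * request is already orphaned.
 */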
static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
                                         void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(job_private_data,
                struct pthreadpool_tevent_job);

        job->needs_fence.finished = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.orphaned) {
                /* Request already gone */
                job->needs_fence.dropped = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                return 0;
        }

        /*
         * state and state->glue are valid,
         * see the job->needs_fence.finished
         * "spinlock" loop in
         * pthreadpool_tevent_job_orphan()
         */
        if (job->state->glue->tctx != NULL) {
                /* with HAVE_PTHREAD */
                tevent_threaded_schedule_immediate(job->state->glue->tctx,
                                                   job->im,
                                                   pthreadpool_tevent_job_done,
                                                   job);
        } else {
                /* without HAVE_PTHREAD */
                tevent_schedule_immediate(job->im,
                                          job->state->glue->ev,
                                          pthreadpool_tevent_job_done,
                                          job);
        }

        job->needs_fence.signaled = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        return 0;
}

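/*
 * Immediate event handler, running in the owning tevent context: drop
 * job->im and complete the request with tevent_req_done() if the job
 * function actually ran, or with ENOEXEC otherwise.
 */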
static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);
        struct pthreadpool_tevent_job_state *state = job->state;

        TALLOC_FREE(job->im);

        if (state == NULL) {
                /* Request already gone */
                TALLOC_FREE(job);
                return;
        }

        /*
         * pthreadpool_tevent_job_cleanup()
         * (called by tevent_req_done() or
         * tevent_req_error()) will destroy the job.
         */

        if (job->needs_fence.executed) {
                tevent_req_done(state->req);
                return;
        }

        tevent_req_error(state->req, ENOEXEC);
        return;
}

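/*
 * Cancel callback for the tevent request: if the job has not been
 * picked up by a worker thread yet, remove it from the raw pool and
 * fail the request with ECANCELED; otherwise only set 'maycancel' so
 * the job function can check pthreadpool_tevent_current_job_continue().
 */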
static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
{
        struct pthreadpool_tevent_job_state *state =
                tevent_req_data(req,
                struct pthreadpool_tevent_job_state);
        struct pthreadpool_tevent_job *job = state->job;
        size_t num;

        if (job == NULL) {
                return false;
        }

        job->needs_fence.maycancel = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.started) {
                /*
                 * It was too late to cancel the request.
                 *
                 * The job still has the chance to look
                 * at pthreadpool_tevent_current_job_canceled()
                 * or pthreadpool_tevent_current_job_continue()
                 */
                return false;
        }

        num = pthreadpool_cancel_job(job->pool->pool, 0,
                                     pthreadpool_tevent_job_fn,
                                     job);
        if (num == 0) {
                /*
                 * It was too late to cancel the request.
                 */
                return false;
        }

        /*
         * It was not too late to cancel the request.
         *
         * We can remove job->im, as it will never be used.
         */
        TALLOC_FREE(job->im);

        /*
         * pthreadpool_tevent_job_cleanup()
         * will destroy the job.
         */
        tevent_req_defer_callback(req, state->ev);
        tevent_req_error(req, ECANCELED);
        return true;
}

int pthreadpool_tevent_job_recv(struct tevent_req *req)
{
        return tevent_req_simple_recv_unix(req);
}