Revert "pthreadpool: add helgrind magic to PTHREAD_TEVENT_JOB_THREAD_FENCE_*()"
1 /*
2  * Unix SMB/CIFS implementation.
3  * threadpool implementation based on pthreads
4  * Copyright (C) Volker Lendecke 2009,2011
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/threads.h"
22 #include "system/filesys.h"
23 #include "pthreadpool_tevent.h"
24 #include "pthreadpool.h"
25 #include "lib/util/tevent_unix.h"
26 #include "lib/util/dlinklist.h"
27 #include "lib/util/attr.h"
28
29 #define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
30         _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
31 } while(0);
32
33 #ifdef WITH_PTHREADPOOL
34 /*
35  * configure checked we have pthread and atomic_thread_fence() available
36  */
37 #define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { \
38         atomic_thread_fence(__order); \
39 } while(0)
40 #else
41 /*
42  * we're using lib/pthreadpool/pthreadpool_sync.c ...
43  */
44 #define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { } while(0)
45 #ifndef HAVE___THREAD
46 #define __thread
47 #endif
48 #endif
49
50 #define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
51         _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
52         __PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
53 } while(0);
54
55 #define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
56         _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
57 } while(0);
58
59 struct pthreadpool_tevent_job_state;
60
61 /*
62  * We need one pthreadpool_tevent_glue object per unique combination of tevent
63  * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
64  * contexts in a pthreadpool_tevent.
65  */
66 struct pthreadpool_tevent_glue {
67         struct pthreadpool_tevent_glue *prev, *next;
68         struct pthreadpool_tevent *pool; /* back-pointer to owning object. */
69         /* Tuple we are keeping track of in this list. */
70         struct tevent_context *ev;
71         struct tevent_threaded_context *tctx;
72         /* Pointer to link object owned by *ev. */
73         struct pthreadpool_tevent_glue_ev_link *ev_link;
74 };
75
76 /*
77  * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
78  * tevent context from our list of active event contexts if the event context
79  * is destroyed.
80  * This structure is talloc()'ed from the struct tevent_context *, and is a
81  * back-pointer allowing the related struct pthreadpool_tevent_glue object
82  * to be removed from the struct pthreadpool_tevent glue list if the owning
83  * tevent_context is talloc_free()'ed.
84  */
85 struct pthreadpool_tevent_glue_ev_link {
86         struct pthreadpool_tevent_glue *glue;
87 };
88
89 struct pthreadpool_tevent {
90         struct pthreadpool *pool;
91         struct pthreadpool_tevent_glue *glue_list;
92
93         struct pthreadpool_tevent_job *jobs;
94 };
95
96 struct pthreadpool_tevent_job_state {
97         struct tevent_context *ev;
98         struct tevent_req *req;
99         struct pthreadpool_tevent_job *job;
100 };
101
102 struct pthreadpool_tevent_job {
103         struct pthreadpool_tevent_job *prev, *next;
104
105         struct pthreadpool_tevent *pool;
106         struct pthreadpool_tevent_job_state *state;
107         struct tevent_immediate *im;
108
109         void (*fn)(void *private_data);
110         void *private_data;
111
112         /*
113          * Coordination between threads
114          *
115          * There is only one writer for each element:
116          * either the main process or the job thread.
117          *
118          * The coordination is done by a full memory
119          * barrier using atomic_thread_fence(memory_order_seq_cst)
120          * wrapped in PTHREAD_TEVENT_JOB_THREAD_FENCE(); see the sketch after this struct.
121          */
122         struct {
123                 /*
124                  * 'maycancel'
125                  * set when tevent_req_cancel() is called.
126                  * (only written by main thread!)
127                  */
128                 bool maycancel;
129
130                 /*
131                  * 'orphaned'
132                  * set when talloc_free is called on the job request,
133                  * tevent_context or pthreadpool_tevent.
134                  * (only written by main thread!)
135                  */
136                 bool orphaned;
137
138                 /*
139                  * 'started'
140                  * set when the job is picked up by a worker thread
141                  * (only written by job thread!)
142                  */
143                 bool started;
144
145                 /*
146                  * 'executed'
147                  * set once the job function returned.
148                  * (only written by job thread!)
149                  */
150                 bool executed;
151
152                 /*
153                  * 'finished'
154                  * set when pthreadpool_tevent_job_signal() is entered
155                  * (only written by job thread!)
156                  */
157                 bool finished;
158
159                 /*
160                  * 'dropped'
161                  * set when pthreadpool_tevent_job_signal() leaves with
162                  * orphaned already set.
163                  * (only written by job thread!)
164                  */
165                 bool dropped;
166
167                 /*
168                  * 'signaled'
169          * set when pthreadpool_tevent_job_signal() leaves normally
170                  * and the immediate event was scheduled.
171                  * (only written by job thread!)
172                  */
173                 bool signaled;
174         } needs_fence;
175 };
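/*
 * Illustrative sketch of the needs_fence protocol above (simplified, not
 * verbatim from the functions below): every flag has exactly one writer,
 * and each access is bracketed by a full memory barrier.
 *
 *   // job thread (sole writer of 'started'):
 *   job->needs_fence.started = true;
 *   PTHREAD_TEVENT_JOB_THREAD_FENCE(job);    // publish the write
 *
 *   // main thread (reader, e.g. in the cancel function):
 *   PTHREAD_TEVENT_JOB_THREAD_FENCE(job);    // observe the latest writes
 *   if (job->needs_fence.started) {
 *           // too late to cancel; the job function is already running
 *   }
 */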
176
177 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
178
179 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);
180
181 static struct pthreadpool_tevent_job *orphaned_jobs;
182
183 void pthreadpool_tevent_cleanup_orphaned_jobs(void)
184 {
185         struct pthreadpool_tevent_job *job = NULL;
186         struct pthreadpool_tevent_job *njob = NULL;
187
188         for (job = orphaned_jobs; job != NULL; job = njob) {
189                 njob = job->next;
190
191                 /*
192                  * The job destructor keeps the job alive
193                  * (and in the list) or removes it from the list.
194                  */
195                 TALLOC_FREE(job);
196         }
197 }
198
199 static int pthreadpool_tevent_job_signal(int jobid,
200                                          void (*job_fn)(void *private_data),
201                                          void *job_private_data,
202                                          void *private_data);
203
204 int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
205                             struct pthreadpool_tevent **presult)
206 {
207         struct pthreadpool_tevent *pool;
208         int ret;
209
210         pthreadpool_tevent_cleanup_orphaned_jobs();
211
212         pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
213         if (pool == NULL) {
214                 return ENOMEM;
215         }
216
217         ret = pthreadpool_init(max_threads, &pool->pool,
218                                pthreadpool_tevent_job_signal, pool);
219         if (ret != 0) {
220                 TALLOC_FREE(pool);
221                 return ret;
222         }
223
224         talloc_set_destructor(pool, pthreadpool_tevent_destructor);
225
226         *presult = pool;
227         return 0;
228 }
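/*
 * Typical caller flow (illustrative sketch only; the demo_* names are
 * hypothetical and not part of this API):
 *
 *   static void demo_compute(void *private_data)
 *   {
 *           // runs in a worker thread; must not use tevent or main-thread
 *           // talloc contexts
 *   }
 *
 *   static void demo_done(struct tevent_req *req)
 *   {
 *           int error = pthreadpool_tevent_job_recv(req);
 *           TALLOC_FREE(req);
 *           // error == 0 on success, e.g. ECANCELED or ENOEXEC otherwise
 *   }
 *
 *   struct pthreadpool_tevent *pool = NULL;
 *   int ret = pthreadpool_tevent_init(mem_ctx, 4, &pool);
 *   if (ret == 0) {
 *           struct tevent_req *req = pthreadpool_tevent_job_send(
 *                   mem_ctx, ev, pool, demo_compute, NULL);
 *           if (req != NULL) {
 *                   tevent_req_set_callback(req, demo_done, NULL);
 *           }
 *   }
 */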
229
230 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
231 {
232         if (pool->pool == NULL) {
233                 return 0;
234         }
235
236         return pthreadpool_max_threads(pool->pool);
237 }
238
239 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
240 {
241         if (pool->pool == NULL) {
242                 return 0;
243         }
244
245         return pthreadpool_queued_jobs(pool->pool);
246 }
247
248 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
249 {
250         struct pthreadpool_tevent_job *job = NULL;
251         struct pthreadpool_tevent_job *njob = NULL;
252         struct pthreadpool_tevent_glue *glue = NULL;
253         int ret;
254
255         ret = pthreadpool_stop(pool->pool);
256         if (ret != 0) {
257                 return ret;
258         }
259
260         for (job = pool->jobs; job != NULL; job = njob) {
261                 njob = job->next;
262
263                 /* pthreadpool_tevent_job_orphan() removes it from the list */
264                 pthreadpool_tevent_job_orphan(job);
265         }
266
267         /*
268          * Delete all the registered
269          * tevent_context/tevent_threaded_context
270          * pairs.
271          */
272         for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
273                 /* The glue destructor removes it from the list */
274                 TALLOC_FREE(glue);
275         }
276         pool->glue_list = NULL;
277
278         ret = pthreadpool_destroy(pool->pool);
279         if (ret != 0) {
280                 return ret;
281         }
282         pool->pool = NULL;
283
284         pthreadpool_tevent_cleanup_orphaned_jobs();
285
286         return 0;
287 }
288
289 static int pthreadpool_tevent_glue_destructor(
290         struct pthreadpool_tevent_glue *glue)
291 {
292         if (glue->pool->glue_list != NULL) {
293                 DLIST_REMOVE(glue->pool->glue_list, glue);
294         }
295
296         /* Ensure the ev_link destructor knows we're gone */
297         glue->ev_link->glue = NULL;
298
299         TALLOC_FREE(glue->ev_link);
300         TALLOC_FREE(glue->tctx);
301
302         return 0;
303 }
304
305 /*
306  * Destructor called either explicitly from
307  * pthreadpool_tevent_glue_destructor(), or indirectly
308  * when owning tevent_context is destroyed.
309  *
310  * When called from pthreadpool_tevent_glue_destructor()
311  * ev_link->glue is already NULL, so this does nothing.
312  *
313  * When called from talloc_free() of the owning
314  * tevent_context we must ensure we also remove the
315  * linked glue object from the list inside
316  * struct pthreadpool_tevent.
317  */
318 static int pthreadpool_tevent_glue_link_destructor(
319         struct pthreadpool_tevent_glue_ev_link *ev_link)
320 {
321         TALLOC_FREE(ev_link->glue);
322         return 0;
323 }
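/*
 * Informal sketch of the two teardown orders handled by the destructors
 * above:
 *
 *   talloc_free(pool)
 *     -> pthreadpool_tevent_destructor()
 *        -> TALLOC_FREE(glue)                    // glue destructor
 *           -> ev_link->glue = NULL
 *           -> TALLOC_FREE(glue->ev_link)        // link destructor is a no-op
 *
 *   talloc_free(ev)
 *     -> pthreadpool_tevent_glue_link_destructor()
 *        -> TALLOC_FREE(ev_link->glue)           // glue destructor
 *           -> DLIST_REMOVE(pool->glue_list, glue)
 */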
324
325 static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
326                                           struct tevent_context *ev)
327 {
328         struct pthreadpool_tevent_glue *glue = NULL;
329         struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
330
331         /*
332          * See if this tevent_context was already registered by
333          * searching the glue object list. If so we have nothing
334          * to do here - we already have a tevent_context/tevent_threaded_context
335          * pair.
336          */
337         for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
338                 if (glue->ev == ev) {
339                         return 0;
340                 }
341         }
342
343         /*
344          * Event context not yet registered - create a new glue
345          * object containing a tevent_context/tevent_threaded_context
346          * pair and put it on the list to remember this registration.
347          * We also need a link object to ensure the event context
348          * can't go away without us knowing about it.
349          */
350         glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
351         if (glue == NULL) {
352                 return ENOMEM;
353         }
354         *glue = (struct pthreadpool_tevent_glue) {
355                 .pool = pool,
356                 .ev = ev,
357         };
358         talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
359
360         /*
361          * Now allocate the link object to the event context. Note this
362          * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
363          * context is freed we are able to cleanup the glue object
364          * in the link object destructor.
365          */
366
367         ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
368         if (ev_link == NULL) {
369                 TALLOC_FREE(glue);
370                 return ENOMEM;
371         }
372         ev_link->glue = glue;
373         talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);
374
375         glue->ev_link = ev_link;
376
377 #ifdef HAVE_PTHREAD
378         glue->tctx = tevent_threaded_context_create(glue, ev);
379         if (glue->tctx == NULL) {
380                 TALLOC_FREE(ev_link);
381                 TALLOC_FREE(glue);
382                 return ENOMEM;
383         }
384 #endif
385
386         DLIST_ADD(pool->glue_list, glue);
387         return 0;
388 }
389
390 static void pthreadpool_tevent_job_fn(void *private_data);
391 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
392                                         struct tevent_immediate *im,
393                                         void *private_data);
394 static bool pthreadpool_tevent_job_cancel(struct tevent_req *req);
395
396 static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
397 {
398         /*
399          * We should never be called with needs_fence.orphaned == false.
400          * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
401          * after detaching from the request state and pool list.
402          */
403         if (!job->needs_fence.orphaned) {
404                 abort();
405         }
406
407         /*
408          * If the job is not finished (job->im still there)
409          * and it's still attached to the pool,
410          * we try to cancel it (before it has started)
411          */
412         if (job->im != NULL && job->pool != NULL) {
413                 size_t num;
414
415                 num = pthreadpool_cancel_job(job->pool->pool, 0,
416                                              pthreadpool_tevent_job_fn,
417                                              job);
418                 if (num != 0) {
419                         /*
420                          * It was not too late to cancel the request.
421                          *
422                          * We can remove job->im, as it will never be used.
423                          */
424                         TALLOC_FREE(job->im);
425                 }
426         }
427
428         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
429         if (job->needs_fence.dropped) {
430                 /*
431                  * The signal function saw job->needs_fence.orphaned
432                  * before it started the signaling via the immediate
433          * event. So we'll never get triggered and can
434                  * remove job->im and let the whole job go...
435                  */
436                 TALLOC_FREE(job->im);
437         }
438
439         /*
440          * pthreadpool_tevent_job_orphan() already removed
441          * it from pool->jobs. And we don't need to try
442          * pthreadpool_cancel_job() again.
443          */
444         job->pool = NULL;
445
446         if (job->im != NULL) {
447                 /*
448          * job->im still there means we need to wait for the
449                  * immediate event to be triggered or just leak the memory.
450                  *
451                  * Move it to the orphaned list, if it's not already there.
452                  */
453                 return -1;
454         }
455
456         /*
457          * Finally remove from the orphaned_jobs list
458          * and let talloc destroy us.
459          */
460         DLIST_REMOVE(orphaned_jobs, job);
461
462         PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(job);
463         return 0;
464 }
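/*
 * The "return -1" above uses the talloc destructor contract: a destructor
 * that returns -1 refuses the free and the object stays alive. Minimal
 * sketch of that idiom (hypothetical type, not from this file):
 *
 *   static int demo_destructor(struct demo *d)
 *   {
 *           if (d->still_in_use_by_a_thread) {
 *                   return -1;      // deny talloc_free(), keep the object
 *           }
 *           return 0;               // allow the free
 *   }
 */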
465
466 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
467 {
468         job->needs_fence.orphaned = true;
469         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
470
471         /*
472          * We're the only function that sets
473          * job->state = NULL;
474          */
475         if (job->state == NULL) {
476                 abort();
477         }
478
479         /*
480          * We need to reparent the job to a long-term context
481          * and detach it from the request state.
482          * The destructor may keep the memory alive
483          * and leak it for now.
484          */
485         (void)talloc_reparent(job->state, NULL, job);
486         job->state->job = NULL;
487         job->state = NULL;
488
489         /*
490          * job->pool will only be set to NULL
491          * in the first destructor run.
492          */
493         if (job->pool == NULL) {
494                 abort();
495         }
496
497         /*
498          * Detach it from the pool.
499          *
500          * The job might still be running,
501          * so we keep job->pool.
502          * The destructor will set it to NULL
503          * after trying pthreadpool_cancel_job()
504          */
505         DLIST_REMOVE(job->pool->jobs, job);
506
507         /*
508          * Add it to the list of orphaned jobs,
509          * which may be cleaned up later.
510          *
511          * The destructor removes it from the list
512          * when possible, or it denies the free
513          * and keeps it in the list.
514          */
515         DLIST_ADD_END(orphaned_jobs, job);
516         TALLOC_FREE(job);
517 }
518
519 static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
520                                            enum tevent_req_state req_state)
521 {
522         struct pthreadpool_tevent_job_state *state =
523                 tevent_req_data(req,
524                 struct pthreadpool_tevent_job_state);
525
526         if (state->job == NULL) {
527                 /*
528                  * The job request is not yet, or is no longer,
529                  * scheduled in the pool.
530                  */
531                 return;
532         }
533
534         /*
535          * We need to reparent the job to a long-term context.
536          * The destructor may keep the memory alive
537          * and leak it for now.
538          */
539         pthreadpool_tevent_job_orphan(state->job);
540         state->job = NULL; /* not needed but looks better */
541         return;
542 }
543
544 struct tevent_req *pthreadpool_tevent_job_send(
545         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
546         struct pthreadpool_tevent *pool,
547         void (*fn)(void *private_data), void *private_data)
548 {
549         struct tevent_req *req = NULL;
550         struct pthreadpool_tevent_job_state *state = NULL;
551         struct pthreadpool_tevent_job *job = NULL;
552         int ret;
553
554         pthreadpool_tevent_cleanup_orphaned_jobs();
555
556         req = tevent_req_create(mem_ctx, &state,
557                                 struct pthreadpool_tevent_job_state);
558         if (req == NULL) {
559                 return NULL;
560         }
561         state->ev = ev;
562         state->req = req;
563
564         tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);
565
566         if (pool == NULL) {
567                 tevent_req_error(req, EINVAL);
568                 return tevent_req_post(req, ev);
569         }
570         if (pool->pool == NULL) {
571                 tevent_req_error(req, EINVAL);
572                 return tevent_req_post(req, ev);
573         }
574
575         ret = pthreadpool_tevent_register_ev(pool, ev);
576         if (tevent_req_error(req, ret)) {
577                 return tevent_req_post(req, ev);
578         }
579
580         job = talloc_zero(state, struct pthreadpool_tevent_job);
581         if (tevent_req_nomem(job, req)) {
582                 return tevent_req_post(req, ev);
583         }
584         job->pool = pool;
585         job->fn = fn;
586         job->private_data = private_data;
587         job->im = tevent_create_immediate(state->job);
588         if (tevent_req_nomem(job->im, req)) {
589                 return tevent_req_post(req, ev);
590         }
591         PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
592         talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
593         DLIST_ADD_END(job->pool->jobs, job);
594         job->state = state;
595         state->job = job;
596
597         ret = pthreadpool_add_job(job->pool->pool, 0,
598                                   pthreadpool_tevent_job_fn,
599                                   job);
600         if (tevent_req_error(req, ret)) {
601                 return tevent_req_post(req, ev);
602         }
603
604         tevent_req_set_cancel_fn(req, pthreadpool_tevent_job_cancel);
605         return req;
606 }
607
608 static __thread struct pthreadpool_tevent_job *current_job;
609
610 bool pthreadpool_tevent_current_job_canceled(void)
611 {
612         if (current_job == NULL) {
613                 /*
614                  * Should only be called from within
615                  * the job function.
616                  */
617                 abort();
618                 return false;
619         }
620
621         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
622         return current_job->needs_fence.maycancel;
623 }
624
625 bool pthreadpool_tevent_current_job_orphaned(void)
626 {
627         if (current_job == NULL) {
628                 /*
629                  * Should only be called from within
630                  * the job function.
631                  */
632                 abort();
633                 return false;
634         }
635
636         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
637         return current_job->needs_fence.orphaned;
638 }
639
640 bool pthreadpool_tevent_current_job_continue(void)
641 {
642         if (current_job == NULL) {
643                 /*
644                  * Should only be called from within
645                  * the job function.
646                  */
647                 abort();
648                 return false;
649         }
650
651         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
652         if (current_job->needs_fence.maycancel) {
653                 return false;
654         }
655         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
656         if (current_job->needs_fence.orphaned) {
657                 return false;
658         }
659
660         return true;
661 }
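/*
 * Example of a job function cooperating with cancellation via the helpers
 * above (sketch; the demo_* names are hypothetical):
 *
 *   static void demo_long_running_job(void *private_data)
 *   {
 *           struct demo_state *ds = private_data;
 *           size_t i;
 *
 *           for (i = 0; i < ds->num_chunks; i++) {
 *                   if (!pthreadpool_tevent_current_job_continue()) {
 *                           // canceled or orphaned: stop early
 *                           return;
 *                   }
 *                   demo_process_chunk(ds, i);
 *           }
 *   }
 */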
662
663 static void pthreadpool_tevent_job_fn(void *private_data)
664 {
665         struct pthreadpool_tevent_job *job =
666                 talloc_get_type_abort(private_data,
667                 struct pthreadpool_tevent_job);
668
669         current_job = job;
670         job->needs_fence.started = true;
671         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
672
673         job->fn(job->private_data);
674
675         job->needs_fence.executed = true;
676         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
677         current_job = NULL;
678 }
679
680 static int pthreadpool_tevent_job_signal(int jobid,
681                                          void (*job_fn)(void *private_data),
682                                          void *job_private_data,
683                                          void *private_data)
684 {
685         struct pthreadpool_tevent_job *job =
686                 talloc_get_type_abort(job_private_data,
687                 struct pthreadpool_tevent_job);
688         struct pthreadpool_tevent_job_state *state = job->state;
689         struct tevent_threaded_context *tctx = NULL;
690         struct pthreadpool_tevent_glue *g = NULL;
691
692         job->needs_fence.finished = true;
693         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
694         if (job->needs_fence.orphaned) {
695                 /* Request already gone */
696                 job->needs_fence.dropped = true;
697                 PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
698                 return 0;
699         }
700
701 #ifdef HAVE_PTHREAD
702         for (g = job->pool->glue_list; g != NULL; g = g->next) {
703                 if (g->ev == state->ev) {
704                         tctx = g->tctx;
705                         break;
706                 }
707         }
708
709         if (tctx == NULL) {
710                 abort();
711         }
712 #endif
713
714         if (tctx != NULL) {
715                 /* with HAVE_PTHREAD */
716                 tevent_threaded_schedule_immediate(tctx, job->im,
717                                                    pthreadpool_tevent_job_done,
718                                                    job);
719         } else {
720                 /* without HAVE_PTHREAD */
721                 tevent_schedule_immediate(job->im, state->ev,
722                                           pthreadpool_tevent_job_done,
723                                           job);
724         }
725
726         job->needs_fence.signaled = true;
727         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
728         return 0;
729 }
730
731 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
732                                         struct tevent_immediate *im,
733                                         void *private_data)
734 {
735         struct pthreadpool_tevent_job *job =
736                 talloc_get_type_abort(private_data,
737                 struct pthreadpool_tevent_job);
738         struct pthreadpool_tevent_job_state *state = job->state;
739
740         TALLOC_FREE(job->im);
741
742         if (state == NULL) {
743                 /* Request already gone */
744                 TALLOC_FREE(job);
745                 return;
746         }
747
748         /*
749          * pthreadpool_tevent_job_cleanup()
750          * (called by tevent_req_done() or
751          * tevent_req_error()) will destroy the job.
752          */
753
754         if (job->needs_fence.executed) {
755                 tevent_req_done(state->req);
756                 return;
757         }
758
759         tevent_req_error(state->req, ENOEXEC);
760         return;
761 }
762
763 static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
764 {
765         struct pthreadpool_tevent_job_state *state =
766                 tevent_req_data(req,
767                 struct pthreadpool_tevent_job_state);
768         struct pthreadpool_tevent_job *job = state->job;
769         size_t num;
770
771         if (job == NULL) {
772                 return false;
773         }
774
775         job->needs_fence.maycancel = true;
776         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
777         if (job->needs_fence.started) {
778                 /*
779                  * It was too late to cancel the request.
780                  *
781                  * The job still has the chance to look
782                  * at pthreadpool_tevent_current_job_canceled()
783                  * or pthreadpool_tevent_current_job_continue()
784                  */
785                 return false;
786         }
787
788         num = pthreadpool_cancel_job(job->pool->pool, 0,
789                                      pthreadpool_tevent_job_fn,
790                                      job);
791         if (num == 0) {
792                 /*
793                  * It was too late to cancel the request.
794                  */
795                 return false;
796         }
797
798         /*
799          * It was not too late to cancel the request.
800          *
801          * We can remove job->im, as it will never be used.
802          */
803         TALLOC_FREE(job->im);
804
805         /*
806          * pthreadpool_tevent_job_cleanup()
807          * will destroy the job.
808          */
809         tevent_req_defer_callback(req, state->ev);
810         tevent_req_error(req, ECANCELED);
811         return true;
812 }
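/*
 * Caller-side view of cancellation (sketch): tevent_req_cancel() ends up in
 * pthreadpool_tevent_job_cancel() above. If the job can still be removed
 * from the pool's queue, the request finishes with ECANCELED; once a worker
 * has picked it up, cancellation only sets 'maycancel' and the job function
 * has to check it itself.
 *
 *   if (!tevent_req_cancel(req)) {
 *           // too late to cancel, wait for the normal completion callback
 *   }
 */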
813
814 int pthreadpool_tevent_job_recv(struct tevent_req *req)
815 {
816         return tevent_req_simple_recv_unix(req);
817 }