/*
 * Unix SMB/CIFS implementation.
 * threadpool implementation based on pthreads
 * Copyright (C) Volker Lendecke 2009,2011
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "replace.h"
#include "system/threads.h"
#include "pthreadpool_tevent.h"
#include "pthreadpool.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/dlinklist.h"
#include "lib/util/attr.h"

#define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
} while(0)

#ifdef WITH_PTHREADPOOL
/*
 * configure checked we have pthread and atomic_thread_fence() available
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { \
        atomic_thread_fence(__order); \
} while(0)
#else
/*
 * we're using lib/pthreadpool/pthreadpool_sync.c ...
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { } while(0)
#ifndef HAVE___THREAD
#define __thread
#endif
#endif

#define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        __PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
} while(0)

#define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
} while(0)

struct pthreadpool_tevent_job_state;

/*
 * We need one pthreadpool_tevent_glue object per unique combination of tevent
 * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
 * contexts in a pthreadpool_tevent.
 */
struct pthreadpool_tevent_glue {
        struct pthreadpool_tevent_glue *prev, *next;
        struct pthreadpool_tevent *pool; /* back-pointer to owning object. */
        /* Tuple we are keeping track of in this list. */
        struct tevent_context *ev;
        struct tevent_threaded_context *tctx;
        /* Pointer to link object owned by *ev. */
        struct pthreadpool_tevent_glue_ev_link *ev_link;
};

/*
 * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
 * tevent context from our list of active event contexts if the event context
 * is destroyed.
 * This structure is talloc()'ed from the struct tevent_context *, and is a
 * back-pointer allowing the related struct pthreadpool_tevent_glue object
 * to be removed from the struct pthreadpool_tevent glue list if the owning
 * tevent_context is talloc_free()'ed.
 */
struct pthreadpool_tevent_glue_ev_link {
        struct pthreadpool_tevent_glue *glue;
};
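
/*
 * Ownership sketch (illustrative only): the glue object is a talloc
 * child of the pool, the ev_link is a talloc child of the event
 * context, and each holds a pointer to the other. Whichever side is
 * talloc_free()'ed first uses its destructor to detach the survivor:
 *
 *   pool (talloc parent)                 ev (talloc parent)
 *     |                                    |
 *     v                                    v
 *    glue --------- glue->ev_link -----> ev_link
 *    glue <-------- ev_link->glue ------ ev_link
 */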

struct pthreadpool_tevent {
        struct pthreadpool *pool;
        struct pthreadpool_tevent_glue *glue_list;

        struct pthreadpool_tevent_job *jobs;
};

struct pthreadpool_tevent_job_state {
        struct tevent_context *ev;
        struct tevent_req *req;
        struct pthreadpool_tevent_job *job;
};

struct pthreadpool_tevent_job {
        struct pthreadpool_tevent_job *prev, *next;

        struct pthreadpool_tevent *pool;
        struct pthreadpool_tevent_job_state *state;
        struct tevent_immediate *im;

        void (*fn)(void *private_data);
        void *private_data;

        /*
         * Coordination between threads
         *
         * There is only one side writing each element,
         * either the main process or the job thread.
         *
         * The coordination is done by a full memory
         * barrier using atomic_thread_fence(memory_order_seq_cst)
         * wrapped in PTHREAD_TEVENT_JOB_THREAD_FENCE()
         */
        struct {
                /*
                 * 'maycancel'
                 * set when tevent_req_cancel() is called.
                 * (only written by main thread!)
                 */
                bool maycancel;

                /*
                 * 'orphaned'
                 * set when talloc_free is called on the job request,
                 * tevent_context or pthreadpool_tevent.
                 * (only written by main thread!)
                 */
                bool orphaned;

                /*
                 * 'started'
                 * set when the job is picked up by a worker thread
                 * (only written by job thread!)
                 */
                bool started;

                /*
                 * 'executed'
                 * set once the job function returned.
                 * (only written by job thread!)
                 */
                bool executed;

                /*
                 * 'finished'
                 * set when pthreadpool_tevent_job_signal() is entered
                 * (only written by job thread!)
                 */
                bool finished;

                /*
                 * 'dropped'
                 * set when pthreadpool_tevent_job_signal() leaves with
                 * orphaned already set.
                 * (only written by job thread!)
                 */
                bool dropped;

                /*
                 * 'signaled'
                 * set when pthreadpool_tevent_job_signal() leaves normally
                 * and the immediate event was scheduled.
                 * (only written by job thread!)
                 */
                bool signaled;
        } needs_fence;
};
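
/*
 * A minimal sketch (illustrative only, not part of the build) of how
 * the single-writer flags pair up with the fences. Each flag has
 * exactly one writer, and both sides issue a full barrier, so a
 * reader that fences before reading observes the writer's latest
 * value. For example, with 'started' and 'maycancel':
 *
 *   job thread:                      main thread (cancel):
 *     job->needs_fence.started = true;
 *     PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *                                      job->needs_fence.maycancel = true;
 *                                      PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *                                      if (job->needs_fence.started) {
 *                                              // too late to cancel
 *                                      }
 */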

static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);

static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);

static struct pthreadpool_tevent_job *orphaned_jobs;

void pthreadpool_tevent_cleanup_orphaned_jobs(void)
{
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;

        for (job = orphaned_jobs; job != NULL; job = njob) {
                njob = job->next;

                /*
                 * The job destructor keeps the job alive
                 * (and in the list) or removes it from the list.
                 */
                TALLOC_FREE(job);
        }
}

static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
                                         void *private_data);

int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
                            struct pthreadpool_tevent **presult)
{
        struct pthreadpool_tevent *pool;
        int ret;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
        if (pool == NULL) {
                return ENOMEM;
        }

        ret = pthreadpool_init(max_threads, &pool->pool,
                               pthreadpool_tevent_job_signal, pool);
        if (ret != 0) {
                TALLOC_FREE(pool);
                return ret;
        }

        talloc_set_destructor(pool, pthreadpool_tevent_destructor);

        *presult = pool;
        return 0;
}
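
/*
 * Example (a sketch, not compiled here): creating a pool that runs at
 * most four worker threads. "mem_ctx" stands in for whatever talloc
 * context the caller owns; the function returns 0 or an errno-style
 * error code.
 *
 *   struct pthreadpool_tevent *pool = NULL;
 *   int ret;
 *
 *   ret = pthreadpool_tevent_init(mem_ctx, 4, &pool);
 *   if (ret != 0) {
 *           // handle errno-style failure, e.g. strerror(ret)
 *           return ret;
 *   }
 */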

size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
{
        if (pool->pool == NULL) {
                return 0;
        }

        return pthreadpool_max_threads(pool->pool);
}

size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
{
        if (pool->pool == NULL) {
                return 0;
        }

        return pthreadpool_queued_jobs(pool->pool);
}

static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
{
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;
        struct pthreadpool_tevent_glue *glue = NULL;
        int ret;

        ret = pthreadpool_stop(pool->pool);
        if (ret != 0) {
                return ret;
        }

        for (job = pool->jobs; job != NULL; job = njob) {
                njob = job->next;

                /* Orphaning the job removes it from the list */
                pthreadpool_tevent_job_orphan(job);
        }

        /*
         * Delete all the registered
         * tevent_context/tevent_threaded_context
         * pairs.
         */
        for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
                /* The glue destructor removes it from the list */
                TALLOC_FREE(glue);
        }
        pool->glue_list = NULL;

        ret = pthreadpool_destroy(pool->pool);
        if (ret != 0) {
                return ret;
        }
        pool->pool = NULL;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        return 0;
}

static int pthreadpool_tevent_glue_destructor(
        struct pthreadpool_tevent_glue *glue)
{
        if (glue->pool->glue_list != NULL) {
                DLIST_REMOVE(glue->pool->glue_list, glue);
        }

        /* Ensure the ev_link destructor knows we're gone */
        glue->ev_link->glue = NULL;

        TALLOC_FREE(glue->ev_link);
        TALLOC_FREE(glue->tctx);

        return 0;
}

/*
 * Destructor called either explicitly from
 * pthreadpool_tevent_glue_destructor(), or indirectly
 * when the owning tevent_context is destroyed.
 *
 * When called from pthreadpool_tevent_glue_destructor()
 * ev_link->glue is already NULL, so this does nothing.
 *
 * When called from talloc_free() of the owning
 * tevent_context we must ensure we also remove the
 * linked glue object from the list inside
 * struct pthreadpool_tevent.
 */
static int pthreadpool_tevent_glue_link_destructor(
        struct pthreadpool_tevent_glue_ev_link *ev_link)
{
        TALLOC_FREE(ev_link->glue);
        return 0;
}

static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
                                          struct tevent_context *ev)
{
        struct pthreadpool_tevent_glue *glue = NULL;
        struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;

        /*
         * See if this tevent_context was already registered by
         * searching the glue object list. If so we have nothing
         * to do here - we already have a tevent_context/tevent_threaded_context
         * pair.
         */
        for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
                if (glue->ev == ev) {
                        return 0;
                }
        }

        /*
         * Event context not yet registered - create a new glue
         * object containing a tevent_context/tevent_threaded_context
         * pair and put it on the list to remember this registration.
         * We also need a link object to ensure the event context
         * can't go away without us knowing about it.
         */
        glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
        if (glue == NULL) {
                return ENOMEM;
        }
        *glue = (struct pthreadpool_tevent_glue) {
                .pool = pool,
                .ev = ev,
        };
        talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);

        /*
         * Now allocate the link object to the event context. Note this
         * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
         * context is freed we are able to clean up the glue object
         * in the link object destructor.
         */

        ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
        if (ev_link == NULL) {
                TALLOC_FREE(glue);
                return ENOMEM;
        }
        ev_link->glue = glue;
        talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);

        glue->ev_link = ev_link;

#ifdef HAVE_PTHREAD
        glue->tctx = tevent_threaded_context_create(glue, ev);
        if (glue->tctx == NULL) {
                TALLOC_FREE(ev_link);
                TALLOC_FREE(glue);
                return ENOMEM;
        }
#endif

        DLIST_ADD(pool->glue_list, glue);
        return 0;
}

static void pthreadpool_tevent_job_fn(void *private_data);
static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data);
static bool pthreadpool_tevent_job_cancel(struct tevent_req *req);

static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
{
        /*
         * We should never be called with needs_fence.orphaned == false.
         * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
         * after detaching from the request state and pool list.
         */
        if (!job->needs_fence.orphaned) {
                abort();
        }

        /*
         * If the job is not finished (job->im still there)
         * and it's still attached to the pool,
         * we try to cancel it (before it starts).
         */
        if (job->im != NULL && job->pool != NULL) {
                size_t num;

                num = pthreadpool_cancel_job(job->pool->pool, 0,
                                             pthreadpool_tevent_job_fn,
                                             job);
                if (num != 0) {
                        /*
                         * It was not too late to cancel the request.
                         *
                         * We can remove job->im, as it will never be used.
                         */
                        TALLOC_FREE(job->im);
                }
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.dropped) {
                /*
                 * The signal function saw job->needs_fence.orphaned
                 * before it started the signaling via the immediate
                 * event. So we'll never get triggered and can
                 * remove job->im and let the whole job go...
                 */
                TALLOC_FREE(job->im);
        }

        /*
         * pthreadpool_tevent_job_orphan() already removed
         * it from pool->jobs. And we don't need to try
         * pthreadpool_cancel_job() again.
         */
        job->pool = NULL;

        if (job->im != NULL) {
                /*
                 * job->im still being there means we need to wait for
                 * the immediate event to be triggered or just leak the
                 * memory.
                 *
                 * The job remains on the orphaned_jobs list
                 * (pthreadpool_tevent_job_orphan() put it there);
                 * returning -1 denies the talloc_free().
                 */
                return -1;
        }

        /*
         * Finally remove from the orphaned_jobs list
         * and let talloc destroy us.
         */
        DLIST_REMOVE(orphaned_jobs, job);

        PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(job);
        return 0;
}

static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
{
        job->needs_fence.orphaned = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        /*
         * We're the only function that sets
         * job->state = NULL;
         */
        if (job->state == NULL) {
                abort();
        }

        /*
         * We need to reparent to a long term context
         * and detach from the request state.
         * The destructor may keep the memory
         * and leak it for now.
         */
        (void)talloc_reparent(job->state, NULL, job);
        job->state->job = NULL;
        job->state = NULL;

        /*
         * job->pool will only be set to NULL
         * in the first destructor run.
         */
        if (job->pool == NULL) {
                abort();
        }

        /*
         * Detach it from the pool.
         *
         * The job might still be running,
         * so we keep job->pool.
         * The destructor will set it to NULL
         * after trying pthreadpool_cancel_job()
         */
        DLIST_REMOVE(job->pool->jobs, job);

        /*
         * Add it to the list of orphaned jobs,
         * which may be cleaned up later.
         *
         * The destructor removes it from the list
         * when possible, or it denies the free
         * and keeps it in the list.
         */
        DLIST_ADD_END(orphaned_jobs, job);
        TALLOC_FREE(job);
}

static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
                                           enum tevent_req_state req_state)
{
        struct pthreadpool_tevent_job_state *state =
                tevent_req_data(req,
                struct pthreadpool_tevent_job_state);

        if (state->job == NULL) {
                /*
                 * The job request is not scheduled in the pool
                 * yet or anymore.
                 */
                return;
        }

        /*
         * We need to reparent to a long term context.
         * The destructor may keep the memory
         * and leak it for now.
         */
        pthreadpool_tevent_job_orphan(state->job);
        state->job = NULL; /* not needed but looks better */
        return;
}

struct tevent_req *pthreadpool_tevent_job_send(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        struct pthreadpool_tevent *pool,
        void (*fn)(void *private_data), void *private_data)
{
        struct tevent_req *req = NULL;
        struct pthreadpool_tevent_job_state *state = NULL;
        struct pthreadpool_tevent_job *job = NULL;
        int ret;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        req = tevent_req_create(mem_ctx, &state,
                                struct pthreadpool_tevent_job_state);
        if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
        state->req = req;

        tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);

        if (pool == NULL) {
                tevent_req_error(req, EINVAL);
                return tevent_req_post(req, ev);
        }
        if (pool->pool == NULL) {
                tevent_req_error(req, EINVAL);
                return tevent_req_post(req, ev);
        }

        ret = pthreadpool_tevent_register_ev(pool, ev);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }

        job = talloc_zero(state, struct pthreadpool_tevent_job);
        if (tevent_req_nomem(job, req)) {
                return tevent_req_post(req, ev);
        }
        job->pool = pool;
        job->fn = fn;
        job->private_data = private_data;
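        /*
         * Note: state->job is still NULL at this point, so the
         * immediate is created on a NULL talloc context and is
         * only ever freed explicitly via TALLOC_FREE(job->im).
         */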
        job->im = tevent_create_immediate(state->job);
        if (tevent_req_nomem(job->im, req)) {
                return tevent_req_post(req, ev);
        }
        PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
        talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
        DLIST_ADD_END(job->pool->jobs, job);
        job->state = state;
        state->job = job;

        ret = pthreadpool_add_job(job->pool->pool, 0,
                                  pthreadpool_tevent_job_fn,
                                  job);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }

        tevent_req_set_cancel_fn(req, pthreadpool_tevent_job_cancel);
        return req;
}

static __thread struct pthreadpool_tevent_job *current_job;

bool pthreadpool_tevent_current_job_canceled(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        return current_job->needs_fence.maycancel;
}

bool pthreadpool_tevent_current_job_orphaned(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        return current_job->needs_fence.orphaned;
}

bool pthreadpool_tevent_current_job_continue(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        if (current_job->needs_fence.maycancel) {
                return false;
        }
        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        if (current_job->needs_fence.orphaned) {
                return false;
        }

        return true;
}
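
/*
 * Example job function (a sketch, not compiled here): a long-running
 * loop that gives up early once the request was cancelled or orphaned.
 * "process_one_record" is a hypothetical helper that returns false
 * when all work is done.
 *
 *   static void example_job_fn(void *private_data)
 *   {
 *           struct example_state *es = private_data;
 *
 *           while (pthreadpool_tevent_current_job_continue()) {
 *                   if (!process_one_record(es)) {
 *                           return;  // all records processed
 *                   }
 *           }
 *   }
 */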

static void pthreadpool_tevent_job_fn(void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);

        current_job = job;
        job->needs_fence.started = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        job->fn(job->private_data);

        job->needs_fence.executed = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        current_job = NULL;
}

static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
                                         void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(job_private_data,
                struct pthreadpool_tevent_job);
        struct pthreadpool_tevent_job_state *state = job->state;
        struct tevent_threaded_context *tctx = NULL;
        struct pthreadpool_tevent_glue *g = NULL;

        job->needs_fence.finished = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.orphaned) {
                /* Request already gone */
                job->needs_fence.dropped = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                return 0;
        }

#ifdef HAVE_PTHREAD
        for (g = job->pool->glue_list; g != NULL; g = g->next) {
                if (g->ev == state->ev) {
                        tctx = g->tctx;
                        break;
                }
        }

        if (tctx == NULL) {
                abort();
        }
#endif

        if (tctx != NULL) {
                /* with HAVE_PTHREAD */
                tevent_threaded_schedule_immediate(tctx, job->im,
                                                   pthreadpool_tevent_job_done,
                                                   job);
        } else {
                /* without HAVE_PTHREAD */
                tevent_schedule_immediate(job->im, state->ev,
                                          pthreadpool_tevent_job_done,
                                          job);
        }

        job->needs_fence.signaled = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        return 0;
}

static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);
        struct pthreadpool_tevent_job_state *state = job->state;

        TALLOC_FREE(job->im);

        if (state == NULL) {
                /* Request already gone */
                TALLOC_FREE(job);
                return;
        }

        /*
         * pthreadpool_tevent_job_cleanup()
         * (called by tevent_req_done() or
         * tevent_req_error()) will destroy the job.
         */

        if (job->needs_fence.executed) {
                tevent_req_done(state->req);
                return;
        }

        tevent_req_error(state->req, ENOEXEC);
        return;
}

static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
{
        struct pthreadpool_tevent_job_state *state =
                tevent_req_data(req,
                struct pthreadpool_tevent_job_state);
        struct pthreadpool_tevent_job *job = state->job;
        size_t num;

        if (job == NULL) {
                return false;
        }

        job->needs_fence.maycancel = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.started) {
                /*
                 * It was too late to cancel the request.
                 *
                 * The job still has the chance to look
                 * at pthreadpool_tevent_current_job_canceled()
                 * or pthreadpool_tevent_current_job_continue()
                 */
                return false;
        }

        num = pthreadpool_cancel_job(job->pool->pool, 0,
                                     pthreadpool_tevent_job_fn,
                                     job);
        if (num == 0) {
                /*
                 * It was too late to cancel the request.
                 */
                return false;
        }

        /*
         * It was not too late to cancel the request.
         *
         * We can remove job->im, as it will never be used.
         */
        TALLOC_FREE(job->im);

        /*
         * pthreadpool_tevent_job_cleanup()
         * will destroy the job.
         */
        tevent_req_defer_callback(req, state->ev);
        tevent_req_error(req, ECANCELED);
        return true;
}

int pthreadpool_tevent_job_recv(struct tevent_req *req)
{
        return tevent_req_simple_recv_unix(req);
}
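
/*
 * Typical caller pattern (a sketch, not compiled here; "example_job_fn"
 * and "example_job_done" are hypothetical):
 *
 *   subreq = pthreadpool_tevent_job_send(state, ev, pool,
 *                                        example_job_fn, state);
 *   if (tevent_req_nomem(subreq, req)) {
 *           return tevent_req_post(req, ev);
 *   }
 *   tevent_req_set_callback(subreq, example_job_done, req);
 *
 * and in the callback:
 *
 *   static void example_job_done(struct tevent_req *subreq)
 *   {
 *           struct tevent_req *req =
 *                   tevent_req_callback_data(subreq, struct tevent_req);
 *           int ret;
 *
 *           ret = pthreadpool_tevent_job_recv(subreq);
 *           TALLOC_FREE(subreq);
 *           if (tevent_req_error(req, ret)) {
 *                   return;
 *           }
 *           tevent_req_done(req);
 *   }
 */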