lib/pthreadpool/pthreadpool_tevent.c

/*
 * Unix SMB/CIFS implementation.
 * threadpool implementation based on pthreads
 * Copyright (C) Volker Lendecke 2009,2011
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "replace.h"
#include "system/select.h"
#include "system/threads.h"
#include "pthreadpool_tevent.h"
#include "pthreadpool.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/dlinklist.h"
#include "lib/util/attr.h"

/*
 * We try to give some hints to helgrind/drd
 *
 * Note that ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
 * takes a memory address range that is ignored by helgrind/drd;
 * the 'description' is just ignored...
 *
 * Note that ANNOTATE_HAPPENS_*(unique_uintptr)
 * just takes a DWORD/(void *) as unique key
 * for the barrier.
 */
#ifdef HAVE_VALGRIND_HELGRIND_H
#include <valgrind/helgrind.h>
#endif
#ifndef ANNOTATE_BENIGN_RACE_SIZED
#define ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
#endif
#ifndef ANNOTATE_HAPPENS_BEFORE
#define ANNOTATE_HAPPENS_BEFORE(unique_uintptr)
#endif
#ifndef ANNOTATE_HAPPENS_AFTER
#define ANNOTATE_HAPPENS_AFTER(unique_uintptr)
#endif
#ifndef ANNOTATE_HAPPENS_BEFORE_FORGET_ALL
#define ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(unique_uintptr)
#endif

#define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_BENIGN_RACE_SIZED(&__j->needs_fence, \
                                   sizeof(__j->needs_fence), \
                                   "race by design, protected by fence"); \
} while(0)

#ifdef WITH_PTHREADPOOL
/*
 * configure checked we have pthread and atomic_thread_fence() available
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { \
        atomic_thread_fence(__order); \
} while(0)
#else
/*
 * we're using lib/pthreadpool/pthreadpool_sync.c ...
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { } while(0)
#ifndef HAVE___THREAD
#define __thread
#endif
#endif

#define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_HAPPENS_BEFORE(&__j->needs_fence); \
        __PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
        ANNOTATE_HAPPENS_AFTER(&__j->needs_fence); \
} while(0)

#define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&__j->needs_fence); \
} while(0)

struct pthreadpool_tevent_job_state;

/*
 * We need one pthreadpool_tevent_glue object per unique combination of tevent
 * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
 * contexts in a pthreadpool_tevent.
 */
struct pthreadpool_tevent_glue {
        struct pthreadpool_tevent_glue *prev, *next;
        struct pthreadpool_tevent *pool; /* back-pointer to owning object. */
        /* Tuple we are keeping track of in this list. */
        struct tevent_context *ev;
        struct tevent_threaded_context *tctx;
        /* recheck monitor fd event */
        struct tevent_fd *fde;
        /* Pointer to link object owned by *ev. */
        struct pthreadpool_tevent_glue_ev_link *ev_link;
        /* active jobs */
        struct pthreadpool_tevent_job_state *states;
};

/*
 * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
 * tevent context from our list of active event contexts if the event context
 * is destroyed.
 * This structure is talloc()'ed from the struct tevent_context *, and is a
 * back-pointer allowing the related struct pthreadpool_tevent_glue object
 * to be removed from the struct pthreadpool_tevent glue list if the owning
 * tevent_context is talloc_free()'ed.
 */
struct pthreadpool_tevent_glue_ev_link {
        struct pthreadpool_tevent_glue *glue;
};

struct pthreadpool_tevent_wrapper {
        struct pthreadpool_tevent *main_tp;
        struct pthreadpool_tevent *wrap_tp;
        const struct pthreadpool_tevent_wrapper_ops *ops;
        void *private_state;
        bool force_per_thread_cwd;
};

struct pthreadpool_tevent {
        struct pthreadpool_tevent *prev, *next;

        struct pthreadpool *pool;
        struct pthreadpool_tevent_glue *glue_list;

        struct pthreadpool_tevent_job *jobs;

        struct {
                /*
                 * This is used on the main context
                 */
                struct pthreadpool_tevent *list;

                /*
                 * This is used on the wrapper context
                 */
                struct pthreadpool_tevent_wrapper *ctx;
        } wrapper;
};

struct pthreadpool_tevent_job_state {
        struct pthreadpool_tevent_job_state *prev, *next;
        struct pthreadpool_tevent_glue *glue;
        struct tevent_context *ev;
        struct tevent_req *req;
        struct pthreadpool_tevent_job *job;
};

struct pthreadpool_tevent_job {
        struct pthreadpool_tevent_job *prev, *next;

        struct pthreadpool_tevent *pool;
        struct pthreadpool_tevent_wrapper *wrapper;
        struct pthreadpool_tevent_job_state *state;
        struct tevent_immediate *im;

        void (*fn)(void *private_data);
        void *private_data;

        /*
         * Coordination between threads
         *
         * There is only one side writing each element,
         * either the main process or the job thread.
         *
         * The coordination is done by a full memory
         * barrier using atomic_thread_fence(memory_order_seq_cst)
         * wrapped in PTHREAD_TEVENT_JOB_THREAD_FENCE()
         */
        struct {
                /*
                 * 'maycancel'
                 * set when tevent_req_cancel() is called.
                 * (only written by main thread!)
                 */
                bool maycancel;

                /*
                 * 'orphaned'
                 * set when talloc_free is called on the job request,
                 * tevent_context or pthreadpool_tevent.
                 * (only written by main thread!)
                 */
                bool orphaned;

                /*
                 * 'started'
                 * set when the job is picked up by a worker thread
                 * (only written by job thread!)
                 */
                bool started;

                /*
                 * 'wrapper'
                 * set before calling the wrapper before_job() or
                 * after_job() hooks.
                 * unset again after the hook finished.
                 * (only written by job thread!)
                 */
                bool wrapper;

                /*
                 * 'executed'
                 * set once the job function returned.
                 * (only written by job thread!)
                 */
                bool executed;

                /*
                 * 'finished'
                 * set when pthreadpool_tevent_job_signal() is entered
                 * (only written by job thread!)
                 */
                bool finished;

                /*
                 * 'dropped'
                 * set when pthreadpool_tevent_job_signal() leaves with
                 * orphaned already set.
                 * (only written by job thread!)
                 */
                bool dropped;

                /*
                 * 'signaled'
                 * set when pthreadpool_tevent_job_signal() leaves normally
                 * and the immediate event was scheduled.
                 * (only written by job thread!)
                 */
                bool signaled;

                /*
                 * 'exit_thread'
                 * may be set during pthreadpool_tevent_job_fn()
                 * if some wrapper related code generated an error
                 * and the environment isn't safe anymore.
                 *
                 * In such a case pthreadpool_tevent_job_signal()
                 * will pick this up and terminate the current
                 * worker thread by returning -1.
                 */
                bool exit_thread; /* only written/read by job thread! */
        } needs_fence;

        bool per_thread_cwd;
};
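
/*
 * The flags above follow a write-then-fence / fence-then-read
 * protocol, as used throughout this file: the owning side sets its
 * flag and then issues the full barrier; the other side issues the
 * barrier and then reads. A minimal sketch (not an actual code path
 * in this file):
 *
 *   // job thread (writer)
 *   job->needs_fence.executed = true;
 *   PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *
 *   // main thread (reader)
 *   PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *   if (job->needs_fence.executed) {
 *           // the job function has returned
 *   }
 */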

static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);

static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);

static struct pthreadpool_tevent_job *orphaned_jobs;

void pthreadpool_tevent_cleanup_orphaned_jobs(void)
{
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;

        for (job = orphaned_jobs; job != NULL; job = njob) {
                njob = job->next;

                /*
                 * The job destructor keeps the job alive
                 * (and in the list) or removes it from the list.
                 */
                TALLOC_FREE(job);
        }
}

static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
                                         void *private_data);

int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
                            struct pthreadpool_tevent **presult)
{
        struct pthreadpool_tevent *pool;
        int ret;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
        if (pool == NULL) {
                return ENOMEM;
        }

        ret = pthreadpool_init(max_threads, &pool->pool,
                               pthreadpool_tevent_job_signal, pool);
        if (ret != 0) {
                TALLOC_FREE(pool);
                return ret;
        }

        talloc_set_destructor(pool, pthreadpool_tevent_destructor);

        *presult = pool;
        return 0;
}
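
/*
 * Typical usage, as a minimal sketch ("my_blocking_fn", "my_done",
 * "ev" and "mem_ctx" are hypothetical names, not part of this file;
 * see pthreadpool_tevent_job_send() and pthreadpool_tevent_job_recv()
 * below):
 *
 *   struct pthreadpool_tevent *tp = NULL;
 *   struct tevent_req *req = NULL;
 *   int ret;
 *
 *   ret = pthreadpool_tevent_init(mem_ctx, 4, &tp);
 *   if (ret != 0) {
 *           // ret is an errno-style value, e.g. ENOMEM
 *   }
 *
 *   // my_blocking_fn(private_data) will run in a worker thread
 *   req = pthreadpool_tevent_job_send(mem_ctx, ev, tp,
 *                                     my_blocking_fn, private_data);
 *   if (req == NULL) {
 *           // out of memory
 *   }
 *   tevent_req_set_callback(req, my_done, NULL);
 */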

static struct pthreadpool_tevent *pthreadpool_tevent_unwrap(
        struct pthreadpool_tevent *pool)
{
        struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;

        if (wrapper != NULL) {
                return wrapper->main_tp;
        }

        return pool;
}

size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
{
        pool = pthreadpool_tevent_unwrap(pool);

        if (pool->pool == NULL) {
                return 0;
        }

        return pthreadpool_max_threads(pool->pool);
}

size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
{
        pool = pthreadpool_tevent_unwrap(pool);

        if (pool->pool == NULL) {
                return 0;
        }

        return pthreadpool_queued_jobs(pool->pool);
}

bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool)
{
        struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;

        if (wrapper != NULL && wrapper->force_per_thread_cwd) {
                return true;
        }

        pool = pthreadpool_tevent_unwrap(pool);

        if (pool->pool == NULL) {
                return false;
        }

        return pthreadpool_per_thread_cwd(pool->pool);
}

static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
{
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;
        struct pthreadpool_tevent *wrap_tp = NULL;
        struct pthreadpool_tevent *nwrap_tp = NULL;
        struct pthreadpool_tevent_glue *glue = NULL;
        int ret;

        if (pool->wrapper.ctx != NULL) {
                struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;

                pool->wrapper.ctx = NULL;
                pool = wrapper->main_tp;

                DLIST_REMOVE(pool->wrapper.list, wrapper->wrap_tp);

                for (job = pool->jobs; job != NULL; job = njob) {
                        njob = job->next;

                        if (job->wrapper != wrapper) {
                                continue;
                        }

                        /*
                         * This removes the job from the list.
                         *
                         * Note that it waits in case
                         * the wrapper hooks are currently
                         * executing on the job.
                         */
                        pthreadpool_tevent_job_orphan(job);
                }

                /*
                 * At this point we're sure that no job
                 * still references the pthreadpool_tevent_wrapper
                 * structure, so we can free it.
                 */
                TALLOC_FREE(wrapper);

                pthreadpool_tevent_cleanup_orphaned_jobs();
                return 0;
        }

        if (pool->pool == NULL) {
                /*
                 * A dangling wrapper without main_tp.
                 */
                return 0;
        }

        ret = pthreadpool_stop(pool->pool);
        if (ret != 0) {
                return ret;
        }

        /*
         * orphan all jobs (including wrapper jobs)
         */
        for (job = pool->jobs; job != NULL; job = njob) {
                njob = job->next;

                /*
                 * This removes the job from the list.
                 *
                 * Note that it waits in case
                 * the wrapper hooks are currently
                 * executing on the job (thread).
                 */
                pthreadpool_tevent_job_orphan(job);
        }

        /*
         * Clean up all existing wrappers; remember we just orphaned
         * all jobs (including the ones of the wrappers).
         *
         * So we just mark them as broken, so that
         * pthreadpool_tevent_job_send() won't accept new jobs.
         */
        for (wrap_tp = pool->wrapper.list; wrap_tp != NULL; wrap_tp = nwrap_tp) {
                nwrap_tp = wrap_tp->next;

                /*
                 * Just mark them as broken, so that we can't
                 * get more jobs.
                 */
                TALLOC_FREE(wrap_tp->wrapper.ctx);

                DLIST_REMOVE(pool->wrapper.list, wrap_tp);
        }

        /*
         * Delete all the registered
         * tevent_context/tevent_threaded_context
         * pairs.
         */
        for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
                /* The glue destructor removes it from the list */
                TALLOC_FREE(glue);
        }
        pool->glue_list = NULL;

        ret = pthreadpool_destroy(pool->pool);
        if (ret != 0) {
                return ret;
        }
        pool->pool = NULL;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        return 0;
}

struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
                                struct pthreadpool_tevent *main_tp,
                                TALLOC_CTX *mem_ctx,
                                const struct pthreadpool_tevent_wrapper_ops *ops,
                                void *pstate,
                                size_t psize,
                                const char *type,
                                const char *location)
{
        void **ppstate = (void **)pstate;
        struct pthreadpool_tevent *wrap_tp = NULL;
        struct pthreadpool_tevent_wrapper *wrapper = NULL;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        if (main_tp->wrapper.ctx != NULL) {
                /*
                 * stacking of wrappers is not supported
                 */
                errno = EINVAL;
                return NULL;
        }

        if (main_tp->pool == NULL) {
                /*
                 * The pool is no longer valid,
                 * most likely it was a wrapper context
                 * where the main pool was destroyed.
                 */
                errno = EINVAL;
                return NULL;
        }

        wrap_tp = talloc_zero(mem_ctx, struct pthreadpool_tevent);
        if (wrap_tp == NULL) {
                return NULL;
        }

        wrapper = talloc_zero(wrap_tp, struct pthreadpool_tevent_wrapper);
        if (wrapper == NULL) {
                TALLOC_FREE(wrap_tp);
                return NULL;
        }
        wrapper->main_tp = main_tp;
        wrapper->wrap_tp = wrap_tp;
        wrapper->ops = ops;
        wrapper->private_state = talloc_zero_size(wrapper, psize);
        if (wrapper->private_state == NULL) {
                TALLOC_FREE(wrap_tp);
                return NULL;
        }
        talloc_set_name_const(wrapper->private_state, type);

        wrap_tp->wrapper.ctx = wrapper;

        DLIST_ADD_END(main_tp->wrapper.list, wrap_tp);

        talloc_set_destructor(wrap_tp, pthreadpool_tevent_destructor);

        *ppstate = wrapper->private_state;
        return wrap_tp;
}
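
/*
 * A rough sketch of wrapper hooks. The authoritative definition of
 * struct pthreadpool_tevent_wrapper_ops lives in pthreadpool_tevent.h;
 * the signatures below are inferred from how the hooks are invoked in
 * pthreadpool_tevent_job_fn() in this file. Both hooks run in the
 * worker thread and returning false makes that worker thread exit:
 *
 *   static bool my_before_job(struct pthreadpool_tevent *wrap_tp,
 *                             void *private_state,
 *                             struct pthreadpool_tevent *main_tp,
 *                             const char *location)
 *   {
 *           // e.g. impersonate a user or change the per-thread cwd
 *           return true;
 *   }
 *
 *   static bool my_after_job(struct pthreadpool_tevent *wrap_tp,
 *                            void *private_state,
 *                            struct pthreadpool_tevent *main_tp,
 *                            const char *location)
 *   {
 *           // undo whatever my_before_job() did
 *           return true;
 *   }
 */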

void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool,
                                             const void *private_state)
{
        struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;

        if (wrapper == NULL) {
                abort();
        }

        if (wrapper->private_state != private_state) {
                abort();
        }

        wrapper->force_per_thread_cwd = true;
}

static int pthreadpool_tevent_glue_destructor(
        struct pthreadpool_tevent_glue *glue)
{
        struct pthreadpool_tevent_job_state *state = NULL;
        struct pthreadpool_tevent_job_state *nstate = NULL;

        TALLOC_FREE(glue->fde);

        for (state = glue->states; state != NULL; state = nstate) {
                nstate = state->next;

                /* This removes the job from the list */
                pthreadpool_tevent_job_orphan(state->job);
        }

        if (glue->pool->glue_list != NULL) {
                DLIST_REMOVE(glue->pool->glue_list, glue);
        }

        /* Ensure the ev_link destructor knows we're gone */
        glue->ev_link->glue = NULL;

        TALLOC_FREE(glue->ev_link);
        TALLOC_FREE(glue->tctx);

        return 0;
}

/*
 * Destructor called either explicitly from
 * pthreadpool_tevent_glue_destructor(), or indirectly
 * when the owning tevent_context is destroyed.
 *
 * When called from pthreadpool_tevent_glue_destructor()
 * ev_link->glue is already NULL, so this does nothing.
 *
 * When called from talloc_free() of the owning
 * tevent_context we must ensure we also remove the
 * linked glue object from the list inside
 * struct pthreadpool_tevent.
 */
static int pthreadpool_tevent_glue_link_destructor(
        struct pthreadpool_tevent_glue_ev_link *ev_link)
{
        TALLOC_FREE(ev_link->glue);
        return 0;
}

static void pthreadpool_tevent_glue_monitor(struct tevent_context *ev,
                                            struct tevent_fd *fde,
                                            uint16_t flags,
                                            void *private_data)
{
        struct pthreadpool_tevent_glue *glue =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_glue);
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;
        int ret = -1;

        ret = pthreadpool_restart_check_monitor_drain(glue->pool->pool);
        if (ret != 0) {
                TALLOC_FREE(glue->fde);
        }

        ret = pthreadpool_restart_check(glue->pool->pool);
        if (ret == 0) {
                /*
                 * success...
                 */
                goto done;
        }

        /*
         * There's a problem and the pool
         * doesn't have a single thread available
         * for pending jobs, so we can only
         * stop the jobs and return an error.
         * This is similar to a failure from
         * pthreadpool_add_job().
         */
        for (job = glue->pool->jobs; job != NULL; job = njob) {
                njob = job->next;

                tevent_req_defer_callback(job->state->req,
                                          job->state->ev);
                tevent_req_error(job->state->req, ret);
        }

done:
        if (glue->states == NULL) {
                /*
                 * If the glue doesn't have any pending jobs
                 * we remove the glue in order to
                 * remove the fd event.
                 */
                TALLOC_FREE(glue);
        }
}

static int pthreadpool_tevent_register_ev(
                                struct pthreadpool_tevent *pool,
                                struct pthreadpool_tevent_job_state *state)
{
        struct tevent_context *ev = state->ev;
        struct pthreadpool_tevent_glue *glue = NULL;
        struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
        int monitor_fd = -1;

        /*
         * See if this tevent_context was already registered by
         * searching the glue object list. If so we have nothing
         * to do here - we already have a tevent_context/tevent_threaded_context
         * pair.
         */
        for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
                if (glue->ev == state->ev) {
                        state->glue = glue;
                        DLIST_ADD_END(glue->states, state);
                        return 0;
                }
        }

        /*
         * Event context not yet registered - create a new glue
         * object containing a tevent_context/tevent_threaded_context
         * pair and put it on the list to remember this registration.
         * We also need a link object to ensure the event context
         * can't go away without us knowing about it.
         */
        glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
        if (glue == NULL) {
                return ENOMEM;
        }
        *glue = (struct pthreadpool_tevent_glue) {
                .pool = pool,
                .ev = ev,
        };
        talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);

        monitor_fd = pthreadpool_restart_check_monitor_fd(pool->pool);
        if (monitor_fd == -1 && errno != ENOSYS) {
                int saved_errno = errno;
                TALLOC_FREE(glue);
                return saved_errno;
        }

        if (monitor_fd != -1) {
                glue->fde = tevent_add_fd(ev,
                                          glue,
                                          monitor_fd,
                                          TEVENT_FD_READ,
                                          pthreadpool_tevent_glue_monitor,
                                          glue);
                if (glue->fde == NULL) {
                        close(monitor_fd);
                        TALLOC_FREE(glue);
                        return ENOMEM;
                }
                tevent_fd_set_auto_close(glue->fde);
        }

        /*
         * Now allocate the link object to the event context. Note this
         * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
         * context is freed we are able to clean up the glue object
         * in the link object destructor.
         */

        ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
        if (ev_link == NULL) {
                TALLOC_FREE(glue);
                return ENOMEM;
        }
        ev_link->glue = glue;
        talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);

        glue->ev_link = ev_link;

#ifdef HAVE_PTHREAD
        glue->tctx = tevent_threaded_context_create(glue, ev);
        if (glue->tctx == NULL) {
                TALLOC_FREE(ev_link);
                TALLOC_FREE(glue);
                return ENOMEM;
        }
#endif

        state->glue = glue;
        DLIST_ADD_END(glue->states, state);

        DLIST_ADD(pool->glue_list, glue);
        return 0;
}

static void pthreadpool_tevent_job_fn(void *private_data);
static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data);
static bool pthreadpool_tevent_job_cancel(struct tevent_req *req);

static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
{
        /*
         * We should never be called with needs_fence.orphaned == false.
         * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
         * after detaching from the request state, glue and pool list.
         */
        if (!job->needs_fence.orphaned) {
                abort();
        }

        /*
         * If the job is not finished (job->im still there)
         * and it's still attached to the pool,
         * we try to cancel it (before it has started).
         */
        if (job->im != NULL && job->pool != NULL) {
                size_t num;

                num = pthreadpool_cancel_job(job->pool->pool, 0,
                                             pthreadpool_tevent_job_fn,
                                             job);
                if (num != 0) {
                        /*
                         * It was not too late to cancel the request.
                         *
                         * We can remove job->im, as it will never be used.
                         */
                        TALLOC_FREE(job->im);
                }
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.dropped) {
                /*
                 * The signal function saw job->needs_fence.orphaned
                 * before it started the signaling via the immediate
                 * event. So we'll never get triggered and can
                 * remove job->im and let the whole job go...
                 */
                TALLOC_FREE(job->im);
        }

        /*
         * TODO?: We could further improve this by adjusting
         * tevent_threaded_schedule_immediate_destructor()
         * and allow TALLOC_FREE() during its time
         * in the main_ev->scheduled_immediates list.
         *
         * PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
         * if (job->needs_fence.signaled) {
         *       *
         *       * The signal function is completed;
         *       * in the future we may be allowed
         *       * to call TALLOC_FREE(job->im).
         *       *
         *      TALLOC_FREE(job->im);
         * }
         */

        /*
         * pthreadpool_tevent_job_orphan() already removed
         * it from pool->jobs. And we don't need to try
         * pthreadpool_cancel_job() again.
         */
        job->pool = NULL;

        if (job->im != NULL) {
                /*
                 * job->im still there means we need to wait for the
                 * immediate event to be triggered or just leak the memory.
                 *
                 * Move it to the orphaned list, if it's not already there.
                 */
                return -1;
        }

        /*
         * Finally remove from the orphaned_jobs list
         * and let talloc destroy us.
         */
        DLIST_REMOVE(orphaned_jobs, job);

        PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(job);
        return 0;
}

static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
{
        job->needs_fence.orphaned = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        /*
         * We're the only function that sets
         * job->state = NULL;
         */
        if (job->state == NULL) {
                abort();
        }

        /*
         * Once we marked the request as 'orphaned'
         * we spin/loop if 'wrapper' is marked as active.
         *
         * We need to wait until the wrapper hook finished
         * before we can set job->wrapper = NULL.
         *
         * This is some kind of spinlock, but with
         * 1 millisecond sleeps in between, in order
         * to give the thread more cpu time to finish.
         */
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        while (job->needs_fence.wrapper) {
                poll(NULL, 0, 1);
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        }
        job->wrapper = NULL;

        /*
         * Once we marked the request as 'orphaned'
         * we spin/loop if it's already marked
         * as 'finished' (which means that
         * pthreadpool_tevent_job_signal() was entered).
         * If it saw 'orphaned' it will exit after setting
         * 'dropped', otherwise it dereferences
         * job->state->glue->{tctx,ev} until it exits
         * after setting 'signaled'.
         *
         * We need to close this potential gap before
         * we can set job->state = NULL.
         *
         * This is some kind of spinlock, but with
         * 1 millisecond sleeps in between, in order
         * to give the thread more cpu time to finish.
         */
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        while (job->needs_fence.finished) {
                if (job->needs_fence.dropped) {
                        break;
                }
                if (job->needs_fence.signaled) {
                        break;
                }
                poll(NULL, 0, 1);
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        }

        /*
         * Once the gap is closed, we can remove
         * the glue link.
         */
        DLIST_REMOVE(job->state->glue->states, job->state);
        job->state->glue = NULL;

        /*
         * We need to reparent to a long term context
         * and detach from the request state.
         * Maybe the destructor will keep the memory
         * and leak it for now.
         */
        (void)talloc_reparent(job->state, NULL, job);
        job->state->job = NULL;
        job->state = NULL;

        /*
         * job->pool will only be set to NULL
         * in the first destructor run.
         */
        if (job->pool == NULL) {
                abort();
        }

        /*
         * Detach it from the pool.
         *
         * The job might still be running,
         * so we keep job->pool.
         * The destructor will set it to NULL
         * after trying pthreadpool_cancel_job().
         */
        DLIST_REMOVE(job->pool->jobs, job);

        /*
         * Add it to the list of orphaned jobs,
         * which may be cleaned up later.
         *
         * The destructor removes it from the list
         * when possible, or it denies the free
         * and keeps it in the list.
         */
        DLIST_ADD_END(orphaned_jobs, job);
        TALLOC_FREE(job);
}

static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
                                           enum tevent_req_state req_state)
{
        struct pthreadpool_tevent_job_state *state =
                tevent_req_data(req,
                struct pthreadpool_tevent_job_state);

        if (state->job == NULL) {
                /*
                 * The job request is not scheduled in the pool
                 * yet, or not anymore.
                 */
                if (state->glue != NULL) {
                        DLIST_REMOVE(state->glue->states, state);
                        state->glue = NULL;
                }
                return;
        }

        /*
         * We need to reparent to a long term context.
         * Maybe the destructor will keep the memory
         * and leak it for now.
         */
        pthreadpool_tevent_job_orphan(state->job);
        state->job = NULL; /* not needed but looks better */
        return;
}

struct tevent_req *pthreadpool_tevent_job_send(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        struct pthreadpool_tevent *pool,
        void (*fn)(void *private_data), void *private_data)
{
        struct tevent_req *req = NULL;
        struct pthreadpool_tevent_job_state *state = NULL;
        struct pthreadpool_tevent_job *job = NULL;
        int ret;
        struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        if (wrapper != NULL) {
                pool = wrapper->main_tp;
        }

        req = tevent_req_create(mem_ctx, &state,
                                struct pthreadpool_tevent_job_state);
        if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
        state->req = req;

        tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);

        if (pool == NULL) {
                tevent_req_error(req, EINVAL);
                return tevent_req_post(req, ev);
        }
        if (pool->pool == NULL) {
                tevent_req_error(req, EINVAL);
                return tevent_req_post(req, ev);
        }

        ret = pthreadpool_tevent_register_ev(pool, state);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }

        job = talloc_zero(state, struct pthreadpool_tevent_job);
        if (tevent_req_nomem(job, req)) {
                return tevent_req_post(req, ev);
        }
        job->pool = pool;
        job->wrapper = wrapper;
        job->fn = fn;
        job->private_data = private_data;
        job->im = tevent_create_immediate(state->job);
        if (tevent_req_nomem(job->im, req)) {
                return tevent_req_post(req, ev);
        }
        PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
        job->per_thread_cwd = pthreadpool_tevent_per_thread_cwd(pool);
        talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
        DLIST_ADD_END(job->pool->jobs, job);
        job->state = state;
        state->job = job;

        ret = pthreadpool_add_job(job->pool->pool, 0,
                                  pthreadpool_tevent_job_fn,
                                  job);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }

        tevent_req_set_cancel_fn(req, pthreadpool_tevent_job_cancel);
        return req;
}

static __thread struct pthreadpool_tevent_job *current_job;

bool pthreadpool_tevent_current_job_canceled(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        return current_job->needs_fence.maycancel;
}

bool pthreadpool_tevent_current_job_orphaned(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        return current_job->needs_fence.orphaned;
}

bool pthreadpool_tevent_current_job_continue(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        if (current_job->needs_fence.maycancel) {
                return false;
        }
        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        if (current_job->needs_fence.orphaned) {
                return false;
        }

        return true;
}
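
/*
 * A minimal sketch of a job function cooperating with cancellation
 * ("my_chunked_job" and "copy_one_chunk" are hypothetical):
 *
 *   static void my_chunked_job(void *private_data)
 *   {
 *           while (pthreadpool_tevent_current_job_continue()) {
 *                   // do a bounded amount of work, then re-check
 *                   if (!copy_one_chunk(private_data)) {
 *                           return;
 *                   }
 *           }
 *           // the request was canceled or orphaned: stop early
 *   }
 */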

bool pthreadpool_tevent_current_job_per_thread_cwd(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        return current_job->per_thread_cwd;
}

static void pthreadpool_tevent_job_fn(void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);
        struct pthreadpool_tevent_wrapper *wrapper = NULL;

        current_job = job;
        job->needs_fence.started = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.orphaned) {
                current_job = NULL;
                return;
        }

        wrapper = job->wrapper;
        if (wrapper != NULL) {
                bool ok;

                job->needs_fence.wrapper = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (job->needs_fence.orphaned) {
                        job->needs_fence.wrapper = false;
                        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                        current_job = NULL;
                        return;
                }
                ok = wrapper->ops->before_job(wrapper->wrap_tp,
                                              wrapper->private_state,
                                              wrapper->main_tp,
                                              __location__);
                job->needs_fence.wrapper = false;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (!ok) {
                        job->needs_fence.exit_thread = true;
                        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                        current_job = NULL;
                        return;
                }
        }

        job->fn(job->private_data);

        job->needs_fence.executed = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        if (wrapper != NULL) {
                bool ok;

                job->needs_fence.wrapper = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (job->needs_fence.orphaned) {
                        job->needs_fence.wrapper = false;
                        job->needs_fence.exit_thread = true;
                        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                        current_job = NULL;
                        return;
                }
                ok = wrapper->ops->after_job(wrapper->wrap_tp,
                                             wrapper->private_state,
                                             wrapper->main_tp,
                                             __location__);
                job->needs_fence.wrapper = false;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (!ok) {
                        job->needs_fence.exit_thread = true;
                        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                        current_job = NULL;
                        return;
                }
        }

        current_job = NULL;
}

static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
                                         void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(job_private_data,
                struct pthreadpool_tevent_job);

        job->needs_fence.finished = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.orphaned) {
                /* Request already gone */
                job->needs_fence.dropped = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (job->needs_fence.exit_thread) {
                        /*
                         * A problem with the wrapper;
                         * the current job/worker thread
                         * needs to terminate.
                         *
                         * The pthreadpool_tevent is already gone.
                         */
                        return -1;
                }
                return 0;
        }

        /*
         * state and state->glue are valid,
         * see the job->needs_fence.finished
         * "spinlock" loop in
         * pthreadpool_tevent_job_orphan()
         */
        if (job->state->glue->tctx != NULL) {
                /* with HAVE_PTHREAD */
                tevent_threaded_schedule_immediate(job->state->glue->tctx,
                                                   job->im,
                                                   pthreadpool_tevent_job_done,
                                                   job);
        } else {
                /* without HAVE_PTHREAD */
                tevent_schedule_immediate(job->im,
                                          job->state->glue->ev,
                                          pthreadpool_tevent_job_done,
                                          job);
        }

        job->needs_fence.signaled = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.exit_thread) {
                /*
                 * A problem with the wrapper;
                 * the current job/worker thread
                 * needs to terminate.
                 *
                 * The pthreadpool_tevent is already gone.
                 */
                return -1;
        }
        return 0;
}

static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);
        struct pthreadpool_tevent_job_state *state = job->state;

        TALLOC_FREE(job->im);

        if (state == NULL) {
                /* Request already gone */
                TALLOC_FREE(job);
                return;
        }

        /*
         * pthreadpool_tevent_job_cleanup()
         * (called by tevent_req_done() or
         * tevent_req_error()) will destroy the job.
         */

        if (job->needs_fence.executed) {
                tevent_req_done(state->req);
                return;
        }

        tevent_req_error(state->req, ENOEXEC);
        return;
}

static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
{
        struct pthreadpool_tevent_job_state *state =
                tevent_req_data(req,
                struct pthreadpool_tevent_job_state);
        struct pthreadpool_tevent_job *job = state->job;
        size_t num;

        if (job == NULL) {
                return false;
        }

        job->needs_fence.maycancel = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.started) {
                /*
                 * It was too late to cancel the request.
                 *
                 * The job still has the chance to look
                 * at pthreadpool_tevent_current_job_canceled()
                 * or pthreadpool_tevent_current_job_continue().
                 */
                return false;
        }

        num = pthreadpool_cancel_job(job->pool->pool, 0,
                                     pthreadpool_tevent_job_fn,
                                     job);
        if (num == 0) {
                /*
                 * It was too late to cancel the request.
                 */
                return false;
        }

        /*
         * It was not too late to cancel the request.
         *
         * We can remove job->im, as it will never be used.
         */
        TALLOC_FREE(job->im);

        /*
         * pthreadpool_tevent_job_cleanup()
         * will destroy the job.
         */
        tevent_req_defer_callback(req, state->ev);
        tevent_req_error(req, ECANCELED);
        return true;
}

int pthreadpool_tevent_job_recv(struct tevent_req *req)
{
        return tevent_req_simple_recv_unix(req);
}
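
/*
 * A minimal sketch of a matching done callback for
 * pthreadpool_tevent_job_send() (the name "my_done" is hypothetical;
 * pthreadpool_tevent_job_recv() returns an errno-style value, e.g.
 * ECANCELED after a successful tevent_req_cancel()):
 *
 *   static void my_done(struct tevent_req *req)
 *   {
 *           int ret = pthreadpool_tevent_job_recv(req);
 *           TALLOC_FREE(req);
 *           if (ret != 0) {
 *                   // the job failed, was canceled or never executed
 *                   return;
 *           }
 *           // the job function ran to completion
 *   }
 */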