pthreadpool: we need to use pthreadpool_tevent_per_thread_cwd() on the caller's pool
[samba.git] / lib / pthreadpool / pthreadpool_tevent.c
/*
 * Unix SMB/CIFS implementation.
 * threadpool implementation based on pthreads
 * Copyright (C) Volker Lendecke 2009,2011
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "replace.h"
#include "system/select.h"
#include "system/threads.h"
#include "system/filesys.h"
#include "pthreadpool_tevent.h"
#include "pthreadpool.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/dlinklist.h"
#include "lib/util/attr.h"

/*
 * We try to give some hints to helgrind/drd
 *
 * Note that ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
 * takes a memory address range that is ignored by helgrind/drd;
 * the 'description' is just ignored...
 *
 * Note that ANNOTATE_HAPPENS_*(unique_uintptr)
 * just takes a DWORD/(void *) as unique key
 * for the barrier.
 */
#ifdef HAVE_VALGRIND_HELGRIND_H
#include <valgrind/helgrind.h>
#endif
#ifndef ANNOTATE_BENIGN_RACE_SIZED
#define ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
#endif
#ifndef ANNOTATE_HAPPENS_BEFORE
#define ANNOTATE_HAPPENS_BEFORE(unique_uintptr)
#endif
#ifndef ANNOTATE_HAPPENS_AFTER
#define ANNOTATE_HAPPENS_AFTER(unique_uintptr)
#endif
#ifndef ANNOTATE_HAPPENS_BEFORE_FORGET_ALL
#define ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(unique_uintptr)
#endif

#define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_BENIGN_RACE_SIZED(&__j->needs_fence, \
                                   sizeof(__j->needs_fence), \
                                   "race by design, protected by fence"); \
} while(0);

#ifdef WITH_PTHREADPOOL
/*
 * configure checked we have pthread and atomic_thread_fence() available
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { \
        atomic_thread_fence(__order); \
} while(0)
#else
/*
 * we're using lib/pthreadpool/pthreadpool_sync.c ...
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { } while(0)
#ifndef HAVE___THREAD
#define __thread
#endif
#endif

#define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_HAPPENS_BEFORE(&__job->needs_fence); \
        __PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
        ANNOTATE_HAPPENS_AFTER(&__job->needs_fence); \
} while(0);

#define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
        _UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
        ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&__job->needs_fence); \
} while(0);

struct pthreadpool_tevent_job_state;

/*
 * We need one pthreadpool_tevent_glue object per unique combination of tevent
 * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
 * contexts in a pthreadpool_tevent.
 */
struct pthreadpool_tevent_glue {
        struct pthreadpool_tevent_glue *prev, *next;
        struct pthreadpool_tevent *pool; /* back-pointer to owning object. */
        /* Tuple we are keeping track of in this list. */
        struct tevent_context *ev;
        struct tevent_threaded_context *tctx;
        /* recheck monitor fd event */
        struct tevent_fd *fde;
        /* Pointer to link object owned by *ev. */
        struct pthreadpool_tevent_glue_ev_link *ev_link;
        /* active jobs */
        struct pthreadpool_tevent_job_state *states;
};

/*
 * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
 * tevent context from our list of active event contexts if the event context
 * is destroyed.
 * This structure is talloc()'ed from the struct tevent_context *, and is a
 * back-pointer allowing the related struct pthreadpool_tevent_glue object
 * to be removed from the struct pthreadpool_tevent glue list if the owning
 * tevent_context is talloc_free()'ed.
 */
struct pthreadpool_tevent_glue_ev_link {
        struct pthreadpool_tevent_glue *glue;
};

struct pthreadpool_tevent_wrapper {
        struct pthreadpool_tevent *main_tp;
        struct pthreadpool_tevent *wrap_tp;
        const struct pthreadpool_tevent_wrapper_ops *ops;
        void *private_state;
        bool force_per_thread_cwd;
};

struct pthreadpool_tevent {
        struct pthreadpool_tevent *prev, *next;

        struct pthreadpool *pool;
        struct pthreadpool_tevent_glue *glue_list;

        struct pthreadpool_tevent_job *jobs;

        struct {
                /*
                 * This is used on the main context
                 */
                struct pthreadpool_tevent *list;

                /*
                 * This is used on the wrapper context
                 */
                struct pthreadpool_tevent_wrapper *ctx;
        } wrapper;
};

struct pthreadpool_tevent_job_state {
        struct pthreadpool_tevent_job_state *prev, *next;
        struct pthreadpool_tevent_glue *glue;
        struct tevent_context *ev;
        struct tevent_req *req;
        struct pthreadpool_tevent_job *job;
};

struct pthreadpool_tevent_job {
        struct pthreadpool_tevent_job *prev, *next;

        struct pthreadpool_tevent *pool;
        struct pthreadpool_tevent_wrapper *wrapper;
        struct pthreadpool_tevent_job_state *state;
        struct tevent_immediate *im;

        void (*fn)(void *private_data);
        void *private_data;

        /*
         * Coordination between threads
         *
         * There is only one side writing each element,
         * either the main process or the job thread.
         *
         * The coordination is done by a full memory
         * barrier using atomic_thread_fence(memory_order_seq_cst)
         * wrapped in PTHREAD_TEVENT_JOB_THREAD_FENCE().
         * (A sketch of the pattern follows this struct definition.)
         */
        struct {
                /*
                 * 'maycancel'
                 * set when tevent_req_cancel() is called.
                 * (only written by main thread!)
                 */
                bool maycancel;

                /*
                 * 'orphaned'
                 * set when talloc_free is called on the job request,
                 * tevent_context or pthreadpool_tevent.
                 * (only written by main thread!)
                 */
                bool orphaned;

                /*
                 * 'started'
                 * set when the job is picked up by a worker thread.
                 * (only written by job thread!)
                 */
                bool started;

                /*
                 * 'wrapper'
                 * set before calling the wrapper before_job() or
                 * after_job() hooks.
                 * unset again after the hook finished.
                 * (only written by job thread!)
                 */
                bool wrapper;

                /*
                 * 'executed'
                 * set once the job function returned.
                 * (only written by job thread!)
                 */
                bool executed;

                /*
                 * 'finished'
                 * set when pthreadpool_tevent_job_signal() is entered.
                 * (only written by job thread!)
                 */
                bool finished;

                /*
                 * 'dropped'
                 * set when pthreadpool_tevent_job_signal() leaves with
                 * orphaned already set.
                 * (only written by job thread!)
                 */
                bool dropped;

                /*
                 * 'signaled'
                 * set when pthreadpool_tevent_job_signal() leaves normally
                 * and the immediate event was scheduled.
                 * (only written by job thread!)
                 */
                bool signaled;

                /*
                 * 'exit_thread'
                 * may be set during pthreadpool_tevent_job_fn()
                 * if some wrapper related code generated an error
                 * and the environment isn't safe anymore.
                 *
                 * In such a case pthreadpool_tevent_job_signal()
                 * will pick this up and terminate the current
                 * worker thread by returning -1.
                 */
                bool exit_thread; /* only written/read by job thread! */
        } needs_fence;

        bool per_thread_cwd;
};

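/*
 * A sketch of the coordination pattern described in the struct above
 * (illustrative only, not code from this file; it assumes both sides
 * share a "job" pointer):
 *
 *   // job thread: the only writer of 'executed'; write, then fence
 *   job->fn(job->private_data);
 *   job->needs_fence.executed = true;
 *   PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *
 *   // main thread: fence first, then read
 *   PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 *   if (job->needs_fence.executed) {
 *           tevent_req_done(job->state->req);
 *   }
 */
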
static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);

static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);

static struct pthreadpool_tevent_job *orphaned_jobs;

void pthreadpool_tevent_cleanup_orphaned_jobs(void)
{
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;

        for (job = orphaned_jobs; job != NULL; job = njob) {
                njob = job->next;

                /*
                 * The job destructor keeps the job alive
                 * (and in the list) or removes it from the list.
                 */
                TALLOC_FREE(job);
        }
}

static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
                                         void *private_data);

int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
                            struct pthreadpool_tevent **presult)
{
        struct pthreadpool_tevent *pool;
        int ret;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
        if (pool == NULL) {
                return ENOMEM;
        }

        ret = pthreadpool_init(max_threads, &pool->pool,
                               pthreadpool_tevent_job_signal, pool);
        if (ret != 0) {
                TALLOC_FREE(pool);
                return ret;
        }

        talloc_set_destructor(pool, pthreadpool_tevent_destructor);

        *presult = pool;
        return 0;
}

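/*
 * Example usage (a minimal, hedged sketch; "mem_ctx" and the chosen
 * thread limit are assumptions for illustration):
 *
 *   struct pthreadpool_tevent *pool = NULL;
 *   int ret;
 *
 *   ret = pthreadpool_tevent_init(mem_ctx, 4, &pool);
 *   if (ret != 0) {
 *           // ret is an errno-style value, e.g. ENOMEM
 *           return ret;
 *   }
 *   // ... pthreadpool_tevent_job_send() jobs onto "pool" ...
 *   TALLOC_FREE(pool); // runs the destructor below, orphaning jobs
 */
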
static struct pthreadpool_tevent *pthreadpool_tevent_unwrap(
        struct pthreadpool_tevent *pool)
{
        struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;

        if (wrapper != NULL) {
                return wrapper->main_tp;
        }

        return pool;
}

size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
{
        pool = pthreadpool_tevent_unwrap(pool);

        if (pool->pool == NULL) {
                return 0;
        }

        return pthreadpool_max_threads(pool->pool);
}

size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
{
        pool = pthreadpool_tevent_unwrap(pool);

        if (pool->pool == NULL) {
                return 0;
        }

        return pthreadpool_queued_jobs(pool->pool);
}

bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool)
{
        struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;

        if (wrapper != NULL && wrapper->force_per_thread_cwd) {
                return true;
        }

        pool = pthreadpool_tevent_unwrap(pool);

        if (pool->pool == NULL) {
                return false;
        }

        return pthreadpool_per_thread_cwd(pool->pool);
}

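/*
 * Note: per the commit subject, this function is meant to be called
 * on the caller's pool, which may be a wrapper, so that
 * wrapper->force_per_thread_cwd is honored before falling through to
 * the main pool (see the caller_pool usage in
 * pthreadpool_tevent_job_send() below). A hedged sketch:
 *
 *   // "wrap_tp" is a wrapper created around "main_tp";
 *   // inside the wrapper's setup code:
 *   pthreadpool_tevent_force_per_thread_cwd(wrap_tp, private_state);
 *
 *   pthreadpool_tevent_per_thread_cwd(wrap_tp);  // now true
 *   pthreadpool_tevent_per_thread_cwd(main_tp);  // still asks the
 *                                                // raw pool
 */
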
static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
{
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;
        struct pthreadpool_tevent *wrap_tp = NULL;
        struct pthreadpool_tevent *nwrap_tp = NULL;
        struct pthreadpool_tevent_glue *glue = NULL;
        int ret;

        if (pool->wrapper.ctx != NULL) {
                struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;

                pool->wrapper.ctx = NULL;
                pool = wrapper->main_tp;

                DLIST_REMOVE(pool->wrapper.list, wrapper->wrap_tp);

                for (job = pool->jobs; job != NULL; job = njob) {
                        njob = job->next;

                        if (job->wrapper != wrapper) {
                                continue;
                        }

                        /*
                         * This removes the job from the list.
                         *
                         * Note that it waits in case
                         * the wrapper hooks are currently
                         * executing on the job.
                         */
                        pthreadpool_tevent_job_orphan(job);
                }

                /*
                 * At this point we're sure that no job
                 * still references the pthreadpool_tevent_wrapper
                 * structure, so we can free it.
                 */
                TALLOC_FREE(wrapper);

                pthreadpool_tevent_cleanup_orphaned_jobs();
                return 0;
        }

        if (pool->pool == NULL) {
                /*
                 * A dangling wrapper without main_tp.
                 */
                return 0;
        }

        ret = pthreadpool_stop(pool->pool);
        if (ret != 0) {
                return ret;
        }

        /*
         * orphan all jobs (including wrapper jobs)
         */
        for (job = pool->jobs; job != NULL; job = njob) {
                njob = job->next;

                /*
                 * This removes the job from the list.
                 *
                 * Note that it waits in case
                 * the wrapper hooks are currently
                 * executing on the job (thread).
                 */
                pthreadpool_tevent_job_orphan(job);
        }

        /*
         * Cleanup all existing wrappers; remember we just orphaned
         * all jobs (including those of the wrappers).
         *
         * So we just mark them as broken, so that
         * pthreadpool_tevent_job_send() won't accept new jobs.
         */
        for (wrap_tp = pool->wrapper.list; wrap_tp != NULL; wrap_tp = nwrap_tp) {
                nwrap_tp = wrap_tp->next;

                /*
                 * Just mark them as broken, so that we can't
                 * get more jobs.
                 */
                TALLOC_FREE(wrap_tp->wrapper.ctx);

                DLIST_REMOVE(pool->wrapper.list, wrap_tp);
        }

        /*
         * Delete all the registered
         * tevent_context/tevent_threaded_context
         * pairs.
         */
        for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
                /* The glue destructor removes it from the list */
                TALLOC_FREE(glue);
        }
        pool->glue_list = NULL;

        ret = pthreadpool_destroy(pool->pool);
        if (ret != 0) {
                return ret;
        }
        pool->pool = NULL;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        return 0;
}

struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
                                struct pthreadpool_tevent *main_tp,
                                TALLOC_CTX *mem_ctx,
                                const struct pthreadpool_tevent_wrapper_ops *ops,
                                void *pstate,
                                size_t psize,
                                const char *type,
                                const char *location)
{
        void **ppstate = (void **)pstate;
        struct pthreadpool_tevent *wrap_tp = NULL;
        struct pthreadpool_tevent_wrapper *wrapper = NULL;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        if (main_tp->wrapper.ctx != NULL) {
                /*
                 * stacking of wrappers is not supported
                 */
                errno = EINVAL;
                return NULL;
        }

        if (main_tp->pool == NULL) {
                /*
                 * The pool is no longer valid,
                 * most likely it was a wrapper context
                 * where the main pool was destroyed.
                 */
                errno = EINVAL;
                return NULL;
        }

        wrap_tp = talloc_zero(mem_ctx, struct pthreadpool_tevent);
        if (wrap_tp == NULL) {
                return NULL;
        }

        wrapper = talloc_zero(wrap_tp, struct pthreadpool_tevent_wrapper);
        if (wrapper == NULL) {
                TALLOC_FREE(wrap_tp);
                return NULL;
        }
        wrapper->main_tp = main_tp;
        wrapper->wrap_tp = wrap_tp;
        wrapper->ops = ops;
        wrapper->private_state = talloc_zero_size(wrapper, psize);
        if (wrapper->private_state == NULL) {
                TALLOC_FREE(wrap_tp);
                return NULL;
        }
        talloc_set_name_const(wrapper->private_state, type);

        wrap_tp->wrapper.ctx = wrapper;

        DLIST_ADD_END(main_tp->wrapper.list, wrap_tp);

        talloc_set_destructor(wrap_tp, pthreadpool_tevent_destructor);

        *ppstate = wrapper->private_state;
        return wrap_tp;
}

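/*
 * A skeletal wrapper implementation (a hedged sketch: the exact
 * definition of struct pthreadpool_tevent_wrapper_ops lives in
 * pthreadpool_tevent.h; the hook signatures here are inferred from
 * the before_job()/after_job() call sites further down in this file):
 *
 *   static bool example_before_job(struct pthreadpool_tevent *wrap_tp,
 *                                  void *private_state,
 *                                  struct pthreadpool_tevent *main_tp,
 *                                  const char *location)
 *   {
 *           // runs in the worker thread just before job->fn();
 *           // returning false makes the worker thread terminate
 *           return true;
 *   }
 *
 *   static bool example_after_job(struct pthreadpool_tevent *wrap_tp,
 *                                 void *private_state,
 *                                 struct pthreadpool_tevent *main_tp,
 *                                 const char *location)
 *   {
 *           // runs in the worker thread just after job->fn()
 *           return true;
 *   }
 *
 *   static const struct pthreadpool_tevent_wrapper_ops example_ops = {
 *           .before_job = example_before_job,
 *           .after_job = example_after_job,
 *   };
 */
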
void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool,
                                             const void *private_state)
{
        struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;

        if (wrapper == NULL) {
                abort();
        }

        if (wrapper->private_state != private_state) {
                abort();
        }

        wrapper->force_per_thread_cwd = true;
}

static int pthreadpool_tevent_glue_destructor(
        struct pthreadpool_tevent_glue *glue)
{
        struct pthreadpool_tevent_job_state *state = NULL;
        struct pthreadpool_tevent_job_state *nstate = NULL;

        TALLOC_FREE(glue->fde);

        for (state = glue->states; state != NULL; state = nstate) {
                nstate = state->next;

                /* This removes the job from the list */
                pthreadpool_tevent_job_orphan(state->job);
        }

        if (glue->pool->glue_list != NULL) {
                DLIST_REMOVE(glue->pool->glue_list, glue);
        }

        /* Ensure the ev_link destructor knows we're gone */
        glue->ev_link->glue = NULL;

        TALLOC_FREE(glue->ev_link);
        TALLOC_FREE(glue->tctx);

        return 0;
}

/*
 * Destructor called either explicitly from
 * pthreadpool_tevent_glue_destructor(), or indirectly
 * when the owning tevent_context is destroyed.
 *
 * When called from pthreadpool_tevent_glue_destructor()
 * ev_link->glue is already NULL, so this does nothing.
 *
 * When called from talloc_free() of the owning
 * tevent_context we must ensure we also remove the
 * linked glue object from the list inside
 * struct pthreadpool_tevent.
 */
static int pthreadpool_tevent_glue_link_destructor(
        struct pthreadpool_tevent_glue_ev_link *ev_link)
{
        TALLOC_FREE(ev_link->glue);
        return 0;
}

static void pthreadpool_tevent_glue_monitor(struct tevent_context *ev,
                                            struct tevent_fd *fde,
                                            uint16_t flags,
                                            void *private_data)
{
        struct pthreadpool_tevent_glue *glue =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_glue);
        struct pthreadpool_tevent_job *job = NULL;
        struct pthreadpool_tevent_job *njob = NULL;
        int ret = -1;

        ret = pthreadpool_restart_check_monitor_drain(glue->pool->pool);
        if (ret != 0) {
                TALLOC_FREE(glue->fde);
        }

        ret = pthreadpool_restart_check(glue->pool->pool);
        if (ret == 0) {
                /*
                 * success...
                 */
                goto done;
        }

        /*
         * There's a problem and the pool
         * doesn't have a single thread available
         * for pending jobs, so we can only
         * stop the jobs and return an error.
         * This is similar to a failure from
         * pthreadpool_add_job().
         */
        for (job = glue->pool->jobs; job != NULL; job = njob) {
                njob = job->next;

                tevent_req_defer_callback(job->state->req,
                                          job->state->ev);
                tevent_req_error(job->state->req, ret);
        }

done:
        if (glue->states == NULL) {
                /*
                 * If the glue doesn't have any pending jobs
                 * we remove the glue, in order to remove
                 * the fd event.
                 */
                TALLOC_FREE(glue);
        }
}

static int pthreadpool_tevent_register_ev(
                                struct pthreadpool_tevent *pool,
                                struct pthreadpool_tevent_job_state *state)
{
        struct tevent_context *ev = state->ev;
        struct pthreadpool_tevent_glue *glue = NULL;
        struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
        int monitor_fd = -1;

        /*
         * See if this tevent_context was already registered by
         * searching the glue object list. If so we have nothing
         * to do here - we already have a tevent_context/tevent_threaded_context
         * pair.
         */
        for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
                if (glue->ev == state->ev) {
                        state->glue = glue;
                        DLIST_ADD_END(glue->states, state);
                        return 0;
                }
        }

        /*
         * Event context not yet registered - create a new glue
         * object containing a tevent_context/tevent_threaded_context
         * pair and put it on the list to remember this registration.
         * We also need a link object to ensure the event context
         * can't go away without us knowing about it.
         */
        glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
        if (glue == NULL) {
                return ENOMEM;
        }
        *glue = (struct pthreadpool_tevent_glue) {
                .pool = pool,
                .ev = ev,
        };
        talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);

        monitor_fd = pthreadpool_restart_check_monitor_fd(pool->pool);
        if (monitor_fd == -1 && errno != ENOSYS) {
                int saved_errno = errno;
                TALLOC_FREE(glue);
                return saved_errno;
        }

        if (monitor_fd != -1) {
                glue->fde = tevent_add_fd(ev,
                                          glue,
                                          monitor_fd,
                                          TEVENT_FD_READ,
                                          pthreadpool_tevent_glue_monitor,
                                          glue);
                if (glue->fde == NULL) {
                        close(monitor_fd);
                        TALLOC_FREE(glue);
                        return ENOMEM;
                }
                tevent_fd_set_auto_close(glue->fde);
        }

        /*
         * Now allocate the link object to the event context. Note this
         * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
         * context is freed we are able to cleanup the glue object
         * in the link object destructor.
         */

        ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
        if (ev_link == NULL) {
                TALLOC_FREE(glue);
                return ENOMEM;
        }
        ev_link->glue = glue;
        talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);

        glue->ev_link = ev_link;

#ifdef HAVE_PTHREAD
        glue->tctx = tevent_threaded_context_create(glue, ev);
        if (glue->tctx == NULL) {
                TALLOC_FREE(ev_link);
                TALLOC_FREE(glue);
                return ENOMEM;
        }
#endif

        state->glue = glue;
        DLIST_ADD_END(glue->states, state);

        DLIST_ADD(pool->glue_list, glue);
        return 0;
}

static void pthreadpool_tevent_job_fn(void *private_data);
static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data);
static bool pthreadpool_tevent_job_cancel(struct tevent_req *req);

static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
{
        /*
         * We should never be called with needs_fence.orphaned == false.
         * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
         * after detaching from the request state, glue and pool list.
         */
        if (!job->needs_fence.orphaned) {
                abort();
        }

        /*
         * If the job is not finished (job->im still there)
         * and it's still attached to the pool,
         * we try to cancel it (before it has started).
         */
        if (job->im != NULL && job->pool != NULL) {
                size_t num;

                num = pthreadpool_cancel_job(job->pool->pool, 0,
                                             pthreadpool_tevent_job_fn,
                                             job);
                if (num != 0) {
                        /*
                         * It was not too late to cancel the request.
                         *
                         * We can remove job->im, as it will never be used.
                         */
                        TALLOC_FREE(job->im);
                }
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.dropped) {
                /*
                 * The signal function saw job->needs_fence.orphaned
                 * before it started the signaling via the immediate
                 * event. So we'll never get triggered and can
                 * remove job->im and let the whole job go...
                 */
                TALLOC_FREE(job->im);
        }

        /*
         * TODO?: We could further improve this by adjusting
         * tevent_threaded_schedule_immediate_destructor()
         * and allow TALLOC_FREE() during its time
         * in the main_ev->scheduled_immediates list.
         *
         * PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
         * if (job->needs_fence.signaled) {
         *       *
         *       * The signal function is completed;
         *       * in future we may be allowed
         *       * to call TALLOC_FREE(job->im).
         *       *
         *      TALLOC_FREE(job->im);
         * }
         */

        /*
         * pthreadpool_tevent_job_orphan() already removed
         * it from pool->jobs. And we don't need to try
         * pthreadpool_cancel_job() again.
         */
        job->pool = NULL;

        if (job->im != NULL) {
                /*
                 * job->im still being there means we need to wait for the
                 * immediate event to be triggered or just leak the memory.
                 *
                 * Move it to the orphaned list, if it's not already there.
                 */
                return -1;
        }

        /*
         * Finally remove from the orphaned_jobs list
         * and let talloc destroy us.
         */
        DLIST_REMOVE(orphaned_jobs, job);

        PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(job);
        return 0;
}

static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
{
        job->needs_fence.orphaned = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        /*
         * We're the only function that sets
         * job->state = NULL;
         */
        if (job->state == NULL) {
                abort();
        }

        /*
         * Once we marked the request as 'orphaned'
         * we spin/loop if 'wrapper' is marked as active.
         *
         * We need to wait until the wrapper hook finished
         * before we can set job->wrapper = NULL.
         *
         * This is some kind of spinlock, but with
         * 1 millisecond sleeps in between, in order
         * to give the thread more cpu time to finish.
         */
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        while (job->needs_fence.wrapper) {
                poll(NULL, 0, 1);
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        }
        job->wrapper = NULL;

        /*
         * Once we marked the request as 'orphaned'
         * we spin/loop if it's already marked
         * as 'finished' (which means that
         * pthreadpool_tevent_job_signal() was entered).
         * If it saw 'orphaned' it will exit after setting
         * 'dropped', otherwise it dereferences
         * job->state->glue->{tctx,ev} until it exited
         * after setting 'signaled'.
         *
         * We need to close this potential gap before
         * we can set job->state = NULL.
         *
         * This is some kind of spinlock, but with
         * 1 millisecond sleeps in between, in order
         * to give the thread more cpu time to finish.
         */
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        while (job->needs_fence.finished) {
                if (job->needs_fence.dropped) {
                        break;
                }
                if (job->needs_fence.signaled) {
                        break;
                }
                poll(NULL, 0, 1);
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        }

        /*
         * Once the gap is closed, we can remove
         * the glue link.
         */
        DLIST_REMOVE(job->state->glue->states, job->state);
        job->state->glue = NULL;

        /*
         * We need to reparent to a long term context
         * and detach from the request state.
         * The destructor may keep the memory
         * and leak it for now.
         */
        (void)talloc_reparent(job->state, NULL, job);
        job->state->job = NULL;
        job->state = NULL;

        /*
         * job->pool will only be set to NULL
         * in the first destructor run.
         */
        if (job->pool == NULL) {
                abort();
        }

        /*
         * Detach it from the pool.
         *
         * The job might still be running,
         * so we keep job->pool.
         * The destructor will set it to NULL
         * after trying pthreadpool_cancel_job().
         */
        DLIST_REMOVE(job->pool->jobs, job);

        /*
         * Add it to the list of orphaned jobs,
         * which may be cleaned up later.
         *
         * The destructor removes it from the list
         * when possible, or it denies the free
         * and keeps it in the list.
         */
        DLIST_ADD_END(orphaned_jobs, job);
        TALLOC_FREE(job);
}

static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
                                           enum tevent_req_state req_state)
{
        struct pthreadpool_tevent_job_state *state =
                tevent_req_data(req,
                struct pthreadpool_tevent_job_state);

        if (state->job == NULL) {
                /*
                 * The job request is not scheduled in the pool
                 * yet or anymore.
                 */
                if (state->glue != NULL) {
                        DLIST_REMOVE(state->glue->states, state);
                        state->glue = NULL;
                }
                return;
        }

        /*
         * We need to reparent to a long term context.
         * The destructor may keep the memory
         * and leak it for now.
         */
        pthreadpool_tevent_job_orphan(state->job);
        state->job = NULL; /* not needed but looks better */
        return;
}

struct tevent_req *pthreadpool_tevent_job_send(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        struct pthreadpool_tevent *pool,
        void (*fn)(void *private_data), void *private_data)
{
        struct tevent_req *req = NULL;
        struct pthreadpool_tevent_job_state *state = NULL;
        struct pthreadpool_tevent_job *job = NULL;
        int ret;
        struct pthreadpool_tevent *caller_pool = pool;
        struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;

        pthreadpool_tevent_cleanup_orphaned_jobs();

        if (wrapper != NULL) {
                pool = wrapper->main_tp;
        }

        req = tevent_req_create(mem_ctx, &state,
                                struct pthreadpool_tevent_job_state);
        if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
        state->req = req;

        tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);

        if (pool == NULL) {
                tevent_req_error(req, EINVAL);
                return tevent_req_post(req, ev);
        }
        if (pool->pool == NULL) {
                tevent_req_error(req, EINVAL);
                return tevent_req_post(req, ev);
        }

        ret = pthreadpool_tevent_register_ev(pool, state);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }

        job = talloc_zero(state, struct pthreadpool_tevent_job);
        if (tevent_req_nomem(job, req)) {
                return tevent_req_post(req, ev);
        }
        job->pool = pool;
        job->wrapper = wrapper;
        job->fn = fn;
        job->private_data = private_data;
        job->im = tevent_create_immediate(state->job);
        if (tevent_req_nomem(job->im, req)) {
                return tevent_req_post(req, ev);
        }
        PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
        job->per_thread_cwd = pthreadpool_tevent_per_thread_cwd(caller_pool);
        talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
        DLIST_ADD_END(job->pool->jobs, job);
        job->state = state;
        state->job = job;

        ret = pthreadpool_add_job(job->pool->pool, 0,
                                  pthreadpool_tevent_job_fn,
                                  job);
        if (tevent_req_error(req, ret)) {
                return tevent_req_post(req, ev);
        }

        tevent_req_set_cancel_fn(req, pthreadpool_tevent_job_cancel);
        return req;
}

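/*
 * Example caller pattern (a hedged sketch; "do_work", "job_data" and
 * "my_done" are placeholders, not part of this API):
 *
 *   req = pthreadpool_tevent_job_send(mem_ctx, ev, pool,
 *                                     do_work, job_data);
 *   if (req == NULL) {
 *           // out of memory
 *   }
 *   tevent_req_set_callback(req, my_done, NULL);
 *
 *   static void my_done(struct tevent_req *req)
 *   {
 *           int ret = pthreadpool_tevent_job_recv(req);
 *           TALLOC_FREE(req);
 *           // ret != 0 means e.g. ECANCELED or ENOEXEC
 *   }
 */
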
static __thread struct pthreadpool_tevent_job *current_job;

bool pthreadpool_tevent_current_job_canceled(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        return current_job->needs_fence.maycancel;
}

bool pthreadpool_tevent_current_job_orphaned(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        return current_job->needs_fence.orphaned;
}

bool pthreadpool_tevent_current_job_continue(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        if (current_job->needs_fence.maycancel) {
                return false;
        }
        PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
        if (current_job->needs_fence.orphaned) {
                return false;
        }

        return true;
}

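/*
 * Example of a cooperative job function using the helper above
 * (a hedged sketch; "struct work" and process_chunk() are
 * placeholders):
 *
 *   static void do_work(void *private_data)
 *   {
 *           struct work *w = private_data;
 *           size_t i;
 *
 *           for (i = 0; i < w->num_chunks; i++) {
 *                   if (!pthreadpool_tevent_current_job_continue()) {
 *                           // canceled or orphaned: stop early
 *                           return;
 *                   }
 *                   process_chunk(w, i);
 *           }
 *   }
 */
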
bool pthreadpool_tevent_current_job_per_thread_cwd(void)
{
        if (current_job == NULL) {
                /*
                 * Should only be called from within
                 * the job function.
                 */
                abort();
                return false;
        }

        return current_job->per_thread_cwd;
}

static void pthreadpool_tevent_job_fn(void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);
        struct pthreadpool_tevent_wrapper *wrapper = NULL;

        current_job = job;
        job->needs_fence.started = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.orphaned) {
                current_job = NULL;
                return;
        }

        wrapper = job->wrapper;
        if (wrapper != NULL) {
                bool ok;

                job->needs_fence.wrapper = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (job->needs_fence.orphaned) {
                        job->needs_fence.wrapper = false;
                        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                        current_job = NULL;
                        return;
                }
                ok = wrapper->ops->before_job(wrapper->wrap_tp,
                                              wrapper->private_state,
                                              wrapper->main_tp,
                                              __location__);
                job->needs_fence.wrapper = false;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (!ok) {
                        job->needs_fence.exit_thread = true;
                        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                        current_job = NULL;
                        return;
                }
        }

        job->fn(job->private_data);

        job->needs_fence.executed = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        if (wrapper != NULL) {
                bool ok;

                job->needs_fence.wrapper = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (job->needs_fence.orphaned) {
                        job->needs_fence.wrapper = false;
                        job->needs_fence.exit_thread = true;
                        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                        current_job = NULL;
                        return;
                }
                ok = wrapper->ops->after_job(wrapper->wrap_tp,
                                             wrapper->private_state,
                                             wrapper->main_tp,
                                             __location__);
                job->needs_fence.wrapper = false;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (!ok) {
                        job->needs_fence.exit_thread = true;
                        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                        current_job = NULL;
                        return;
                }
        }

        current_job = NULL;
}

static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
                                         void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(job_private_data,
                struct pthreadpool_tevent_job);

        job->needs_fence.finished = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.orphaned) {
                /* Request already gone */
                job->needs_fence.dropped = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (job->needs_fence.exit_thread) {
                        /*
                         * A problem with the wrapper means the
                         * current job/worker thread needs to terminate.
                         *
                         * The pthreadpool_tevent is already gone.
                         */
                        return -1;
                }
                return 0;
        }

        /*
         * state and state->glue are valid,
         * see the job->needs_fence.finished
         * "spinlock" loop in
         * pthreadpool_tevent_job_orphan()
         */
        if (job->state->glue->tctx != NULL) {
                /* with HAVE_PTHREAD */
                tevent_threaded_schedule_immediate(job->state->glue->tctx,
                                                   job->im,
                                                   pthreadpool_tevent_job_done,
                                                   job);
        } else {
                /* without HAVE_PTHREAD */
                tevent_schedule_immediate(job->im,
                                          job->state->glue->ev,
                                          pthreadpool_tevent_job_done,
                                          job);
        }

        job->needs_fence.signaled = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.exit_thread) {
                /*
                 * A problem with the wrapper means the
                 * current job/worker thread needs to terminate.
                 *
                 * The pthreadpool_tevent is already gone.
                 */
                return -1;
        }
        return 0;
}

static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
                                        struct tevent_immediate *im,
                                        void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);
        struct pthreadpool_tevent_job_state *state = job->state;

        TALLOC_FREE(job->im);

        if (state == NULL) {
                /* Request already gone */
                TALLOC_FREE(job);
                return;
        }

        /*
         * pthreadpool_tevent_job_cleanup()
         * (called by tevent_req_done() or
         * tevent_req_error()) will destroy the job.
         */

        if (job->needs_fence.executed) {
                tevent_req_done(state->req);
                return;
        }

        tevent_req_error(state->req, ENOEXEC);
        return;
}

static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
{
        struct pthreadpool_tevent_job_state *state =
                tevent_req_data(req,
                struct pthreadpool_tevent_job_state);
        struct pthreadpool_tevent_job *job = state->job;
        size_t num;

        if (job == NULL) {
                return false;
        }

        job->needs_fence.maycancel = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.started) {
                /*
                 * It was too late to cancel the request.
                 *
                 * The job still has the chance to look
                 * at pthreadpool_tevent_current_job_canceled()
                 * or pthreadpool_tevent_current_job_continue().
                 */
                return false;
        }

        num = pthreadpool_cancel_job(job->pool->pool, 0,
                                     pthreadpool_tevent_job_fn,
                                     job);
        if (num == 0) {
                /*
                 * It was too late to cancel the request.
                 */
                return false;
        }

        /*
         * It was not too late to cancel the request.
         *
         * We can remove job->im, as it will never be used.
         */
        TALLOC_FREE(job->im);

        /*
         * pthreadpool_tevent_job_cleanup()
         * will destroy the job.
         */
        tevent_req_defer_callback(req, state->ev);
        tevent_req_error(req, ECANCELED);
        return true;
}

int pthreadpool_tevent_job_recv(struct tevent_req *req)
{
        return tevent_req_simple_recv_unix(req);
}