lib:replace: Add getprogname()
[samba.git] / lib / pthreadpool / pthreadpool_tevent.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * threadpool implementation based on pthreads
4  * Copyright (C) Volker Lendecke 2009,2011
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/select.h"
22 #include "system/threads.h"
23 #include "system/filesys.h"
24 #include "pthreadpool_tevent.h"
25 #include "pthreadpool.h"
26 #include "lib/util/tevent_unix.h"
27 #include "lib/util/dlinklist.h"
28 #include "lib/util/attr.h"
29
/*
 * Hints for the valgrind thread checkers (helgrind/drd).
 *
 * ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
 * names a memory range that helgrind/drd should ignore;
 * 'description' is just ignored...
 *
 * ANNOTATE_HAPPENS_*(unique_uintptr) only uses its argument
 * (a DWORD/(void *)) as the unique key identifying the barrier.
 *
 * Every annotation that is still undefined after the optional
 * include below is defined to expand to nothing.
 */
#ifdef HAVE_VALGRIND_HELGRIND_H
#include <valgrind/helgrind.h>
#endif

#if !defined(ANNOTATE_BENIGN_RACE_SIZED)
#define ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
#endif

#if !defined(ANNOTATE_HAPPENS_BEFORE)
#define ANNOTATE_HAPPENS_BEFORE(unique_uintptr)
#endif

#if !defined(ANNOTATE_HAPPENS_AFTER)
#define ANNOTATE_HAPPENS_AFTER(unique_uintptr)
#endif

#if !defined(ANNOTATE_HAPPENS_BEFORE_FORGET_ALL)
#define ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(unique_uintptr)
#endif
57
/*
 * Fence helpers for struct pthreadpool_tevent_job::needs_fence.
 *
 * All three public macros evaluate their argument exactly once,
 * into the local '__j', and only use '__j' afterwards. That makes
 * them safe for arbitrary pointer expressions and gives the
 * compiler a type check on the argument.
 *
 * NOTE: the trailing ';' after 'while(0)' is kept on purpose, so
 * that every existing caller keeps compiling unchanged. Do not use
 * these macros as the unbraced body of an if/else.
 */

/*
 * Tell helgrind/drd that accesses to __job->needs_fence race by
 * design and are ordered by explicit fences.
 */
#define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
	_UNUSED_ const struct pthreadpool_tevent_job *__j = (__job); \
	ANNOTATE_BENIGN_RACE_SIZED(&__j->needs_fence, \
				   sizeof(__j->needs_fence), \
				   "race by design, protected by fence"); \
} while(0);

#ifdef WITH_PTHREADPOOL
/*
 * configure checked we have pthread and atomic_thread_fence() available
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { \
	atomic_thread_fence(__order); \
} while(0)
#else
/*
 * we're using lib/pthreadpool/pthreadpool_sync.c ...
 * so the fence expands to nothing.
 */
#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { } while(0)
#ifndef HAVE___THREAD
#define __thread
#endif
#endif

/*
 * Full (sequentially consistent) memory barrier for a job,
 * bracketed by the matching happens-before/after annotations
 * keyed on the address of __job->needs_fence.
 */
#define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
	_UNUSED_ const struct pthreadpool_tevent_job *__j = (__job); \
	ANNOTATE_HAPPENS_BEFORE(&__j->needs_fence); \
	__PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
	ANNOTATE_HAPPENS_AFTER(&__j->needs_fence); \
} while(0);

/*
 * Drop the happens-before annotations recorded for
 * __job->needs_fence, used right before the job goes away.
 */
#define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
	_UNUSED_ const struct pthreadpool_tevent_job *__j = (__job); \
	ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&__j->needs_fence); \
} while(0);
93
struct pthreadpool_tevent_job_state;

/*
 * One pthreadpool_tevent_glue object exists per unique combination
 * of tevent context and pthreadpool_tevent object. Each
 * pthreadpool_tevent keeps a list of these, i.e. a list of the
 * tevent contexts it is currently used with.
 */
struct pthreadpool_tevent_glue {
	/* List linkage within pool->glue_list. */
	struct pthreadpool_tevent_glue *prev;
	struct pthreadpool_tevent_glue *next;

	/* Back-pointer to the owning pool object. */
	struct pthreadpool_tevent *pool;

	/* The tuple this list element keeps track of. */
	struct tevent_context *ev;
	struct tevent_threaded_context *tctx;

	/* fd event on the pool's restart-check monitor fd. */
	struct tevent_fd *fde;

	/* Link object that is owned by *ev. */
	struct pthreadpool_tevent_glue_ev_link *ev_link;

	/* Jobs currently active via this glue. */
	struct pthreadpool_tevent_job_state *states;
};
114
/*
 * Link between a tevent context and its glue object.
 *
 * This structure is talloc()'ed off the struct tevent_context
 * itself. Its destructor therefore runs when the tevent context is
 * talloc_free()'ed, which is how the related
 * struct pthreadpool_tevent_glue gets removed from the glue list of
 * the struct pthreadpool_tevent in that case.
 */
struct pthreadpool_tevent_glue_ev_link {
	/* Back-pointer; NULL once the glue itself is being destroyed. */
	struct pthreadpool_tevent_glue *glue;
};
127
/*
 * State of a wrapper pool created by
 * _pthreadpool_tevent_wrapper_create().
 */
struct pthreadpool_tevent_wrapper {
	/* The real pool that is being wrapped. */
	struct pthreadpool_tevent *main_tp;

	/* The wrapper pool object this state belongs to. */
	struct pthreadpool_tevent *wrap_tp;

	/* Hook table supplied by the creator of the wrapper. */
	const struct pthreadpool_tevent_wrapper_ops *ops;

	/* Zero-initialized private memory handed back to the creator. */
	void *private_state;

	/* Set by pthreadpool_tevent_force_per_thread_cwd(). */
	bool force_per_thread_cwd;
};
135
/*
 * A pool object as seen by callers. The same structure is used
 * for main pools and for wrapper pools; see 'wrapper' below.
 */
struct pthreadpool_tevent {
	/* List linkage (wrapper pools sit in main_tp->wrapper.list). */
	struct pthreadpool_tevent *prev;
	struct pthreadpool_tevent *next;

	/* The underlying thread pool, NULL if there is none (anymore). */
	struct pthreadpool *pool;

	/* Registered tevent_context/tevent_threaded_context pairs. */
	struct pthreadpool_tevent_glue *glue_list;

	/* Jobs that are attached to this pool. */
	struct pthreadpool_tevent_job *jobs;

	struct {
		/*
		 * Used on the main context:
		 * the wrapper pools created on top of it.
		 */
		struct pthreadpool_tevent *list;

		/*
		 * Used on the wrapper context:
		 * the wrapper state, NULL on a main pool.
		 */
		struct pthreadpool_tevent_wrapper *ctx;
	} wrapper;
};
156
/*
 * Request side of a job: ties a job to its tevent request,
 * event context and glue object.
 */
struct pthreadpool_tevent_job_state {
	/* List linkage within glue->states. */
	struct pthreadpool_tevent_job_state *prev;
	struct pthreadpool_tevent_job_state *next;

	/* Glue this state is registered on, NULL after orphaning. */
	struct pthreadpool_tevent_glue *glue;

	/* Event context and request the job belongs to. */
	struct tevent_context *ev;
	struct tevent_req *req;

	/* The job itself, NULL once the job has been orphaned. */
	struct pthreadpool_tevent_job *job;
};
164
/*
 * Thread side of a job. It may outlive its request state, in
 * which case it is "orphaned" (see needs_fence.orphaned).
 */
struct pthreadpool_tevent_job {
	/* List linkage (pool->jobs or the orphaned_jobs list). */
	struct pthreadpool_tevent_job *prev;
	struct pthreadpool_tevent_job *next;

	struct pthreadpool_tevent *pool;
	struct pthreadpool_tevent_wrapper *wrapper;
	struct pthreadpool_tevent_job_state *state;
	struct tevent_immediate *im;

	/* The function to run and its argument. */
	void (*fn)(void *private_data);
	void *private_data;

	/*
	 * Coordination between threads
	 *
	 * Each element has exactly one writing side:
	 * either the main process or the job thread.
	 *
	 * The coordination is done by a full memory
	 * barrier using atomic_thread_fence(memory_order_seq_cst)
	 * wrapped in PTHREAD_TEVENT_JOB_THREAD_FENCE()
	 */
	struct {
		/*
		 * 'maycancel'
		 * set when tevent_req_cancel() is called.
		 * (only written by main thread!)
		 */
		bool maycancel;

		/*
		 * 'orphaned'
		 * set when talloc_free is called on the job request,
		 * tevent_context or pthreadpool_tevent.
		 * (only written by main thread!)
		 */
		bool orphaned;

		/*
		 * 'started'
		 * set when the job is picked up by a worker thread
		 * (only written by job thread!)
		 */
		bool started;

		/*
		 * 'wrapper'
		 * set before calling the wrapper before_job() or
		 * after_job() hooks.
		 * unset again once the hook finished.
		 * (only written by job thread!)
		 */
		bool wrapper;

		/*
		 * 'executed'
		 * set once the job function returned.
		 * (only written by job thread!)
		 */
		bool executed;

		/*
		 * 'finished'
		 * set when pthreadpool_tevent_job_signal() is entered
		 * (only written by job thread!)
		 */
		bool finished;

		/*
		 * 'dropped'
		 * set when pthreadpool_tevent_job_signal() leaves with
		 * orphaned already set.
		 * (only written by job thread!)
		 */
		bool dropped;

		/*
		 * 'signaled'
		 * set when pthreadpool_tevent_job_signal() leaves
		 * normally and the immediate event was scheduled.
		 * (only written by job thread!)
		 */
		bool signaled;

		/*
		 * 'exit_thread'
		 * may be set during pthreadpool_tevent_job_fn()
		 * if some wrapper related code generated an error
		 * and the environment isn't safe anymore.
		 *
		 * In such a case pthreadpool_tevent_job_signal()
		 * will pick this up and terminate the current
		 * worker thread by returning -1.
		 */
		bool exit_thread; /* only written/read by job thread! */
	} needs_fence;

	bool per_thread_cwd;
};
263
264 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
265
266 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);
267
268 static struct pthreadpool_tevent_job *orphaned_jobs;
269
270 void pthreadpool_tevent_cleanup_orphaned_jobs(void)
271 {
272         struct pthreadpool_tevent_job *job = NULL;
273         struct pthreadpool_tevent_job *njob = NULL;
274
275         for (job = orphaned_jobs; job != NULL; job = njob) {
276                 njob = job->next;
277
278                 /*
279                  * The job destructor keeps the job alive
280                  * (and in the list) or removes it from the list.
281                  */
282                 TALLOC_FREE(job);
283         }
284 }
285
286 static int pthreadpool_tevent_job_signal(int jobid,
287                                          void (*job_fn)(void *private_data),
288                                          void *job_private_data,
289                                          void *private_data);
290
291 int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
292                             struct pthreadpool_tevent **presult)
293 {
294         struct pthreadpool_tevent *pool;
295         int ret;
296
297         pthreadpool_tevent_cleanup_orphaned_jobs();
298
299         pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
300         if (pool == NULL) {
301                 return ENOMEM;
302         }
303
304         ret = pthreadpool_init(max_threads, &pool->pool,
305                                pthreadpool_tevent_job_signal, pool);
306         if (ret != 0) {
307                 TALLOC_FREE(pool);
308                 return ret;
309         }
310
311         talloc_set_destructor(pool, pthreadpool_tevent_destructor);
312
313         *presult = pool;
314         return 0;
315 }
316
317 static struct pthreadpool_tevent *pthreadpool_tevent_unwrap(
318         struct pthreadpool_tevent *pool)
319 {
320         struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
321
322         if (wrapper != NULL) {
323                 return wrapper->main_tp;
324         }
325
326         return pool;
327 }
328
329 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
330 {
331         pool = pthreadpool_tevent_unwrap(pool);
332
333         if (pool->pool == NULL) {
334                 return 0;
335         }
336
337         return pthreadpool_max_threads(pool->pool);
338 }
339
340 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
341 {
342         pool = pthreadpool_tevent_unwrap(pool);
343
344         if (pool->pool == NULL) {
345                 return 0;
346         }
347
348         return pthreadpool_queued_jobs(pool->pool);
349 }
350
351 bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool)
352 {
353         struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
354
355         if (wrapper != NULL && wrapper->force_per_thread_cwd) {
356                 return true;
357         }
358
359         pool = pthreadpool_tevent_unwrap(pool);
360
361         if (pool->pool == NULL) {
362                 return false;
363         }
364
365         return pthreadpool_per_thread_cwd(pool->pool);
366 }
367
368 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
369 {
370         struct pthreadpool_tevent_job *job = NULL;
371         struct pthreadpool_tevent_job *njob = NULL;
372         struct pthreadpool_tevent *wrap_tp = NULL;
373         struct pthreadpool_tevent *nwrap_tp = NULL;
374         struct pthreadpool_tevent_glue *glue = NULL;
375         int ret;
376
377         if (pool->wrapper.ctx != NULL) {
378                 struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
379
380                 pool->wrapper.ctx = NULL;
381                 pool = wrapper->main_tp;
382
383                 DLIST_REMOVE(pool->wrapper.list, wrapper->wrap_tp);
384
385                 for (job = pool->jobs; job != NULL; job = njob) {
386                         njob = job->next;
387
388                         if (job->wrapper != wrapper) {
389                                 continue;
390                         }
391
392                         /*
393                          * This removes the job from the list
394                          *
395                          * Note that it waits in case
396                          * the wrapper hooks are currently
397                          * executing on the job.
398                          */
399                         pthreadpool_tevent_job_orphan(job);
400                 }
401
402                 /*
403                  * At this point we're sure that no job
404                  * still references the pthreadpool_tevent_wrapper
405                  * structure, so we can free it.
406                  */
407                 TALLOC_FREE(wrapper);
408
409                 pthreadpool_tevent_cleanup_orphaned_jobs();
410                 return 0;
411         }
412
413         if (pool->pool == NULL) {
414                 /*
415                  * A dangling wrapper without main_tp.
416                  */
417                 return 0;
418         }
419
420         ret = pthreadpool_stop(pool->pool);
421         if (ret != 0) {
422                 return ret;
423         }
424
425         /*
426          * orphan all jobs (including wrapper jobs)
427          */
428         for (job = pool->jobs; job != NULL; job = njob) {
429                 njob = job->next;
430
431                 /*
432                  * The job this removes it from the list
433                  *
434                  * Note that it waits in case
435                  * the wrapper hooks are currently
436                  * executing on the job (thread).
437                  */
438                 pthreadpool_tevent_job_orphan(job);
439         }
440
441         /*
442          * cleanup all existing wrappers, remember we just orphaned
443          * all jobs (including the once of the wrappers).
444          *
445          * So we just mark as broken, so that
446          * pthreadpool_tevent_job_send() won't accept new jobs.
447          */
448         for (wrap_tp = pool->wrapper.list; wrap_tp != NULL; wrap_tp = nwrap_tp) {
449                 nwrap_tp = wrap_tp->next;
450
451                 /*
452                  * Just mark them as broken, so that we can't
453                  * get more jobs.
454                  */
455                 TALLOC_FREE(wrap_tp->wrapper.ctx);
456
457                 DLIST_REMOVE(pool->wrapper.list, wrap_tp);
458         }
459
460         /*
461          * Delete all the registered
462          * tevent_context/tevent_threaded_context
463          * pairs.
464          */
465         for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
466                 /* The glue destructor removes it from the list */
467                 TALLOC_FREE(glue);
468         }
469         pool->glue_list = NULL;
470
471         ret = pthreadpool_destroy(pool->pool);
472         if (ret != 0) {
473                 return ret;
474         }
475         pool->pool = NULL;
476
477         pthreadpool_tevent_cleanup_orphaned_jobs();
478
479         return 0;
480 }
481
482 struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
483                                 struct pthreadpool_tevent *main_tp,
484                                 TALLOC_CTX *mem_ctx,
485                                 const struct pthreadpool_tevent_wrapper_ops *ops,
486                                 void *pstate,
487                                 size_t psize,
488                                 const char *type,
489                                 const char *location)
490 {
491         void **ppstate = (void **)pstate;
492         struct pthreadpool_tevent *wrap_tp = NULL;
493         struct pthreadpool_tevent_wrapper *wrapper = NULL;
494
495         pthreadpool_tevent_cleanup_orphaned_jobs();
496
497         if (main_tp->wrapper.ctx != NULL) {
498                 /*
499                  * stacking of wrappers is not supported
500                  */
501                 errno = EINVAL;
502                 return NULL;
503         }
504
505         if (main_tp->pool == NULL) {
506                 /*
507                  * The pool is no longer valid,
508                  * most likely it was a wrapper context
509                  * where the main pool was destroyed.
510                  */
511                 errno = EINVAL;
512                 return NULL;
513         }
514
515         wrap_tp = talloc_zero(mem_ctx, struct pthreadpool_tevent);
516         if (wrap_tp == NULL) {
517                 return NULL;
518         }
519
520         wrapper = talloc_zero(wrap_tp, struct pthreadpool_tevent_wrapper);
521         if (wrapper == NULL) {
522                 TALLOC_FREE(wrap_tp);
523                 return NULL;
524         }
525         wrapper->main_tp = main_tp;
526         wrapper->wrap_tp = wrap_tp;
527         wrapper->ops = ops;
528         wrapper->private_state = talloc_zero_size(wrapper, psize);
529         if (wrapper->private_state == NULL) {
530                 TALLOC_FREE(wrap_tp);
531                 return NULL;
532         }
533         talloc_set_name_const(wrapper->private_state, type);
534
535         wrap_tp->wrapper.ctx = wrapper;
536
537         DLIST_ADD_END(main_tp->wrapper.list, wrap_tp);
538
539         talloc_set_destructor(wrap_tp, pthreadpool_tevent_destructor);
540
541         *ppstate = wrapper->private_state;
542         return wrap_tp;
543 }
544
545 void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool,
546                                              const void *private_state)
547 {
548         struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
549
550         if (wrapper == NULL) {
551                 abort();
552         }
553
554         if (wrapper->private_state != private_state) {
555                 abort();
556         }
557
558         wrapper->force_per_thread_cwd = true;
559 }
560
561 static int pthreadpool_tevent_glue_destructor(
562         struct pthreadpool_tevent_glue *glue)
563 {
564         struct pthreadpool_tevent_job_state *state = NULL;
565         struct pthreadpool_tevent_job_state *nstate = NULL;
566
567         TALLOC_FREE(glue->fde);
568
569         for (state = glue->states; state != NULL; state = nstate) {
570                 nstate = state->next;
571
572                 /* The job this removes it from the list */
573                 pthreadpool_tevent_job_orphan(state->job);
574         }
575
576         if (glue->pool->glue_list != NULL) {
577                 DLIST_REMOVE(glue->pool->glue_list, glue);
578         }
579
580         /* Ensure the ev_link destructor knows we're gone */
581         glue->ev_link->glue = NULL;
582
583         TALLOC_FREE(glue->ev_link);
584         TALLOC_FREE(glue->tctx);
585
586         return 0;
587 }
588
589 /*
590  * Destructor called either explicitly from
591  * pthreadpool_tevent_glue_destructor(), or indirectly
592  * when owning tevent_context is destroyed.
593  *
594  * When called from pthreadpool_tevent_glue_destructor()
595  * ev_link->glue is already NULL, so this does nothing.
596  *
597  * When called from talloc_free() of the owning
598  * tevent_context we must ensure we also remove the
599  * linked glue object from the list inside
600  * struct pthreadpool_tevent.
601  */
602 static int pthreadpool_tevent_glue_link_destructor(
603         struct pthreadpool_tevent_glue_ev_link *ev_link)
604 {
605         TALLOC_FREE(ev_link->glue);
606         return 0;
607 }
608
609 static void pthreadpool_tevent_glue_monitor(struct tevent_context *ev,
610                                             struct tevent_fd *fde,
611                                             uint16_t flags,
612                                             void *private_data)
613 {
614         struct pthreadpool_tevent_glue *glue =
615                 talloc_get_type_abort(private_data,
616                 struct pthreadpool_tevent_glue);
617         struct pthreadpool_tevent_job *job = NULL;
618         struct pthreadpool_tevent_job *njob = NULL;
619         int ret = -1;
620
621         ret = pthreadpool_restart_check_monitor_drain(glue->pool->pool);
622         if (ret != 0) {
623                 TALLOC_FREE(glue->fde);
624         }
625
626         ret = pthreadpool_restart_check(glue->pool->pool);
627         if (ret == 0) {
628                 /*
629                  * success...
630                  */
631                 goto done;
632         }
633
634         /*
635          * There's a problem and the pool
636          * has not a single thread available
637          * for pending jobs, so we can only
638          * stop the jobs and return an error.
639          * This is similar to a failure from
640          * pthreadpool_add_job().
641          */
642         for (job = glue->pool->jobs; job != NULL; job = njob) {
643                 njob = job->next;
644
645                 tevent_req_defer_callback(job->state->req,
646                                           job->state->ev);
647                 tevent_req_error(job->state->req, ret);
648         }
649
650 done:
651         if (glue->states == NULL) {
652                 /*
653                  * If the glue doesn't have any pending jobs
654                  * we remove the glue.
655                  *
656                  * In order to remove the fd event.
657                  */
658                 TALLOC_FREE(glue);
659         }
660 }
661
662 static int pthreadpool_tevent_register_ev(
663                                 struct pthreadpool_tevent *pool,
664                                 struct pthreadpool_tevent_job_state *state)
665 {
666         struct tevent_context *ev = state->ev;
667         struct pthreadpool_tevent_glue *glue = NULL;
668         struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
669         int monitor_fd = -1;
670
671         /*
672          * See if this tevent_context was already registered by
673          * searching the glue object list. If so we have nothing
674          * to do here - we already have a tevent_context/tevent_threaded_context
675          * pair.
676          */
677         for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
678                 if (glue->ev == state->ev) {
679                         state->glue = glue;
680                         DLIST_ADD_END(glue->states, state);
681                         return 0;
682                 }
683         }
684
685         /*
686          * Event context not yet registered - create a new glue
687          * object containing a tevent_context/tevent_threaded_context
688          * pair and put it on the list to remember this registration.
689          * We also need a link object to ensure the event context
690          * can't go away without us knowing about it.
691          */
692         glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
693         if (glue == NULL) {
694                 return ENOMEM;
695         }
696         *glue = (struct pthreadpool_tevent_glue) {
697                 .pool = pool,
698                 .ev = ev,
699         };
700         talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
701
702         monitor_fd = pthreadpool_restart_check_monitor_fd(pool->pool);
703         if (monitor_fd == -1 && errno != ENOSYS) {
704                 int saved_errno = errno;
705                 TALLOC_FREE(glue);
706                 return saved_errno;
707         }
708
709         if (monitor_fd != -1) {
710                 glue->fde = tevent_add_fd(ev,
711                                           glue,
712                                           monitor_fd,
713                                           TEVENT_FD_READ,
714                                           pthreadpool_tevent_glue_monitor,
715                                           glue);
716                 if (glue->fde == NULL) {
717                         close(monitor_fd);
718                         TALLOC_FREE(glue);
719                         return ENOMEM;
720                 }
721                 tevent_fd_set_auto_close(glue->fde);
722                 monitor_fd = -1;
723         }
724
725         /*
726          * Now allocate the link object to the event context. Note this
727          * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
728          * context is freed we are able to cleanup the glue object
729          * in the link object destructor.
730          */
731
732         ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
733         if (ev_link == NULL) {
734                 TALLOC_FREE(glue);
735                 return ENOMEM;
736         }
737         ev_link->glue = glue;
738         talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);
739
740         glue->ev_link = ev_link;
741
742 #ifdef HAVE_PTHREAD
743         glue->tctx = tevent_threaded_context_create(glue, ev);
744         if (glue->tctx == NULL) {
745                 TALLOC_FREE(ev_link);
746                 TALLOC_FREE(glue);
747                 return ENOMEM;
748         }
749 #endif
750
751         state->glue = glue;
752         DLIST_ADD_END(glue->states, state);
753
754         DLIST_ADD(pool->glue_list, glue);
755         return 0;
756 }
757
758 static void pthreadpool_tevent_job_fn(void *private_data);
759 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
760                                         struct tevent_immediate *im,
761                                         void *private_data);
762 static bool pthreadpool_tevent_job_cancel(struct tevent_req *req);
763
764 static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
765 {
766         /*
767          * We should never be called with needs_fence.orphaned == false.
768          * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
769          * after detaching from the request state, glue and pool list.
770          */
771         if (!job->needs_fence.orphaned) {
772                 abort();
773         }
774
775         /*
776          * If the job is not finished (job->im still there)
777          * and it's still attached to the pool,
778          * we try to cancel it (before it was starts)
779          */
780         if (job->im != NULL && job->pool != NULL) {
781                 size_t num;
782
783                 num = pthreadpool_cancel_job(job->pool->pool, 0,
784                                              pthreadpool_tevent_job_fn,
785                                              job);
786                 if (num != 0) {
787                         /*
788                          * It was not too late to cancel the request.
789                          *
790                          * We can remove job->im, as it will never be used.
791                          */
792                         TALLOC_FREE(job->im);
793                 }
794         }
795
796         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
797         if (job->needs_fence.dropped) {
798                 /*
799                  * The signal function saw job->needs_fence.orphaned
800                  * before it started the signaling via the immediate
801                  * event. So we'll never geht triggered and can
802                  * remove job->im and let the whole job go...
803                  */
804                 TALLOC_FREE(job->im);
805         }
806
807         /*
808          * TODO?: We could further improve this by adjusting
809          * tevent_threaded_schedule_immediate_destructor()
810          * and allow TALLOC_FREE() during its time
811          * in the main_ev->scheduled_immediates list.
812          *
813          * PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
814          * if (state->needs_fence.signaled) {
815          *       *
816          *       * The signal function is completed
817          *       * in future we may be allowed
818          *       * to call TALLOC_FREE(job->im).
819          *       *
820          *      TALLOC_FREE(job->im);
821          * }
822          */
823
824         /*
825          * pthreadpool_tevent_job_orphan() already removed
826          * it from pool->jobs. And we don't need try
827          * pthreadpool_cancel_job() again.
828          */
829         job->pool = NULL;
830
831         if (job->im != NULL) {
832                 /*
833                  * state->im still there means, we need to wait for the
834                  * immediate event to be triggered or just leak the memory.
835                  *
836                  * Move it to the orphaned list, if it's not already there.
837                  */
838                 return -1;
839         }
840
841         /*
842          * Finally remove from the orphaned_jobs list
843          * and let talloc destroy us.
844          */
845         DLIST_REMOVE(orphaned_jobs, job);
846
847         PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(job);
848         return 0;
849 }
850
/*
 * Detach a job from its request state.
 *
 * The job is marked as 'orphaned', then this function waits (polling in
 * 1 millisecond steps) until a running wrapper hook and a running
 * pthreadpool_tevent_job_signal() are done with the job. After that the
 * links between job and job->state (and the state's glue) are cut, the
 * job is reparented from job->state to the NULL talloc context, removed
 * from pool->jobs, appended to orphaned_jobs and finally a free is
 * attempted.
 *
 * Calls abort() if job->state or job->pool is already NULL.
 */
static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
{
        /*
         * Set 'orphaned' before anything else: the job thread side
         * checks this flag after its own fences.
         */
        job->needs_fence.orphaned = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        /*
         * We're the only function that sets
         * job->state = NULL;
         */
        if (job->state == NULL) {
                abort();
        }

        /*
         * Once we marked the request as 'orphaned'
         * we spin/loop if 'wrapper' is marked as active.
         *
         * We need to wait until the wrapper hook finished
         * before we can set job->wrapper = NULL.
         *
         * This is some kind of spinlock, but with
         * 1 millisecond sleeps in between, in order
         * to give the thread more cpu time to finish.
         */
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        while (job->needs_fence.wrapper) {
                (void)poll(NULL, 0, 1);
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        }
        job->wrapper = NULL;

        /*
         * Once we marked the request as 'orphaned'
         * we spin/loop if it's already marked
         * as 'finished' (which means that
         * pthreadpool_tevent_job_signal() was entered).
         * If it saw 'orphaned' it will exit after setting
         * 'dropped', otherwise it dereferences
         * job->state->glue->{tctx,ev} until it exits
         * after setting 'signaled'.
         *
         * We need to close this potential gap before
         * we can set job->state = NULL.
         *
         * This is some kind of spinlock, but with
         * 1 millisecond sleeps in between, in order
         * to give the thread more cpu time to finish.
         */
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        while (job->needs_fence.finished) {
                if (job->needs_fence.dropped) {
                        break;
                }
                if (job->needs_fence.signaled) {
                        break;
                }
                (void)poll(NULL, 0, 1);
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        }

        /*
         * Once the gap is closed, we can remove
         * the glue link.
         */
        DLIST_REMOVE(job->state->glue->states, job->state);
        job->state->glue = NULL;

        /*
         * We need to reparent to a long term context
         * (the NULL context) and detach from the request state.
         * Maybe the destructor will keep the memory
         * and leak it for now.
         */
        (void)talloc_reparent(job->state, NULL, job);
        job->state->job = NULL;
        job->state = NULL;

        /*
         * job->pool will only be set to NULL
         * in the first destructor run.
         */
        if (job->pool == NULL) {
                abort();
        }

        /*
         * Detach it from the pool.
         *
         * The job might still be running,
         * so we keep job->pool.
         * The destructor will set it to NULL
         * after trying pthreadpool_cancel_job()
         */
        DLIST_REMOVE(job->pool->jobs, job);

        /*
         * Add it to the list of orphaned jobs,
         * which may be cleaned up later.
         *
         * The destructor removes it from the list
         * when possible, or it denies the free
         * and keeps it in the list.
         */
        DLIST_ADD_END(orphaned_jobs, job);
        TALLOC_FREE(job);
}
957
958 static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
959                                            enum tevent_req_state req_state)
960 {
961         struct pthreadpool_tevent_job_state *state =
962                 tevent_req_data(req,
963                 struct pthreadpool_tevent_job_state);
964
965         if (state->job == NULL) {
966                 /*
967                  * The job request is not scheduled in the pool
968                  * yet or anymore.
969                  */
970                 if (state->glue != NULL) {
971                         DLIST_REMOVE(state->glue->states, state);
972                         state->glue = NULL;
973                 }
974                 return;
975         }
976
977         /*
978          * We need to reparent to a long term context.
979          * Maybe the destructor will keep the memory
980          * and leak it for now.
981          */
982         pthreadpool_tevent_job_orphan(state->job);
983         state->job = NULL; /* not needed but looks better */
984         return;
985 }
986
987 struct tevent_req *pthreadpool_tevent_job_send(
988         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
989         struct pthreadpool_tevent *pool,
990         void (*fn)(void *private_data), void *private_data)
991 {
992         struct tevent_req *req = NULL;
993         struct pthreadpool_tevent_job_state *state = NULL;
994         struct pthreadpool_tevent_job *job = NULL;
995         int ret;
996         struct pthreadpool_tevent *caller_pool = pool;
997         struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
998
999         pthreadpool_tevent_cleanup_orphaned_jobs();
1000
1001         if (wrapper != NULL) {
1002                 pool = wrapper->main_tp;
1003         }
1004
1005         req = tevent_req_create(mem_ctx, &state,
1006                                 struct pthreadpool_tevent_job_state);
1007         if (req == NULL) {
1008                 return NULL;
1009         }
1010         state->ev = ev;
1011         state->req = req;
1012
1013         tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);
1014
1015         if (pool == NULL) {
1016                 tevent_req_error(req, EINVAL);
1017                 return tevent_req_post(req, ev);
1018         }
1019         if (pool->pool == NULL) {
1020                 tevent_req_error(req, EINVAL);
1021                 return tevent_req_post(req, ev);
1022         }
1023
1024         ret = pthreadpool_tevent_register_ev(pool, state);
1025         if (tevent_req_error(req, ret)) {
1026                 return tevent_req_post(req, ev);
1027         }
1028
1029         job = talloc_zero(state, struct pthreadpool_tevent_job);
1030         if (tevent_req_nomem(job, req)) {
1031                 return tevent_req_post(req, ev);
1032         }
1033         job->pool = pool;
1034         job->wrapper = wrapper;
1035         job->fn = fn;
1036         job->private_data = private_data;
1037         job->im = tevent_create_immediate(state->job);
1038         if (tevent_req_nomem(job->im, req)) {
1039                 return tevent_req_post(req, ev);
1040         }
1041         PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
1042         job->per_thread_cwd = pthreadpool_tevent_per_thread_cwd(caller_pool);
1043         talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
1044         DLIST_ADD_END(job->pool->jobs, job);
1045         job->state = state;
1046         state->job = job;
1047
1048         ret = pthreadpool_add_job(job->pool->pool, 0,
1049                                   pthreadpool_tevent_job_fn,
1050                                   job);
1051         if (tevent_req_error(req, ret)) {
1052                 return tevent_req_post(req, ev);
1053         }
1054
1055         tevent_req_set_cancel_fn(req, pthreadpool_tevent_job_cancel);
1056         return req;
1057 }
1058
/*
 * Per-thread pointer to the job that is currently executed by this
 * thread. pthreadpool_tevent_job_fn() sets it on entry and resets it
 * to NULL before it returns. The pthreadpool_tevent_current_job_*()
 * helpers abort() if it is NULL.
 */
static __thread struct pthreadpool_tevent_job *current_job;
1060
1061 bool pthreadpool_tevent_current_job_canceled(void)
1062 {
1063         if (current_job == NULL) {
1064                 /*
1065                  * Should only be called from within
1066                  * the job function.
1067                  */
1068                 abort();
1069                 return false;
1070         }
1071
1072         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
1073         return current_job->needs_fence.maycancel;
1074 }
1075
1076 bool pthreadpool_tevent_current_job_orphaned(void)
1077 {
1078         if (current_job == NULL) {
1079                 /*
1080                  * Should only be called from within
1081                  * the job function.
1082                  */
1083                 abort();
1084                 return false;
1085         }
1086
1087         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
1088         return current_job->needs_fence.orphaned;
1089 }
1090
1091 bool pthreadpool_tevent_current_job_continue(void)
1092 {
1093         if (current_job == NULL) {
1094                 /*
1095                  * Should only be called from within
1096                  * the job function.
1097                  */
1098                 abort();
1099                 return false;
1100         }
1101
1102         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
1103         if (current_job->needs_fence.maycancel) {
1104                 return false;
1105         }
1106         PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
1107         if (current_job->needs_fence.orphaned) {
1108                 return false;
1109         }
1110
1111         return true;
1112 }
1113
1114 bool pthreadpool_tevent_current_job_per_thread_cwd(void)
1115 {
1116         if (current_job == NULL) {
1117                 /*
1118                  * Should only be called from within
1119                  * the job function.
1120                  */
1121                 abort();
1122                 return false;
1123         }
1124
1125         return current_job->per_thread_cwd;
1126 }
1127
/*
 * Job function handed to pthreadpool_add_job() for every job.
 *
 * It publishes the job as the thread's current_job, marks it as
 * 'started' and then runs, in this order: the wrapper's before_job
 * hook (if the job has a wrapper), job->fn(job->private_data) and the
 * wrapper's after_job hook (if the job has a wrapper).
 *
 * Nothing is executed if the job is already 'orphaned' when the thread
 * picks it up. A wrapper hook is skipped if the job became 'orphaned'
 * in the meantime. A failing hook sets 'exit_thread'; 'executed' is
 * only set once job->fn() returned.
 *
 * current_job is reset to NULL on every return path.
 */
static void pthreadpool_tevent_job_fn(void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(private_data,
                struct pthreadpool_tevent_job);
        struct pthreadpool_tevent_wrapper *wrapper = NULL;

        current_job = job;
        job->needs_fence.started = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.orphaned) {
                /* The request is gone before we started, do nothing */
                current_job = NULL;
                return;
        }

        wrapper = job->wrapper;
        if (wrapper != NULL) {
                bool ok;

                /*
                 * 'wrapper' is set while we may use the wrapper,
                 * pthreadpool_tevent_job_orphan() waits for it to
                 * be cleared before it sets job->wrapper = NULL.
                 */
                job->needs_fence.wrapper = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (job->needs_fence.orphaned) {
                        job->needs_fence.wrapper = false;
                        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                        current_job = NULL;
                        return;
                }
                ok = wrapper->ops->before_job(wrapper->wrap_tp,
                                              wrapper->private_state,
                                              wrapper->main_tp,
                                              __location__);
                job->needs_fence.wrapper = false;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (!ok) {
                        /* The hook failed, job->fn() is not called */
                        job->needs_fence.exit_thread = true;
                        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                        current_job = NULL;
                        return;
                }
        }

        job->fn(job->private_data);

        /*
         * pthreadpool_tevent_job_done() reports success only
         * if 'executed' is set, otherwise ENOEXEC.
         */
        job->needs_fence.executed = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);

        if (wrapper != NULL) {
                bool ok;

                job->needs_fence.wrapper = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (job->needs_fence.orphaned) {
                        /*
                         * The after_job hook can't be called
                         * anymore, 'exit_thread' is set instead.
                         */
                        job->needs_fence.wrapper = false;
                        job->needs_fence.exit_thread = true;
                        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                        current_job = NULL;
                        return;
                }
                ok = wrapper->ops->after_job(wrapper->wrap_tp,
                                             wrapper->private_state,
                                             wrapper->main_tp,
                                             __location__);
                job->needs_fence.wrapper = false;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (!ok) {
                        job->needs_fence.exit_thread = true;
                        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                        current_job = NULL;
                        return;
                }
        }

        current_job = NULL;
}
1202
/*
 * Signal that a job finished.
 *
 * NOTE(review): presumably installed as the signal function of the
 * underlying pthreadpool and called from the worker thread - the
 * registration is not part of this section, confirm there.
 *
 * The job is marked as 'finished'. If it is already 'orphaned' it is
 * marked as 'dropped' and nothing is scheduled. Otherwise job->im is
 * scheduled with pthreadpool_tevent_job_done() as handler and the job
 * is marked as 'signaled'.
 *
 * jobid, job_fn and private_data are not used here.
 *
 * Returns -1 if the job has 'exit_thread' set, 0 otherwise.
 */
static int pthreadpool_tevent_job_signal(int jobid,
                                         void (*job_fn)(void *private_data),
                                         void *job_private_data,
                                         void *private_data)
{
        struct pthreadpool_tevent_job *job =
                talloc_get_type_abort(job_private_data,
                struct pthreadpool_tevent_job);

        /*
         * 'finished' has to be set before we look at 'orphaned',
         * pthreadpool_tevent_job_orphan() sets 'orphaned' first
         * and then waits for 'dropped' or 'signaled' if it sees
         * 'finished'.
         */
        job->needs_fence.finished = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.orphaned) {
                /* Request already gone */
                job->needs_fence.dropped = true;
                PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
                if (job->needs_fence.exit_thread) {
                        /*
                         * A problem with the wrapper the current job/worker
                         * thread needs to terminate.
                         *
                         * The pthreadpool_tevent is already gone.
                         */
                        return -1;
                }
                return 0;
        }

        /*
         * state and state->glue are valid,
         * see the job->needs_fence.finished
         * "spinlock" loop in
         * pthreadpool_tevent_job_orphan()
         */
        if (job->state->glue->tctx != NULL) {
                /* with HAVE_PTHREAD */
                tevent_threaded_schedule_immediate(job->state->glue->tctx,
                                                   job->im,
                                                   pthreadpool_tevent_job_done,
                                                   job);
        } else {
                /* without HAVE_PTHREAD */
                tevent_schedule_immediate(job->im,
                                          job->state->glue->ev,
                                          pthreadpool_tevent_job_done,
                                          job);
        }

        /*
         * From here on job->state is not touched anymore,
         * 'signaled' releases a waiting
         * pthreadpool_tevent_job_orphan().
         */
        job->needs_fence.signaled = true;
        PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
        if (job->needs_fence.exit_thread) {
                /*
                 * A problem with the wrapper the current job/worker
                 * thread needs to terminate.
                 *
                 * The pthreadpool_tevent is already gone.
                 */
                return -1;
        }
        return 0;
}
1263
1264 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
1265                                         struct tevent_immediate *im,
1266                                         void *private_data)
1267 {
1268         struct pthreadpool_tevent_job *job =
1269                 talloc_get_type_abort(private_data,
1270                 struct pthreadpool_tevent_job);
1271         struct pthreadpool_tevent_job_state *state = job->state;
1272
1273         TALLOC_FREE(job->im);
1274
1275         if (state == NULL) {
1276                 /* Request already gone */
1277                 TALLOC_FREE(job);
1278                 return;
1279         }
1280
1281         /*
1282          * pthreadpool_tevent_job_cleanup()
1283          * (called by tevent_req_done() or
1284          * tevent_req_error()) will destroy the job.
1285          */
1286
1287         if (job->needs_fence.executed) {
1288                 tevent_req_done(state->req);
1289                 return;
1290         }
1291
1292         tevent_req_error(state->req, ENOEXEC);
1293         return;
1294 }
1295
1296 static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
1297 {
1298         struct pthreadpool_tevent_job_state *state =
1299                 tevent_req_data(req,
1300                 struct pthreadpool_tevent_job_state);
1301         struct pthreadpool_tevent_job *job = state->job;
1302         size_t num;
1303
1304         if (job == NULL) {
1305                 return false;
1306         }
1307
1308         job->needs_fence.maycancel = true;
1309         PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
1310         if (job->needs_fence.started) {
1311                 /*
1312                  * It was too late to cancel the request.
1313                  *
1314                  * The job still has the chance to look
1315                  * at pthreadpool_tevent_current_job_canceled()
1316                  * or pthreadpool_tevent_current_job_continue()
1317                  */
1318                 return false;
1319         }
1320
1321         num = pthreadpool_cancel_job(job->pool->pool, 0,
1322                                      pthreadpool_tevent_job_fn,
1323                                      job);
1324         if (num == 0) {
1325                 /*
1326                  * It was too late to cancel the request.
1327                  */
1328                 return false;
1329         }
1330
1331         /*
1332          * It was not too late to cancel the request.
1333          *
1334          * We can remove job->im, as it will never be used.
1335          */
1336         TALLOC_FREE(job->im);
1337
1338         /*
1339          * pthreadpool_tevent_job_cleanup()
1340          * will destroy the job.
1341          */
1342         tevent_req_defer_callback(req, state->ev);
1343         tevent_req_error(req, ECANCELED);
1344         return true;
1345 }
1346
/*
 * Collect the result of a request created by
 * pthreadpool_tevent_job_send().
 *
 * Returns whatever tevent_req_simple_recv_unix() reports for req.
 */
int pthreadpool_tevent_job_recv(struct tevent_req *req)
{
        int result = tevent_req_simple_recv_unix(req);

        return result;
}