6999730f25501f13c43094da2ed7774b92b174ad
[samba.git] / lib / pthreadpool / pthreadpool_tevent.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * threadpool implementation based on pthreads
4  * Copyright (C) Volker Lendecke 2009,2011
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "pthreadpool_tevent.h"
23 #include "pthreadpool.h"
24 #include "lib/util/tevent_unix.h"
25 #include "lib/util/dlinklist.h"
26
27 struct pthreadpool_tevent_job_state;
28
29 /*
30  * We need one pthreadpool_tevent_glue object per unique combintaion of tevent
31  * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
32  * contexts in a pthreadpool_tevent.
33  */
34 struct pthreadpool_tevent_glue {
35         struct pthreadpool_tevent_glue *prev, *next;
36         struct pthreadpool_tevent *pool; /* back-pointer to owning object. */
37         /* Tuple we are keeping track of in this list. */
38         struct tevent_context *ev;
39         struct tevent_threaded_context *tctx;
40         /* Pointer to link object owned by *ev. */
41         struct pthreadpool_tevent_glue_ev_link *ev_link;
42 };
43
44 /*
45  * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
46  * tevent context from our list of active event contexts if the event context
47  * is destroyed.
48  * This structure is talloc()'ed from the struct tevent_context *, and is a
49  * back-pointer allowing the related struct pthreadpool_tevent_glue object
50  * to be removed from the struct pthreadpool_tevent glue list if the owning
51  * tevent_context is talloc_free()'ed.
52  */
53 struct pthreadpool_tevent_glue_ev_link {
54         struct pthreadpool_tevent_glue *glue;
55 };
56
57 struct pthreadpool_tevent {
58         struct pthreadpool *pool;
59         struct pthreadpool_tevent_glue *glue_list;
60
61         struct pthreadpool_tevent_job *jobs;
62 };
63
64 struct pthreadpool_tevent_job_state {
65         struct tevent_context *ev;
66         struct tevent_req *req;
67         struct pthreadpool_tevent_job *job;
68 };
69
70 struct pthreadpool_tevent_job {
71         struct pthreadpool_tevent_job *prev, *next;
72
73         struct pthreadpool_tevent *pool;
74         struct pthreadpool_tevent_job_state *state;
75         struct tevent_immediate *im;
76
77         void (*fn)(void *private_data);
78         void *private_data;
79 };
80
81 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
82
83 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);
84
85 static int pthreadpool_tevent_job_signal(int jobid,
86                                          void (*job_fn)(void *private_data),
87                                          void *job_private_data,
88                                          void *private_data);
89
90 int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
91                             struct pthreadpool_tevent **presult)
92 {
93         struct pthreadpool_tevent *pool;
94         int ret;
95
96         pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
97         if (pool == NULL) {
98                 return ENOMEM;
99         }
100
101         ret = pthreadpool_init(max_threads, &pool->pool,
102                                pthreadpool_tevent_job_signal, pool);
103         if (ret != 0) {
104                 TALLOC_FREE(pool);
105                 return ret;
106         }
107
108         talloc_set_destructor(pool, pthreadpool_tevent_destructor);
109
110         *presult = pool;
111         return 0;
112 }
113
114 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
115 {
116         if (pool->pool == NULL) {
117                 return 0;
118         }
119
120         return pthreadpool_max_threads(pool->pool);
121 }
122
123 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
124 {
125         if (pool->pool == NULL) {
126                 return 0;
127         }
128
129         return pthreadpool_queued_jobs(pool->pool);
130 }
131
132 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
133 {
134         struct pthreadpool_tevent_job *job = NULL;
135         struct pthreadpool_tevent_job *njob = NULL;
136         struct pthreadpool_tevent_glue *glue = NULL;
137         int ret;
138
139         ret = pthreadpool_stop(pool->pool);
140         if (ret != 0) {
141                 return ret;
142         }
143
144         for (job = pool->jobs; job != NULL; job = njob) {
145                 njob = job->next;
146
147                 /* The job this removes it from the list */
148                 pthreadpool_tevent_job_orphan(job);
149         }
150
151         /*
152          * Delete all the registered
153          * tevent_context/tevent_threaded_context
154          * pairs.
155          */
156         for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
157                 /* The glue destructor removes it from the list */
158                 TALLOC_FREE(glue);
159         }
160         pool->glue_list = NULL;
161
162         ret = pthreadpool_destroy(pool->pool);
163         if (ret != 0) {
164                 return ret;
165         }
166         pool->pool = NULL;
167
168         return 0;
169 }
170
171 static int pthreadpool_tevent_glue_destructor(
172         struct pthreadpool_tevent_glue *glue)
173 {
174         if (glue->pool->glue_list != NULL) {
175                 DLIST_REMOVE(glue->pool->glue_list, glue);
176         }
177
178         /* Ensure the ev_link destructor knows we're gone */
179         glue->ev_link->glue = NULL;
180
181         TALLOC_FREE(glue->ev_link);
182         TALLOC_FREE(glue->tctx);
183
184         return 0;
185 }
186
187 /*
188  * Destructor called either explicitly from
189  * pthreadpool_tevent_glue_destructor(), or indirectly
190  * when owning tevent_context is destroyed.
191  *
192  * When called from pthreadpool_tevent_glue_destructor()
193  * ev_link->glue is already NULL, so this does nothing.
194  *
195  * When called from talloc_free() of the owning
196  * tevent_context we must ensure we also remove the
197  * linked glue object from the list inside
198  * struct pthreadpool_tevent.
199  */
200 static int pthreadpool_tevent_glue_link_destructor(
201         struct pthreadpool_tevent_glue_ev_link *ev_link)
202 {
203         TALLOC_FREE(ev_link->glue);
204         return 0;
205 }
206
207 static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
208                                           struct tevent_context *ev)
209 {
210         struct pthreadpool_tevent_glue *glue = NULL;
211         struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
212
213         /*
214          * See if this tevent_context was already registered by
215          * searching the glue object list. If so we have nothing
216          * to do here - we already have a tevent_context/tevent_threaded_context
217          * pair.
218          */
219         for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
220                 if (glue->ev == ev) {
221                         return 0;
222                 }
223         }
224
225         /*
226          * Event context not yet registered - create a new glue
227          * object containing a tevent_context/tevent_threaded_context
228          * pair and put it on the list to remember this registration.
229          * We also need a link object to ensure the event context
230          * can't go away without us knowing about it.
231          */
232         glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
233         if (glue == NULL) {
234                 return ENOMEM;
235         }
236         *glue = (struct pthreadpool_tevent_glue) {
237                 .pool = pool,
238                 .ev = ev,
239         };
240         talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
241
242         /*
243          * Now allocate the link object to the event context. Note this
244          * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
245          * context is freed we are able to cleanup the glue object
246          * in the link object destructor.
247          */
248
249         ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
250         if (ev_link == NULL) {
251                 TALLOC_FREE(glue);
252                 return ENOMEM;
253         }
254         ev_link->glue = glue;
255         talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);
256
257         glue->ev_link = ev_link;
258
259 #ifdef HAVE_PTHREAD
260         glue->tctx = tevent_threaded_context_create(glue, ev);
261         if (glue->tctx == NULL) {
262                 TALLOC_FREE(ev_link);
263                 TALLOC_FREE(glue);
264                 return ENOMEM;
265         }
266 #endif
267
268         DLIST_ADD(pool->glue_list, glue);
269         return 0;
270 }
271
272 static void pthreadpool_tevent_job_fn(void *private_data);
273 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
274                                         struct tevent_immediate *im,
275                                         void *private_data);
276
277 static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
278 {
279         /*
280          * We should never be called with state->state != NULL.
281          * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
282          * after detaching from the request state and pool list.
283          */
284         if (job->state != NULL) {
285                 abort();
286         }
287
288         /*
289          * If the job is not finished (job->im still there)
290          * and it's still attached to the pool,
291          * we try to cancel it (before it was starts)
292          */
293         if (job->im != NULL && job->pool != NULL) {
294                 size_t num;
295
296                 num = pthreadpool_cancel_job(job->pool->pool, 0,
297                                              pthreadpool_tevent_job_fn,
298                                              job);
299                 if (num != 0) {
300                         /*
301                          * It was not too late to cancel the request.
302                          *
303                          * We can remove job->im, as it will never be used.
304                          */
305                         TALLOC_FREE(job->im);
306                 }
307         }
308
309         /*
310          * pthreadpool_tevent_job_orphan() already removed
311          * it from pool->jobs. And we don't need try
312          * pthreadpool_cancel_job() again.
313          */
314         job->pool = NULL;
315
316         if (job->im != NULL) {
317                 /*
318                  * state->im still there means, we need to wait for the
319                  * immediate event to be triggered or just leak the memory.
320                  */
321                 return -1;
322         }
323
324         return 0;
325 }
326
327 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
328 {
329         /*
330          * We're the only function that sets
331          * job->state = NULL;
332          */
333         if (job->state == NULL) {
334                 abort();
335         }
336
337         /*
338          * We need to reparent to a long term context.
339          * And detach from the request state.
340          * Maybe the destructor will keep the memory
341          * and leak it for now.
342          */
343         (void)talloc_reparent(job->state, NULL, job);
344         job->state->job = NULL;
345         job->state = NULL;
346
347         /*
348          * job->pool will only be set to NULL
349          * in the first destructur run.
350          */
351         if (job->pool == NULL) {
352                 abort();
353         }
354
355         /*
356          * Dettach it from the pool.
357          *
358          * The job might still be running,
359          * so we keep job->pool.
360          * The destructor will set it to NULL
361          * after trying pthreadpool_cancel_job()
362          */
363         DLIST_REMOVE(job->pool->jobs, job);
364
365         TALLOC_FREE(job);
366 }
367
368 static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
369                                            enum tevent_req_state req_state)
370 {
371         struct pthreadpool_tevent_job_state *state =
372                 tevent_req_data(req,
373                 struct pthreadpool_tevent_job_state);
374
375         if (state->job == NULL) {
376                 /*
377                  * The job request is not scheduled in the pool
378                  * yet or anymore.
379                  */
380                 return;
381         }
382
383         /*
384          * We need to reparent to a long term context.
385          * Maybe the destructor will keep the memory
386          * and leak it for now.
387          */
388         pthreadpool_tevent_job_orphan(state->job);
389         state->job = NULL; /* not needed but looks better */
390         return;
391 }
392
393 struct tevent_req *pthreadpool_tevent_job_send(
394         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
395         struct pthreadpool_tevent *pool,
396         void (*fn)(void *private_data), void *private_data)
397 {
398         struct tevent_req *req = NULL;
399         struct pthreadpool_tevent_job_state *state = NULL;
400         struct pthreadpool_tevent_job *job = NULL;
401         int ret;
402
403         req = tevent_req_create(mem_ctx, &state,
404                                 struct pthreadpool_tevent_job_state);
405         if (req == NULL) {
406                 return NULL;
407         }
408         state->ev = ev;
409         state->req = req;
410
411         tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);
412
413         if (pool == NULL) {
414                 tevent_req_error(req, EINVAL);
415                 return tevent_req_post(req, ev);
416         }
417         if (pool->pool == NULL) {
418                 tevent_req_error(req, EINVAL);
419                 return tevent_req_post(req, ev);
420         }
421
422         ret = pthreadpool_tevent_register_ev(pool, ev);
423         if (tevent_req_error(req, ret)) {
424                 return tevent_req_post(req, ev);
425         }
426
427         job = talloc_zero(state, struct pthreadpool_tevent_job);
428         if (tevent_req_nomem(job, req)) {
429                 return tevent_req_post(req, ev);
430         }
431         job->pool = pool;
432         job->fn = fn;
433         job->private_data = private_data;
434         job->im = tevent_create_immediate(state->job);
435         if (tevent_req_nomem(job->im, req)) {
436                 return tevent_req_post(req, ev);
437         }
438         talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
439         DLIST_ADD_END(job->pool->jobs, job);
440         job->state = state;
441         state->job = job;
442
443         ret = pthreadpool_add_job(job->pool->pool, 0,
444                                   pthreadpool_tevent_job_fn,
445                                   job);
446         if (tevent_req_error(req, ret)) {
447                 return tevent_req_post(req, ev);
448         }
449
450         return req;
451 }
452
453 static void pthreadpool_tevent_job_fn(void *private_data)
454 {
455         struct pthreadpool_tevent_job *job =
456                 talloc_get_type_abort(private_data,
457                 struct pthreadpool_tevent_job);
458
459         job->fn(job->private_data);
460 }
461
462 static int pthreadpool_tevent_job_signal(int jobid,
463                                          void (*job_fn)(void *private_data),
464                                          void *job_private_data,
465                                          void *private_data)
466 {
467         struct pthreadpool_tevent_job *job =
468                 talloc_get_type_abort(job_private_data,
469                 struct pthreadpool_tevent_job);
470         struct pthreadpool_tevent_job_state *state = job->state;
471         struct tevent_threaded_context *tctx = NULL;
472         struct pthreadpool_tevent_glue *g = NULL;
473
474         if (state == NULL) {
475                 /* Request already gone */
476                 return 0;
477         }
478
479 #ifdef HAVE_PTHREAD
480         for (g = job->pool->glue_list; g != NULL; g = g->next) {
481                 if (g->ev == state->ev) {
482                         tctx = g->tctx;
483                         break;
484                 }
485         }
486
487         if (tctx == NULL) {
488                 abort();
489         }
490 #endif
491
492         if (tctx != NULL) {
493                 /* with HAVE_PTHREAD */
494                 tevent_threaded_schedule_immediate(tctx, job->im,
495                                                    pthreadpool_tevent_job_done,
496                                                    job);
497         } else {
498                 /* without HAVE_PTHREAD */
499                 tevent_schedule_immediate(job->im, state->ev,
500                                           pthreadpool_tevent_job_done,
501                                           job);
502         }
503
504         return 0;
505 }
506
507 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
508                                         struct tevent_immediate *im,
509                                         void *private_data)
510 {
511         struct pthreadpool_tevent_job *job =
512                 talloc_get_type_abort(private_data,
513                 struct pthreadpool_tevent_job);
514         struct pthreadpool_tevent_job_state *state = job->state;
515
516         TALLOC_FREE(job->im);
517
518         if (state == NULL) {
519                 /* Request already gone */
520                 TALLOC_FREE(job);
521                 return;
522         }
523
524         /*
525          * pthreadpool_tevent_job_cleanup()
526          * will destroy the job.
527          */
528         tevent_req_done(state->req);
529 }
530
531 int pthreadpool_tevent_job_recv(struct tevent_req *req)
532 {
533         return tevent_req_simple_recv_unix(req);
534 }