pthreadpool: split out pthreadpool_tevent_job from pthreadpool_tevent_job_state
[samba.git] / lib / pthreadpool / pthreadpool_tevent.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * threadpool implementation based on pthreads
4  * Copyright (C) Volker Lendecke 2009,2011
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "pthreadpool_tevent.h"
22 #include "pthreadpool.h"
23 #include "lib/util/tevent_unix.h"
24 #include "lib/util/dlinklist.h"
25
26 struct pthreadpool_tevent_job_state;
27
28 /*
29  * We need one pthreadpool_tevent_glue object per unique combintaion of tevent
30  * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
31  * contexts in a pthreadpool_tevent.
32  */
33 struct pthreadpool_tevent_glue {
34         struct pthreadpool_tevent_glue *prev, *next;
35         struct pthreadpool_tevent *pool; /* back-pointer to owning object. */
36         /* Tuple we are keeping track of in this list. */
37         struct tevent_context *ev;
38         struct tevent_threaded_context *tctx;
39         /* Pointer to link object owned by *ev. */
40         struct pthreadpool_tevent_glue_ev_link *ev_link;
41 };
42
43 /*
44  * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
45  * tevent context from our list of active event contexts if the event context
46  * is destroyed.
47  * This structure is talloc()'ed from the struct tevent_context *, and is a
48  * back-pointer allowing the related struct pthreadpool_tevent_glue object
49  * to be removed from the struct pthreadpool_tevent glue list if the owning
50  * tevent_context is talloc_free()'ed.
51  */
52 struct pthreadpool_tevent_glue_ev_link {
53         struct pthreadpool_tevent_glue *glue;
54 };
55
56 struct pthreadpool_tevent {
57         struct pthreadpool *pool;
58         struct pthreadpool_tevent_glue *glue_list;
59
60         struct pthreadpool_tevent_job *jobs;
61 };
62
63 struct pthreadpool_tevent_job_state {
64         struct tevent_context *ev;
65         struct tevent_req *req;
66         struct pthreadpool_tevent_job *job;
67 };
68
69 struct pthreadpool_tevent_job {
70         struct pthreadpool_tevent_job *prev, *next;
71
72         struct pthreadpool_tevent *pool;
73         struct pthreadpool_tevent_job_state *state;
74         struct tevent_immediate *im;
75
76         void (*fn)(void *private_data);
77         void *private_data;
78 };
79
80 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
81
82 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);
83
84 static int pthreadpool_tevent_job_signal(int jobid,
85                                          void (*job_fn)(void *private_data),
86                                          void *job_private_data,
87                                          void *private_data);
88
89 int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
90                             struct pthreadpool_tevent **presult)
91 {
92         struct pthreadpool_tevent *pool;
93         int ret;
94
95         pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
96         if (pool == NULL) {
97                 return ENOMEM;
98         }
99
100         ret = pthreadpool_init(max_threads, &pool->pool,
101                                pthreadpool_tevent_job_signal, pool);
102         if (ret != 0) {
103                 TALLOC_FREE(pool);
104                 return ret;
105         }
106
107         talloc_set_destructor(pool, pthreadpool_tevent_destructor);
108
109         *presult = pool;
110         return 0;
111 }
112
113 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
114 {
115         if (pool->pool == NULL) {
116                 return 0;
117         }
118
119         return pthreadpool_max_threads(pool->pool);
120 }
121
122 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
123 {
124         if (pool->pool == NULL) {
125                 return 0;
126         }
127
128         return pthreadpool_queued_jobs(pool->pool);
129 }
130
131 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
132 {
133         struct pthreadpool_tevent_job *job = NULL;
134         struct pthreadpool_tevent_job *njob = NULL;
135         struct pthreadpool_tevent_glue *glue = NULL;
136         int ret;
137
138         ret = pthreadpool_destroy(pool->pool);
139         if (ret != 0) {
140                 return ret;
141         }
142         pool->pool = NULL;
143
144         for (job = pool->jobs; job != NULL; job = njob) {
145                 njob = job->next;
146
147                 /* The job this removes it from the list */
148                 pthreadpool_tevent_job_orphan(job);
149         }
150
151         /*
152          * Delete all the registered
153          * tevent_context/tevent_threaded_context
154          * pairs.
155          */
156         for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
157                 /* The glue destructor removes it from the list */
158                 TALLOC_FREE(glue);
159         }
160         pool->glue_list = NULL;
161
162         return 0;
163 }
164
165 static int pthreadpool_tevent_glue_destructor(
166         struct pthreadpool_tevent_glue *glue)
167 {
168         if (glue->pool->glue_list != NULL) {
169                 DLIST_REMOVE(glue->pool->glue_list, glue);
170         }
171
172         /* Ensure the ev_link destructor knows we're gone */
173         glue->ev_link->glue = NULL;
174
175         TALLOC_FREE(glue->ev_link);
176         TALLOC_FREE(glue->tctx);
177
178         return 0;
179 }
180
181 /*
182  * Destructor called either explicitly from
183  * pthreadpool_tevent_glue_destructor(), or indirectly
184  * when owning tevent_context is destroyed.
185  *
186  * When called from pthreadpool_tevent_glue_destructor()
187  * ev_link->glue is already NULL, so this does nothing.
188  *
189  * When called from talloc_free() of the owning
190  * tevent_context we must ensure we also remove the
191  * linked glue object from the list inside
192  * struct pthreadpool_tevent.
193  */
194 static int pthreadpool_tevent_glue_link_destructor(
195         struct pthreadpool_tevent_glue_ev_link *ev_link)
196 {
197         TALLOC_FREE(ev_link->glue);
198         return 0;
199 }
200
201 static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
202                                           struct tevent_context *ev)
203 {
204         struct pthreadpool_tevent_glue *glue = NULL;
205         struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
206
207         /*
208          * See if this tevent_context was already registered by
209          * searching the glue object list. If so we have nothing
210          * to do here - we already have a tevent_context/tevent_threaded_context
211          * pair.
212          */
213         for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
214                 if (glue->ev == ev) {
215                         return 0;
216                 }
217         }
218
219         /*
220          * Event context not yet registered - create a new glue
221          * object containing a tevent_context/tevent_threaded_context
222          * pair and put it on the list to remember this registration.
223          * We also need a link object to ensure the event context
224          * can't go away without us knowing about it.
225          */
226         glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
227         if (glue == NULL) {
228                 return ENOMEM;
229         }
230         *glue = (struct pthreadpool_tevent_glue) {
231                 .pool = pool,
232                 .ev = ev,
233         };
234         talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
235
236         /*
237          * Now allocate the link object to the event context. Note this
238          * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
239          * context is freed we are able to cleanup the glue object
240          * in the link object destructor.
241          */
242
243         ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
244         if (ev_link == NULL) {
245                 TALLOC_FREE(glue);
246                 return ENOMEM;
247         }
248         ev_link->glue = glue;
249         talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);
250
251         glue->ev_link = ev_link;
252
253 #ifdef HAVE_PTHREAD
254         glue->tctx = tevent_threaded_context_create(pool, ev);
255         if (glue->tctx == NULL) {
256                 TALLOC_FREE(ev_link);
257                 TALLOC_FREE(glue);
258                 return ENOMEM;
259         }
260 #endif
261
262         DLIST_ADD(pool->glue_list, glue);
263         return 0;
264 }
265
266 static void pthreadpool_tevent_job_fn(void *private_data);
267 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
268                                         struct tevent_immediate *im,
269                                         void *private_data);
270
271 static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
272 {
273         /*
274          * We should never be called with state->state != NULL.
275          * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
276          * after detaching from the request state and pool list.
277          */
278         if (job->state != NULL) {
279                 abort();
280         }
281
282         /*
283          * If the job is not finished (job->im still there)
284          * and it's still attached to the pool,
285          * we try to cancel it (before it was starts)
286          */
287         if (job->im != NULL && job->pool != NULL) {
288                 size_t num;
289
290                 num = pthreadpool_cancel_job(job->pool->pool, 0,
291                                              pthreadpool_tevent_job_fn,
292                                              job);
293                 if (num != 0) {
294                         /*
295                          * It was not too late to cancel the request.
296                          *
297                          * We can remove job->im, as it will never be used.
298                          */
299                         TALLOC_FREE(job->im);
300                 }
301         }
302
303         /*
304          * pthreadpool_tevent_job_orphan() already removed
305          * it from pool->jobs. And we don't need try
306          * pthreadpool_cancel_job() again.
307          */
308         job->pool = NULL;
309
310         if (job->im != NULL) {
311                 /*
312                  * state->im still there means, we need to wait for the
313                  * immediate event to be triggered or just leak the memory.
314                  */
315                 return -1;
316         }
317
318         return 0;
319 }
320
321 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
322 {
323         /*
324          * We're the only function that sets
325          * job->state = NULL;
326          */
327         if (job->state == NULL) {
328                 abort();
329         }
330
331         /*
332          * We need to reparent to a long term context.
333          * And detach from the request state.
334          * Maybe the destructor will keep the memory
335          * and leak it for now.
336          */
337         (void)talloc_reparent(job->state, NULL, job);
338         job->state->job = NULL;
339         job->state = NULL;
340
341         /*
342          * job->pool will only be set to NULL
343          * in the first destructur run.
344          */
345         if (job->pool == NULL) {
346                 abort();
347         }
348
349         /*
350          * Dettach it from the pool.
351          *
352          * The job might still be running,
353          * so we keep job->pool.
354          * The destructor will set it to NULL
355          * after trying pthreadpool_cancel_job()
356          */
357         DLIST_REMOVE(job->pool->jobs, job);
358
359         TALLOC_FREE(job);
360 }
361
362 static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
363                                            enum tevent_req_state req_state)
364 {
365         struct pthreadpool_tevent_job_state *state =
366                 tevent_req_data(req,
367                 struct pthreadpool_tevent_job_state);
368
369         if (state->job == NULL) {
370                 /*
371                  * The job request is not scheduled in the pool
372                  * yet or anymore.
373                  */
374                 return;
375         }
376
377         /*
378          * We need to reparent to a long term context.
379          * Maybe the destructor will keep the memory
380          * and leak it for now.
381          */
382         pthreadpool_tevent_job_orphan(state->job);
383         state->job = NULL; /* not needed but looks better */
384         return;
385 }
386
387 struct tevent_req *pthreadpool_tevent_job_send(
388         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
389         struct pthreadpool_tevent *pool,
390         void (*fn)(void *private_data), void *private_data)
391 {
392         struct tevent_req *req = NULL;
393         struct pthreadpool_tevent_job_state *state = NULL;
394         struct pthreadpool_tevent_job *job = NULL;
395         int ret;
396
397         req = tevent_req_create(mem_ctx, &state,
398                                 struct pthreadpool_tevent_job_state);
399         if (req == NULL) {
400                 return NULL;
401         }
402         state->ev = ev;
403         state->req = req;
404
405         tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);
406
407         if (pool == NULL) {
408                 tevent_req_error(req, EINVAL);
409                 return tevent_req_post(req, ev);
410         }
411         if (pool->pool == NULL) {
412                 tevent_req_error(req, EINVAL);
413                 return tevent_req_post(req, ev);
414         }
415
416         ret = pthreadpool_tevent_register_ev(pool, ev);
417         if (tevent_req_error(req, ret)) {
418                 return tevent_req_post(req, ev);
419         }
420
421         job = talloc_zero(state, struct pthreadpool_tevent_job);
422         if (tevent_req_nomem(job, req)) {
423                 return tevent_req_post(req, ev);
424         }
425         job->pool = pool;
426         job->fn = fn;
427         job->private_data = private_data;
428         job->im = tevent_create_immediate(state->job);
429         if (tevent_req_nomem(job->im, req)) {
430                 return tevent_req_post(req, ev);
431         }
432         talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
433         DLIST_ADD_END(job->pool->jobs, job);
434         job->state = state;
435         state->job = job;
436
437         ret = pthreadpool_add_job(job->pool->pool, 0,
438                                   pthreadpool_tevent_job_fn,
439                                   job);
440         if (tevent_req_error(req, ret)) {
441                 return tevent_req_post(req, ev);
442         }
443
444         return req;
445 }
446
447 static void pthreadpool_tevent_job_fn(void *private_data)
448 {
449         struct pthreadpool_tevent_job *job =
450                 talloc_get_type_abort(private_data,
451                 struct pthreadpool_tevent_job);
452
453         job->fn(job->private_data);
454 }
455
456 static int pthreadpool_tevent_job_signal(int jobid,
457                                          void (*job_fn)(void *private_data),
458                                          void *job_private_data,
459                                          void *private_data)
460 {
461         struct pthreadpool_tevent_job *job =
462                 talloc_get_type_abort(job_private_data,
463                 struct pthreadpool_tevent_job);
464         struct pthreadpool_tevent_job_state *state = job->state;
465         struct tevent_threaded_context *tctx = NULL;
466         struct pthreadpool_tevent_glue *g = NULL;
467
468         if (state == NULL) {
469                 /* Request already gone */
470                 return 0;
471         }
472
473 #ifdef HAVE_PTHREAD
474         for (g = job->pool->glue_list; g != NULL; g = g->next) {
475                 if (g->ev == state->ev) {
476                         tctx = g->tctx;
477                         break;
478                 }
479         }
480
481         if (tctx == NULL) {
482                 abort();
483         }
484 #endif
485
486         if (tctx != NULL) {
487                 /* with HAVE_PTHREAD */
488                 tevent_threaded_schedule_immediate(tctx, job->im,
489                                                    pthreadpool_tevent_job_done,
490                                                    job);
491         } else {
492                 /* without HAVE_PTHREAD */
493                 tevent_schedule_immediate(job->im, state->ev,
494                                           pthreadpool_tevent_job_done,
495                                           job);
496         }
497
498         return 0;
499 }
500
501 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
502                                         struct tevent_immediate *im,
503                                         void *private_data)
504 {
505         struct pthreadpool_tevent_job *job =
506                 talloc_get_type_abort(private_data,
507                 struct pthreadpool_tevent_job);
508         struct pthreadpool_tevent_job_state *state = job->state;
509
510         TALLOC_FREE(job->im);
511
512         if (state == NULL) {
513                 /* Request already gone */
514                 TALLOC_FREE(job);
515                 return;
516         }
517
518         /*
519          * pthreadpool_tevent_job_cleanup()
520          * will destroy the job.
521          */
522         tevent_req_done(state->req);
523 }
524
525 int pthreadpool_tevent_job_recv(struct tevent_req *req)
526 {
527         return tevent_req_simple_recv_unix(req);
528 }