tevent: Add threaded immediate activation
[sfrench/samba-autobuild/.git] / lib / tevent / tevent_threads.c
1 /*
2    tevent event library.
3
4    Copyright (C) Jeremy Allison 2015
5
6      ** NOTE! The following LGPL license applies to the tevent
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "replace.h"
25 #include "system/filesys.h"
26 #include "talloc.h"
27 #include "tevent.h"
28 #include "tevent_internal.h"
29 #include "tevent_util.h"
30
31 #if defined(HAVE_PTHREAD)
32 #include <pthread.h>
33
34 struct tevent_immediate_list {
35         struct tevent_immediate_list *next, *prev;
36         tevent_immediate_handler_t handler;
37         struct tevent_immediate *im;
38         void *private_ptr;
39 };
40
41 struct tevent_thread_proxy {
42         pthread_mutex_t mutex;
43         struct tevent_context *dest_ev_ctx;
44         int read_fd;
45         int write_fd;
46         struct tevent_fd *pipe_read_fde;
47         /* Pending events list. */
48         struct tevent_immediate_list *im_list;
49         /* Completed events list. */
50         struct tevent_immediate_list *tofree_im_list;
51         struct tevent_immediate *free_im;
52 };
53
54 static void free_im_list(struct tevent_immediate_list **pp_list_head)
55 {
56         struct tevent_immediate_list *im_entry = NULL;
57         struct tevent_immediate_list *im_next = NULL;
58
59         for (im_entry = *pp_list_head; im_entry; im_entry = im_next) {
60                 im_next = im_entry->next;
61                 DLIST_REMOVE(*pp_list_head, im_entry);
62                 TALLOC_FREE(im_entry);
63         }
64 }
65
66 static void free_list_handler(struct tevent_context *ev,
67                                 struct tevent_immediate *im,
68                                 void *private_ptr)
69 {
70         struct tevent_thread_proxy *tp =
71                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
72         int ret;
73
74         ret = pthread_mutex_lock(&tp->mutex);
75         if (ret != 0) {
76                 abort();
77                 /* Notreached. */
78                 return;
79         }
80
81         free_im_list(&tp->tofree_im_list);
82
83         ret = pthread_mutex_unlock(&tp->mutex);
84         if (ret != 0) {
85                 abort();
86                 /* Notreached. */
87                 return;
88         }
89 }
90
91 static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
92 {
93         struct tevent_immediate_list *im_entry = NULL;
94         struct tevent_immediate_list *im_next = NULL;
95
96         for (im_entry = tp->im_list; im_entry; im_entry = im_next) {
97                 im_next = im_entry->next;
98                 DLIST_REMOVE(tp->im_list, im_entry);
99
100                 tevent_schedule_immediate(im_entry->im,
101                                         tp->dest_ev_ctx,
102                                         im_entry->handler,
103                                         im_entry->private_ptr);
104
105                 /* Move from pending list to free list. */
106                 DLIST_ADD(tp->tofree_im_list, im_entry);
107         }
108         if (tp->tofree_im_list != NULL) {
109                 /*
110                  * Once the current immediate events
111                  * are processed, we need to reschedule
112                  * ourselves to free them. This works
113                  * as tevent_schedule_immediate()
114                  * always adds events to the *END* of
115                  * the immediate events list.
116                  */
117                 tevent_schedule_immediate(tp->free_im,
118                                         tp->dest_ev_ctx,
119                                         free_list_handler,
120                                         tp);
121         }
122 }
123
124 static void pipe_read_handler(struct tevent_context *ev,
125                                 struct tevent_fd *fde,
126                                 uint16_t flags,
127                                 void *private_ptr)
128 {
129         struct tevent_thread_proxy *tp =
130                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
131         ssize_t len = 64;
132         int ret;
133
134         ret = pthread_mutex_lock(&tp->mutex);
135         if (ret != 0) {
136                 abort();
137                 /* Notreached. */
138                 return;
139         }
140
141         /*
142          * Clear out all data in the pipe. We
143          * don't really care if this returns -1.
144          */
145         while (len == 64) {
146                 char buf[64];
147                 len = read(tp->read_fd, buf, 64);
148         };
149
150         schedule_immediate_functions(tp);
151
152         ret = pthread_mutex_unlock(&tp->mutex);
153         if (ret != 0) {
154                 abort();
155                 /* Notreached. */
156                 return;
157         }
158 }
159
160 static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
161 {
162         int ret;
163
164         ret = pthread_mutex_lock(&tp->mutex);
165         if (ret != 0) {
166                 abort();
167                 /* Notreached. */
168                 return 0;
169         }
170
171         TALLOC_FREE(tp->pipe_read_fde);
172
173         if (tp->read_fd != -1) {
174                 (void)close(tp->read_fd);
175                 tp->read_fd = -1;
176         }
177         if (tp->write_fd != -1) {
178                 (void)close(tp->write_fd);
179                 tp->write_fd = -1;
180         }
181
182         /* Hmmm. It's probably an error if we get here with
183            any non-NULL immediate entries.. */
184
185         free_im_list(&tp->im_list);
186         free_im_list(&tp->tofree_im_list);
187
188         TALLOC_FREE(tp->free_im);
189
190         ret = pthread_mutex_unlock(&tp->mutex);
191         if (ret != 0) {
192                 abort();
193                 /* Notreached. */
194                 return 0;
195         }
196
197         ret = pthread_mutex_destroy(&tp->mutex);
198         if (ret != 0) {
199                 abort();
200                 /* Notreached. */
201                 return 0;
202         }
203
204         return 0;
205 }
206
207 /*
208  * Create a struct that can be passed to other threads
209  * to allow them to signal the struct tevent_context *
210  * passed in.
211  */
212
213 struct tevent_thread_proxy *tevent_thread_proxy_create(
214                 struct tevent_context *dest_ev_ctx)
215 {
216         int ret;
217         int pipefds[2];
218         struct tevent_thread_proxy *tp;
219
220         tp = talloc_zero(dest_ev_ctx, struct tevent_thread_proxy);
221         if (tp == NULL) {
222                 return NULL;
223         }
224
225         ret = pthread_mutex_init(&tp->mutex, NULL);
226         if (ret != 0) {
227                 goto fail;
228         }
229
230         tp->dest_ev_ctx = dest_ev_ctx;
231         tp->read_fd = -1;
232         tp->write_fd = -1;
233
234         talloc_set_destructor(tp, tevent_thread_proxy_destructor);
235
236         ret = pipe(pipefds);
237         if (ret == -1) {
238                 goto fail;
239         }
240
241         tp->read_fd = pipefds[0];
242         tp->write_fd = pipefds[1];
243
244         ret = ev_set_blocking(pipefds[0], false);
245         if (ret != 0) {
246                 goto fail;
247         }
248         ret = ev_set_blocking(pipefds[1], false);
249         if (ret != 0) {
250                 goto fail;
251         }
252         if (!ev_set_close_on_exec(pipefds[0])) {
253                 goto fail;
254         }
255         if (!ev_set_close_on_exec(pipefds[1])) {
256                 goto fail;
257         }
258
259         tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
260                                 tp,
261                                 tp->read_fd,
262                                 TEVENT_FD_READ,
263                                 pipe_read_handler,
264                                 tp);
265         if (tp->pipe_read_fde == NULL) {
266                 goto fail;
267         }
268
269         /*
270          * Create an immediate event to free
271          * completed lists.
272          */
273         tp->free_im = tevent_create_immediate(tp);
274         if (tp->free_im == NULL) {
275                 goto fail;
276         }
277
278         return tp;
279
280   fail:
281
282         TALLOC_FREE(tp);
283         return NULL;
284 }
285
286 /*
287  * This function schedules an immediate event to be called with argument
288  * *pp_private in the thread context of dest_ev_ctx. Caller doesn't
289  * wait for activation to take place, this is simply fire-and-forget.
290  *
291  * pp_im must be a pointer to an immediate event talloced on
292  * a context owned by the calling thread, or the NULL context.
293  * Ownership of *pp_im will be transfered to the tevent library.
294  *
295  * pp_private can be null, or contents of *pp_private must be
296  * talloc'ed memory on a context owned by the calling thread
297  * or the NULL context. If non-null, ownership of *pp_private will
298  * be transfered to the tevent library.
299  *
300  * If you want to return a message, have the destination use the
301  * same function call to send back to the caller.
302  */
303
304
305 void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
306                                   struct tevent_immediate **pp_im,
307                                   tevent_immediate_handler_t handler,
308                                   void *pp_private_data)
309 {
310         struct tevent_immediate_list *im_entry;
311         int ret;
312         char c;
313         ssize_t written;
314
315         ret = pthread_mutex_lock(&tp->mutex);
316         if (ret != 0) {
317                 abort();
318                 /* Notreached. */
319                 return;
320         }
321
322         if (tp->write_fd == -1) {
323                 /* In the process of being destroyed. Ignore. */
324                 goto end;
325         }
326
327         /* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
328         im_entry = talloc_zero(NULL, struct tevent_immediate_list);
329         if (im_entry == NULL) {
330                 goto end;
331         }
332
333         im_entry->handler = handler;
334         im_entry->im = talloc_move(im_entry, pp_im);
335
336         if (pp_private_data != NULL) {
337                 void **pptr = (void **)pp_private_data;
338                 im_entry->private_ptr = talloc_move(im_entry, pptr);
339         }
340
341         DLIST_ADD(tp->im_list, im_entry);
342
343         /* And notify the dest_ev_ctx to wake up. */
344         c = '\0';
345         do {
346                 written = write(tp->write_fd, &c, 1);
347         } while (written == -1 && errno == EINTR);
348
349   end:
350
351         ret = pthread_mutex_unlock(&tp->mutex);
352         if (ret != 0) {
353                 abort();
354                 /* Notreached. */
355         }
356 }
357 #else
358 /* !HAVE_PTHREAD */
359 struct tevent_thread_proxy *tevent_thread_proxy_create(
360                 struct tevent_context *dest_ev_ctx)
361 {
362         errno = ENOSYS;
363         return NULL;
364 }
365
366 void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
367                                   struct tevent_immediate **pp_im,
368                                   tevent_immediate_handler_t handler,
369                                   void *pp_private_data)
370 {
371         ;
372 }
373 #endif
374
375 static int tevent_threaded_context_destructor(
376         struct tevent_threaded_context *tctx)
377 {
378         if (tctx->event_ctx != NULL) {
379                 DLIST_REMOVE(tctx->event_ctx->threaded_contexts, tctx);
380         }
381         return 0;
382 }
383
384 struct tevent_threaded_context *tevent_threaded_context_create(
385         TALLOC_CTX *mem_ctx, struct tevent_context *ev)
386 {
387 #ifdef HAVE_PTHREAD
388         struct tevent_threaded_context *tctx;
389         int ret;
390
391         ret = tevent_common_wakeup_init(ev);
392         if (ret != 0) {
393                 errno = ret;
394                 return NULL;
395         }
396
397         tctx = talloc(mem_ctx, struct tevent_threaded_context);
398         if (tctx == NULL) {
399                 return NULL;
400         }
401         tctx->event_ctx = ev;
402
403         DLIST_ADD(ev->threaded_contexts, tctx);
404         talloc_set_destructor(tctx, tevent_threaded_context_destructor);
405
406         return tctx;
407 #else
408         errno = ENOSYS;
409         return NULL;
410 #endif
411 }
412
413 void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
414                                          struct tevent_immediate *im,
415                                          tevent_immediate_handler_t handler,
416                                          void *private_data,
417                                          const char *handler_name,
418                                          const char *location)
419 {
420 #ifdef HAVE_PTHREAD
421         struct tevent_context *ev = tctx->event_ctx;
422         int ret;
423
424         if ((im->event_ctx != NULL) || (handler == NULL)) {
425                 abort();
426         }
427
428         im->event_ctx           = ev;
429         im->handler             = handler;
430         im->private_data        = private_data;
431         im->handler_name        = handler_name;
432         im->schedule_location   = location;
433         im->cancel_fn           = NULL;
434         im->additional_data     = NULL;
435
436         ret = pthread_mutex_lock(&ev->scheduled_mutex);
437         if (ret != 0) {
438                 abort();
439         }
440
441         DLIST_ADD_END(ev->scheduled_immediates, im);
442
443         ret = pthread_mutex_unlock(&ev->scheduled_mutex);
444         if (ret != 0) {
445                 abort();
446         }
447
448         /*
449          * We might want to wake up the main thread under the lock. We
450          * had a slightly similar situation in pthreadpool, changed
451          * with 1c4284c7395f23. This is not exactly the same, as the
452          * wakeup is only a last-resort thing in case the main thread
453          * is sleeping. Doing the wakeup under the lock can easily
454          * lead to a contended mutex, which is much more expensive
455          * than a noncontended one. So I'd opt for the lower footprint
456          * initially. Maybe we have to change that later.
457          */
458         tevent_common_wakeup(ev);
459 #else
460         /*
461          * tevent_threaded_context_create() returned NULL with ENOSYS...
462          */
463         abort();
464 #endif
465 }
466
467 void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
468 {
469 #ifdef HAVE_PTHREAD
470         int ret;
471         ret = pthread_mutex_lock(&ev->scheduled_mutex);
472         if (ret != 0) {
473                 abort();
474         }
475
476         while (ev->scheduled_immediates != NULL) {
477                 struct tevent_immediate *im = ev->scheduled_immediates;
478                 DLIST_REMOVE(ev->scheduled_immediates, im);
479                 DLIST_ADD_END(ev->immediate_events, im);
480         }
481
482         ret = pthread_mutex_unlock(&ev->scheduled_mutex);
483         if (ret != 0) {
484                 abort();
485         }
486 #else
487         /*
488          * tevent_threaded_context_create() returned NULL with ENOSYS...
489          */
490         abort();
491 #endif
492 }