tevent: Fix a race condition
[samba.git] / lib / tevent / tevent_threads.c
1 /*
2    tevent event library.
3
4    Copyright (C) Jeremy Allison 2015
5
6      ** NOTE! The following LGPL license applies to the tevent
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "replace.h"
25 #include "system/filesys.h"
26 #include "talloc.h"
27 #include "tevent.h"
28 #include "tevent_internal.h"
29 #include "tevent_util.h"
30
31 #if defined(HAVE_PTHREAD)
32 #include <pthread.h>
33
34 struct tevent_immediate_list {
35         struct tevent_immediate_list *next, *prev;
36         tevent_immediate_handler_t handler;
37         struct tevent_immediate *im;
38         void *private_ptr;
39 };
40
41 struct tevent_thread_proxy {
42         pthread_mutex_t mutex;
43         struct tevent_context *dest_ev_ctx;
44         int read_fd;
45         int write_fd;
46         struct tevent_fd *pipe_read_fde;
47         /* Pending events list. */
48         struct tevent_immediate_list *im_list;
49         /* Completed events list. */
50         struct tevent_immediate_list *tofree_im_list;
51         struct tevent_immediate *free_im;
52 };
53
54 static void free_im_list(struct tevent_immediate_list **pp_list_head)
55 {
56         struct tevent_immediate_list *im_entry = NULL;
57         struct tevent_immediate_list *im_next = NULL;
58
59         for (im_entry = *pp_list_head; im_entry; im_entry = im_next) {
60                 im_next = im_entry->next;
61                 DLIST_REMOVE(*pp_list_head, im_entry);
62                 TALLOC_FREE(im_entry);
63         }
64 }
65
66 static void free_list_handler(struct tevent_context *ev,
67                                 struct tevent_immediate *im,
68                                 void *private_ptr)
69 {
70         struct tevent_thread_proxy *tp =
71                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
72         int ret;
73
74         ret = pthread_mutex_lock(&tp->mutex);
75         if (ret != 0) {
76                 abort();
77                 /* Notreached. */
78                 return;
79         }
80
81         free_im_list(&tp->tofree_im_list);
82
83         ret = pthread_mutex_unlock(&tp->mutex);
84         if (ret != 0) {
85                 abort();
86                 /* Notreached. */
87                 return;
88         }
89 }
90
91 static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
92 {
93         struct tevent_immediate_list *im_entry = NULL;
94         struct tevent_immediate_list *im_next = NULL;
95
96         for (im_entry = tp->im_list; im_entry; im_entry = im_next) {
97                 im_next = im_entry->next;
98                 DLIST_REMOVE(tp->im_list, im_entry);
99
100                 tevent_schedule_immediate(im_entry->im,
101                                         tp->dest_ev_ctx,
102                                         im_entry->handler,
103                                         im_entry->private_ptr);
104
105                 /* Move from pending list to free list. */
106                 DLIST_ADD(tp->tofree_im_list, im_entry);
107         }
108         if (tp->tofree_im_list != NULL) {
109                 /*
110                  * Once the current immediate events
111                  * are processed, we need to reschedule
112                  * ourselves to free them. This works
113                  * as tevent_schedule_immediate()
114                  * always adds events to the *END* of
115                  * the immediate events list.
116                  */
117                 tevent_schedule_immediate(tp->free_im,
118                                         tp->dest_ev_ctx,
119                                         free_list_handler,
120                                         tp);
121         }
122 }
123
124 static void pipe_read_handler(struct tevent_context *ev,
125                                 struct tevent_fd *fde,
126                                 uint16_t flags,
127                                 void *private_ptr)
128 {
129         struct tevent_thread_proxy *tp =
130                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
131         ssize_t len = 64;
132         int ret;
133
134         ret = pthread_mutex_lock(&tp->mutex);
135         if (ret != 0) {
136                 abort();
137                 /* Notreached. */
138                 return;
139         }
140
141         /*
142          * Clear out all data in the pipe. We
143          * don't really care if this returns -1.
144          */
145         while (len == 64) {
146                 char buf[64];
147                 len = read(tp->read_fd, buf, 64);
148         };
149
150         schedule_immediate_functions(tp);
151
152         ret = pthread_mutex_unlock(&tp->mutex);
153         if (ret != 0) {
154                 abort();
155                 /* Notreached. */
156                 return;
157         }
158 }
159
160 static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
161 {
162         int ret;
163
164         ret = pthread_mutex_lock(&tp->mutex);
165         if (ret != 0) {
166                 abort();
167                 /* Notreached. */
168                 return 0;
169         }
170
171         TALLOC_FREE(tp->pipe_read_fde);
172
173         if (tp->read_fd != -1) {
174                 (void)close(tp->read_fd);
175                 tp->read_fd = -1;
176         }
177         if (tp->write_fd != -1) {
178                 (void)close(tp->write_fd);
179                 tp->write_fd = -1;
180         }
181
182         /* Hmmm. It's probably an error if we get here with
183            any non-NULL immediate entries.. */
184
185         free_im_list(&tp->im_list);
186         free_im_list(&tp->tofree_im_list);
187
188         TALLOC_FREE(tp->free_im);
189
190         ret = pthread_mutex_unlock(&tp->mutex);
191         if (ret != 0) {
192                 abort();
193                 /* Notreached. */
194                 return 0;
195         }
196
197         ret = pthread_mutex_destroy(&tp->mutex);
198         if (ret != 0) {
199                 abort();
200                 /* Notreached. */
201                 return 0;
202         }
203
204         return 0;
205 }
206
207 /*
208  * Create a struct that can be passed to other threads
209  * to allow them to signal the struct tevent_context *
210  * passed in.
211  */
212
213 struct tevent_thread_proxy *tevent_thread_proxy_create(
214                 struct tevent_context *dest_ev_ctx)
215 {
216         int ret;
217         int pipefds[2];
218         struct tevent_thread_proxy *tp;
219
220         tp = talloc_zero(dest_ev_ctx, struct tevent_thread_proxy);
221         if (tp == NULL) {
222                 return NULL;
223         }
224
225         ret = pthread_mutex_init(&tp->mutex, NULL);
226         if (ret != 0) {
227                 goto fail;
228         }
229
230         tp->dest_ev_ctx = dest_ev_ctx;
231         tp->read_fd = -1;
232         tp->write_fd = -1;
233
234         talloc_set_destructor(tp, tevent_thread_proxy_destructor);
235
236         ret = pipe(pipefds);
237         if (ret == -1) {
238                 goto fail;
239         }
240
241         tp->read_fd = pipefds[0];
242         tp->write_fd = pipefds[1];
243
244         ret = ev_set_blocking(pipefds[0], false);
245         if (ret != 0) {
246                 goto fail;
247         }
248         ret = ev_set_blocking(pipefds[1], false);
249         if (ret != 0) {
250                 goto fail;
251         }
252         if (!ev_set_close_on_exec(pipefds[0])) {
253                 goto fail;
254         }
255         if (!ev_set_close_on_exec(pipefds[1])) {
256                 goto fail;
257         }
258
259         tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
260                                 tp,
261                                 tp->read_fd,
262                                 TEVENT_FD_READ,
263                                 pipe_read_handler,
264                                 tp);
265         if (tp->pipe_read_fde == NULL) {
266                 goto fail;
267         }
268
269         /*
270          * Create an immediate event to free
271          * completed lists.
272          */
273         tp->free_im = tevent_create_immediate(tp);
274         if (tp->free_im == NULL) {
275                 goto fail;
276         }
277
278         return tp;
279
280   fail:
281
282         TALLOC_FREE(tp);
283         return NULL;
284 }
285
286 /*
287  * This function schedules an immediate event to be called with argument
288  * *pp_private in the thread context of dest_ev_ctx. Caller doesn't
289  * wait for activation to take place, this is simply fire-and-forget.
290  *
291  * pp_im must be a pointer to an immediate event talloced on
292  * a context owned by the calling thread, or the NULL context.
293  * Ownership of *pp_im will be transfered to the tevent library.
294  *
295  * pp_private can be null, or contents of *pp_private must be
296  * talloc'ed memory on a context owned by the calling thread
297  * or the NULL context. If non-null, ownership of *pp_private will
298  * be transfered to the tevent library.
299  *
300  * If you want to return a message, have the destination use the
301  * same function call to send back to the caller.
302  */
303
304
305 void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
306                                   struct tevent_immediate **pp_im,
307                                   tevent_immediate_handler_t handler,
308                                   void *pp_private_data)
309 {
310         struct tevent_immediate_list *im_entry;
311         int ret;
312         char c;
313         ssize_t written;
314
315         ret = pthread_mutex_lock(&tp->mutex);
316         if (ret != 0) {
317                 abort();
318                 /* Notreached. */
319                 return;
320         }
321
322         if (tp->write_fd == -1) {
323                 /* In the process of being destroyed. Ignore. */
324                 goto end;
325         }
326
327         /* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
328         im_entry = talloc_zero(NULL, struct tevent_immediate_list);
329         if (im_entry == NULL) {
330                 goto end;
331         }
332
333         im_entry->handler = handler;
334         im_entry->im = talloc_move(im_entry, pp_im);
335
336         if (pp_private_data != NULL) {
337                 void **pptr = (void **)pp_private_data;
338                 im_entry->private_ptr = talloc_move(im_entry, pptr);
339         }
340
341         DLIST_ADD(tp->im_list, im_entry);
342
343         /* And notify the dest_ev_ctx to wake up. */
344         c = '\0';
345         do {
346                 written = write(tp->write_fd, &c, 1);
347         } while (written == -1 && errno == EINTR);
348
349   end:
350
351         ret = pthread_mutex_unlock(&tp->mutex);
352         if (ret != 0) {
353                 abort();
354                 /* Notreached. */
355         }
356 }
357 #else
358 /* !HAVE_PTHREAD */
359 struct tevent_thread_proxy *tevent_thread_proxy_create(
360                 struct tevent_context *dest_ev_ctx)
361 {
362         errno = ENOSYS;
363         return NULL;
364 }
365
366 void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
367                                   struct tevent_immediate **pp_im,
368                                   tevent_immediate_handler_t handler,
369                                   void *pp_private_data)
370 {
371         ;
372 }
373 #endif
374
375 static int tevent_threaded_context_destructor(
376         struct tevent_threaded_context *tctx)
377 {
378         int ret;
379
380         if (tctx->event_ctx != NULL) {
381                 DLIST_REMOVE(tctx->event_ctx->threaded_contexts, tctx);
382         }
383
384         /*
385          * We have to coordinate with _tevent_threaded_schedule_immediate's
386          * unlock of the event_ctx_mutex. We're in the main thread here,
387          * and we can be scheduled before the helper thread finalizes its
388          * call _tevent_threaded_schedule_immediate. This means we would
389          * pthreadpool_destroy a locked mutex, which is illegal.
390          */
391         ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
392         if (ret != 0) {
393                 abort();
394         }
395
396         ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
397         if (ret != 0) {
398                 abort();
399         }
400
401         ret = pthread_mutex_destroy(&tctx->event_ctx_mutex);
402         if (ret != 0) {
403                 abort();
404         }
405
406         return 0;
407 }
408
409 struct tevent_threaded_context *tevent_threaded_context_create(
410         TALLOC_CTX *mem_ctx, struct tevent_context *ev)
411 {
412 #ifdef HAVE_PTHREAD
413         struct tevent_threaded_context *tctx;
414         int ret;
415
416         ret = tevent_common_wakeup_init(ev);
417         if (ret != 0) {
418                 errno = ret;
419                 return NULL;
420         }
421
422         tctx = talloc(mem_ctx, struct tevent_threaded_context);
423         if (tctx == NULL) {
424                 return NULL;
425         }
426         tctx->event_ctx = ev;
427         tctx->wakeup_fd = ev->wakeup_fd;
428
429         ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL);
430         if (ret != 0) {
431                 TALLOC_FREE(tctx);
432                 return NULL;
433         }
434
435         DLIST_ADD(ev->threaded_contexts, tctx);
436         talloc_set_destructor(tctx, tevent_threaded_context_destructor);
437
438         return tctx;
439 #else
440         errno = ENOSYS;
441         return NULL;
442 #endif
443 }
444
445 void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
446                                          struct tevent_immediate *im,
447                                          tevent_immediate_handler_t handler,
448                                          void *private_data,
449                                          const char *handler_name,
450                                          const char *location)
451 {
452 #ifdef HAVE_PTHREAD
453         struct tevent_context *ev;
454         int ret, wakeup_fd;
455
456         ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
457         if (ret != 0) {
458                 abort();
459         }
460
461         ev = tctx->event_ctx;
462
463         if (ev == NULL) {
464                 /*
465                  * Our event context is already gone.
466                  */
467                 ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
468                 if (ret != 0) {
469                         abort();
470                 }
471                 return;
472         }
473
474         if ((im->event_ctx != NULL) || (handler == NULL)) {
475                 abort();
476         }
477
478         im->event_ctx           = ev;
479         im->handler             = handler;
480         im->private_data        = private_data;
481         im->handler_name        = handler_name;
482         im->schedule_location   = location;
483         im->cancel_fn           = NULL;
484         im->additional_data     = NULL;
485
486         ret = pthread_mutex_lock(&ev->scheduled_mutex);
487         if (ret != 0) {
488                 abort();
489         }
490
491         DLIST_ADD_END(ev->scheduled_immediates, im);
492
493         ret = pthread_mutex_unlock(&ev->scheduled_mutex);
494         if (ret != 0) {
495                 abort();
496         }
497
498         wakeup_fd = tctx->wakeup_fd;
499
500         ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
501         if (ret != 0) {
502                 abort();
503         }
504
505         /*
506          * We might want to wake up the main thread under the lock. We
507          * had a slightly similar situation in pthreadpool, changed
508          * with 1c4284c7395f23. This is not exactly the same, as the
509          * wakeup is only a last-resort thing in case the main thread
510          * is sleeping. Doing the wakeup under the lock can easily
511          * lead to a contended mutex, which is much more expensive
512          * than a noncontended one. So I'd opt for the lower footprint
513          * initially. Maybe we have to change that later.
514          */
515         tevent_common_wakeup_fd(wakeup_fd);
516 #else
517         /*
518          * tevent_threaded_context_create() returned NULL with ENOSYS...
519          */
520         abort();
521 #endif
522 }
523
524 void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
525 {
526 #ifdef HAVE_PTHREAD
527         int ret;
528         ret = pthread_mutex_lock(&ev->scheduled_mutex);
529         if (ret != 0) {
530                 abort();
531         }
532
533         while (ev->scheduled_immediates != NULL) {
534                 struct tevent_immediate *im = ev->scheduled_immediates;
535                 DLIST_REMOVE(ev->scheduled_immediates, im);
536                 DLIST_ADD_END(ev->immediate_events, im);
537         }
538
539         ret = pthread_mutex_unlock(&ev->scheduled_mutex);
540         if (ret != 0) {
541                 abort();
542         }
543 #else
544         /*
545          * tevent_threaded_context_create() returned NULL with ENOSYS...
546          */
547         abort();
548 #endif
549 }