b0f58efd0938bb33512860db0efa224887bf2111
[vlendec/samba-autobuild/.git] / lib / tevent / testsuite.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    testing of the events subsystem
5
6    Copyright (C) Stefan Metzmacher 2006-2009
7    Copyright (C) Jeremy Allison    2013
8
9      ** NOTE! The following LGPL license applies to the tevent
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "includes.h"
28 #include "tevent.h"
29 #include "system/filesys.h"
30 #include "system/select.h"
31 #include "system/network.h"
32 #include "torture/torture.h"
33 #include "torture/local/proto.h"
34 #ifdef HAVE_PTHREAD
35 #include "system/threads.h"
36 #include <assert.h>
37 #endif
38
/*
 * Number of bytes consumed by the pipe read handlers
 * (fde_handler_read and fde_handler_read_1); reset to 0 at the
 * start of test_event_context().
 */
static int fde_count;
40
/*
 * Issue read(fd, buf, count), repeating the call for as long as it
 * fails with EINTR. The result of the final call is discarded, so
 * callers get no error indication.
 */
static void do_read(int fd, void *buf, size_t count)
{
        ssize_t nread;

        for (;;) {
                nread = read(fd, buf, count);
                if (nread != -1 || errno != EINTR) {
                        break;
                }
        }
}
49
50 static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f,
51                         uint16_t flags, void *private_data)
52 {
53         int *fd = (int *)private_data;
54         char c;
55 #ifdef SA_SIGINFO
56         kill(getpid(), SIGUSR1);
57 #endif
58         kill(getpid(), SIGALRM);
59
60         do_read(fd[0], &c, 1);
61         fde_count++;
62 }
63
/*
 * Issue write(fd, buf, count), repeating the call for as long as it
 * fails with EINTR. The result of the final call is discarded, so
 * callers get no error indication.
 */
static void do_write(int fd, void *buf, size_t count)
{
        ssize_t nwritten;

        for (;;) {
                nwritten = write(fd, buf, count);
                if (nwritten != -1 || errno != EINTR) {
                        break;
                }
        }
}
72
73 static void fde_handler_write(struct tevent_context *ev_ctx, struct tevent_fd *f,
74                         uint16_t flags, void *private_data)
75 {
76         int *fd = (int *)private_data;
77         char c = 0;
78
79         do_write(fd[1], &c, 1);
80 }
81
82
83 /* This will only fire if the fd's returned from pipe() are bi-directional. */
84 static void fde_handler_read_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
85                         uint16_t flags, void *private_data)
86 {
87         int *fd = (int *)private_data;
88         char c;
89 #ifdef SA_SIGINFO
90         kill(getpid(), SIGUSR1);
91 #endif
92         kill(getpid(), SIGALRM);
93
94         do_read(fd[1], &c, 1);
95         fde_count++;
96 }
97
98 /* This will only fire if the fd's returned from pipe() are bi-directional. */
99 static void fde_handler_write_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
100                         uint16_t flags, void *private_data)
101 {
102         int *fd = (int *)private_data;
103         char c = 0;
104         do_write(fd[0], &c, 1);
105 }
106
/*
 * Timer callback: private_data points at an int flag which is set to 1
 * so that the loop polling it can stop.
 */
static void finished_handler(struct tevent_context *ev_ctx, struct tevent_timer *te,
                             struct timeval tval, void *private_data)
{
        int *done_flag = private_data;

        *done_flag = 1;
}
113
/*
 * Signal callback: private_data points at an int accumulator to which
 * the delivered signal count is added.
 */
static void count_handler(struct tevent_context *ev_ctx, struct tevent_signal *te,
                          int signum, int count, void *info, void *private_data)
{
        int *total = private_data;

        *total = *total + count;
}
120
121 static bool test_event_context(struct torture_context *test,
122                                const void *test_data)
123 {
124         struct tevent_context *ev_ctx;
125         int fd[2] = { -1, -1 };
126         const char *backend = (const char *)test_data;
127         int alarm_count=0, info_count=0;
128         struct tevent_fd *fde_read;
129         struct tevent_fd *fde_read_1;
130         struct tevent_fd *fde_write;
131         struct tevent_fd *fde_write_1;
132 #ifdef SA_RESTART
133         struct tevent_signal *se1 = NULL;
134 #endif
135 #ifdef SA_RESETHAND
136         struct tevent_signal *se2 = NULL;
137 #endif
138 #ifdef SA_SIGINFO
139         struct tevent_signal *se3 = NULL;
140 #endif
141         int finished=0;
142         struct timeval t;
143         int ret;
144
145         ev_ctx = tevent_context_init_byname(test, backend);
146         if (ev_ctx == NULL) {
147                 torture_comment(test, "event backend '%s' not supported\n", backend);
148                 return true;
149         }
150
151         torture_comment(test, "backend '%s' - %s\n",
152                         backend, __FUNCTION__);
153
154         /* reset globals */
155         fde_count = 0;
156
157         /* create a pipe */
158         ret = pipe(fd);
159         torture_assert_int_equal(test, ret, 0, "pipe failed");
160
161         fde_read = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_READ,
162                             fde_handler_read, fd);
163         fde_write_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_WRITE,
164                             fde_handler_write_1, fd);
165
166         fde_write = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_WRITE,
167                             fde_handler_write, fd);
168         fde_read_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_READ,
169                             fde_handler_read_1, fd);
170
171         tevent_fd_set_auto_close(fde_read);
172         tevent_fd_set_auto_close(fde_write);
173
174         tevent_add_timer(ev_ctx, ev_ctx, timeval_current_ofs(2,0),
175                          finished_handler, &finished);
176
177 #ifdef SA_RESTART
178         se1 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESTART, count_handler, &alarm_count);
179         torture_assert(test, se1 != NULL, "failed to setup se1");
180 #endif
181 #ifdef SA_RESETHAND
182         se2 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESETHAND, count_handler, &alarm_count);
183         torture_assert(test, se2 != NULL, "failed to setup se2");
184 #endif
185 #ifdef SA_SIGINFO
186         se3 = tevent_add_signal(ev_ctx, ev_ctx, SIGUSR1, SA_SIGINFO, count_handler, &info_count);
187         torture_assert(test, se3 != NULL, "failed to setup se3");
188 #endif
189
190         t = timeval_current();
191         while (!finished) {
192                 errno = 0;
193                 if (tevent_loop_once(ev_ctx) == -1) {
194                         TALLOC_FREE(ev_ctx);
195                         torture_fail(test, talloc_asprintf(test, "Failed event loop %s\n", strerror(errno)));
196                         return false;
197                 }
198         }
199
200         talloc_free(fde_read_1);
201         talloc_free(fde_write_1);
202         talloc_free(fde_read);
203         talloc_free(fde_write);
204
205         while (alarm_count < fde_count+1) {
206                 if (tevent_loop_once(ev_ctx) == -1) {
207                         break;
208                 }
209         }
210
211         torture_comment(test, "Got %.2f pipe events/sec\n", fde_count/timeval_elapsed(&t));
212
213 #ifdef SA_RESTART
214         talloc_free(se1);
215 #endif
216
217         torture_assert_int_equal(test, alarm_count, 1+fde_count, "alarm count mismatch");
218
219 #ifdef SA_RESETHAND
220         /*
221          * we do not call talloc_free(se2)
222          * because it is already gone,
223          * after triggering the event handler.
224          */
225 #endif
226
227 #ifdef SA_SIGINFO
228         talloc_free(se3);
229         torture_assert_int_equal(test, info_count, fde_count, "info count mismatch");
230 #endif
231
232         talloc_free(ev_ctx);
233
234         return true;
235 }
236
/*
 * Shared state of the test_event_fd1 test; the fd handler and the timer
 * handler both receive a pointer to it as private_data.
 */
struct test_event_fd1_state {
        struct torture_context *tctx;   /* torture context of the test */
        const char *backend;            /* tevent backend name under test */
        struct tevent_context *ev;      /* event context created for the test */
        int sock[2];                    /* fds filled in by socketpair() */
        struct tevent_timer *te;        /* most recently scheduled timer */
        struct tevent_fd *fde0;         /* fd event on sock[0] */
        struct tevent_fd *fde1;         /* fd event on sock[1], only used to auto close */
        bool got_write;                 /* fd handler saw TEVENT_FD_WRITE this round */
        bool got_read;                  /* fd handler saw TEVENT_FD_READ this round */
        bool drain;                     /* set by the timer once loop_count exceeds 2 */
        bool drain_done;                /* set when the drain read() did not return 1 */
        unsigned int loop_count;        /* rounds restarted by the timer handler */
        bool finished;                  /* terminates the event loop in test_event_fd1 */
        const char *error;              /* __location__ of a failed check, NULL if none */
};
253
254 static void test_event_fd1_fde_handler(struct tevent_context *ev_ctx,
255                                        struct tevent_fd *fde,
256                                        uint16_t flags,
257                                        void *private_data)
258 {
259         struct test_event_fd1_state *state =
260                 (struct test_event_fd1_state *)private_data;
261
262         if (state->drain_done) {
263                 state->finished = true;
264                 state->error = __location__;
265                 return;
266         }
267
268         if (state->drain) {
269                 ssize_t ret;
270                 uint8_t c = 0;
271
272                 if (!(flags & TEVENT_FD_READ)) {
273                         state->finished = true;
274                         state->error = __location__;
275                         return;
276                 }
277
278                 ret = read(state->sock[0], &c, 1);
279                 if (ret == 1) {
280                         return;
281                 }
282
283                 /*
284                  * end of test...
285                  */
286                 tevent_fd_set_flags(fde, 0);
287                 state->drain_done = true;
288                 return;
289         }
290
291         if (!state->got_write) {
292                 uint8_t c = 0;
293
294                 if (flags != TEVENT_FD_WRITE) {
295                         state->finished = true;
296                         state->error = __location__;
297                         return;
298                 }
299                 state->got_write = true;
300
301                 /*
302                  * we write to the other socket...
303                  */
304                 do_write(state->sock[1], &c, 1);
305                 TEVENT_FD_NOT_WRITEABLE(fde);
306                 TEVENT_FD_READABLE(fde);
307                 return;
308         }
309
310         if (!state->got_read) {
311                 if (flags != TEVENT_FD_READ) {
312                         state->finished = true;
313                         state->error = __location__;
314                         return;
315                 }
316                 state->got_read = true;
317
318                 TEVENT_FD_NOT_READABLE(fde);
319                 return;
320         }
321
322         state->finished = true;
323         state->error = __location__;
324         return;
325 }
326
327 static void test_event_fd1_finished(struct tevent_context *ev_ctx,
328                                     struct tevent_timer *te,
329                                     struct timeval tval,
330                                     void *private_data)
331 {
332         struct test_event_fd1_state *state =
333                 (struct test_event_fd1_state *)private_data;
334
335         if (state->drain_done) {
336                 state->finished = true;
337                 return;
338         }
339
340         if (!state->got_write) {
341                 state->finished = true;
342                 state->error = __location__;
343                 return;
344         }
345
346         if (!state->got_read) {
347                 state->finished = true;
348                 state->error = __location__;
349                 return;
350         }
351
352         state->loop_count++;
353         if (state->loop_count > 3) {
354                 state->finished = true;
355                 state->error = __location__;
356                 return;
357         }
358
359         state->got_write = false;
360         state->got_read = false;
361
362         tevent_fd_set_flags(state->fde0, TEVENT_FD_WRITE);
363
364         if (state->loop_count > 2) {
365                 state->drain = true;
366                 TALLOC_FREE(state->fde1);
367                 TEVENT_FD_READABLE(state->fde0);
368         }
369
370         state->te = tevent_add_timer(state->ev, state->ev,
371                                     timeval_current_ofs(0,2000),
372                                     test_event_fd1_finished, state);
373 }
374
375 static bool test_event_fd1(struct torture_context *tctx,
376                            const void *test_data)
377 {
378         struct test_event_fd1_state state;
379         int ret;
380
381         ZERO_STRUCT(state);
382         state.tctx = tctx;
383         state.backend = (const char *)test_data;
384
385         state.ev = tevent_context_init_byname(tctx, state.backend);
386         if (state.ev == NULL) {
387                 torture_skip(tctx, talloc_asprintf(tctx,
388                              "event backend '%s' not supported\n",
389                              state.backend));
390                 return true;
391         }
392
393         tevent_set_debug_stderr(state.ev);
394         torture_comment(tctx, "backend '%s' - %s\n",
395                         state.backend, __FUNCTION__);
396
397         /*
398          * This tests the following:
399          *
400          * It monitors the state of state.sock[0]
401          * with tevent_fd, but we never read/write on state.sock[0]
402          * while state.sock[1] * is only used to write a few bytes.
403          *
404          * We have a loop:
405          *   - we wait only for TEVENT_FD_WRITE on state.sock[0]
406          *   - we write 1 byte to state.sock[1]
407          *   - we wait only for TEVENT_FD_READ on state.sock[0]
408          *   - we disable events on state.sock[0]
409          *   - the timer event restarts the loop
410          * Then we close state.sock[1]
411          * We have a loop:
412          *   - we wait for TEVENT_FD_READ/WRITE on state.sock[0]
413          *   - we try to read 1 byte
414          *   - if the read gets an error of returns 0
415          *     we disable the event handler
416          *   - the timer finishes the test
417          */
418         state.sock[0] = -1;
419         state.sock[1] = -1;
420
421         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, state.sock);
422         torture_assert(tctx, ret == 0, "socketpair() failed");
423
424         state.te = tevent_add_timer(state.ev, state.ev,
425                                     timeval_current_ofs(0,1000),
426                                     test_event_fd1_finished, &state);
427         state.fde0 = tevent_add_fd(state.ev, state.ev,
428                                    state.sock[0], TEVENT_FD_WRITE,
429                                    test_event_fd1_fde_handler, &state);
430         /* state.fde1 is only used to auto close */
431         state.fde1 = tevent_add_fd(state.ev, state.ev,
432                                    state.sock[1], 0,
433                                    test_event_fd1_fde_handler, &state);
434
435         tevent_fd_set_auto_close(state.fde0);
436         tevent_fd_set_auto_close(state.fde1);
437
438         while (!state.finished) {
439                 errno = 0;
440                 if (tevent_loop_once(state.ev) == -1) {
441                         talloc_free(state.ev);
442                         torture_fail(tctx, talloc_asprintf(tctx,
443                                      "Failed event loop %s\n",
444                                      strerror(errno)));
445                 }
446         }
447
448         talloc_free(state.ev);
449
450         torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
451                        "%s", state.error));
452
453         return true;
454 }
455
/*
 * Shared state of the test_event_fd2 test. Each end of the socketpair
 * has its own struct test_event_fd2_sock, which is what the fd handler
 * receives as private_data.
 */
struct test_event_fd2_state {
        struct torture_context *tctx;   /* torture context of the test */
        const char *backend;            /* tevent backend name under test */
        struct tevent_context *ev;      /* event context created for the test */
        struct tevent_timer *te;        /* guard timer, should never expire */
        struct test_event_fd2_sock {
                struct test_event_fd2_state *state;     /* back pointer to the owner */
                int fd;                                 /* this end of the socketpair */
                struct tevent_fd *fde;                  /* fd event watching fd */
                size_t num_written;                     /* bytes written to fd so far */
                size_t num_read;                        /* bytes read from fd so far */
                bool got_full;                          /* fd stopped being writable */
        } sock0;
        struct test_event_fd2_sock sock1;
        bool finished;                  /* terminates the event loop */
        const char *error;              /* __location__ of a failed check, NULL if none */
};
472
473 static void test_event_fd2_sock_handler(struct tevent_context *ev_ctx,
474                                         struct tevent_fd *fde,
475                                         uint16_t flags,
476                                         void *private_data)
477 {
478         struct test_event_fd2_sock *cur_sock =
479                 (struct test_event_fd2_sock *)private_data;
480         struct test_event_fd2_state *state = cur_sock->state;
481         struct test_event_fd2_sock *oth_sock = NULL;
482         uint8_t v = 0, c;
483         ssize_t ret;
484
485         if (cur_sock == &state->sock0) {
486                 oth_sock = &state->sock1;
487         } else {
488                 oth_sock = &state->sock0;
489         }
490
491         if (oth_sock->num_written == 1) {
492                 if (flags != (TEVENT_FD_READ | TEVENT_FD_WRITE)) {
493                         state->finished = true;
494                         state->error = __location__;
495                         return;
496                 }
497         }
498
499         if (cur_sock->num_read == oth_sock->num_written) {
500                 state->finished = true;
501                 state->error = __location__;
502                 return;
503         }
504
505         if (!(flags & TEVENT_FD_READ)) {
506                 state->finished = true;
507                 state->error = __location__;
508                 return;
509         }
510
511         if (oth_sock->num_read >= PIPE_BUF) {
512                 /*
513                  * On Linux we become writable once we've read
514                  * one byte. On Solaris we only become writable
515                  * again once we've read 4096 bytes. PIPE_BUF
516                  * is probably a safe bet to test against.
517                  *
518                  * There should be room to write a byte again
519                  */
520                 if (!(flags & TEVENT_FD_WRITE)) {
521                         state->finished = true;
522                         state->error = __location__;
523                         return;
524                 }
525         }
526
527         if ((flags & TEVENT_FD_WRITE) && !cur_sock->got_full) {
528                 v = (uint8_t)cur_sock->num_written;
529                 ret = write(cur_sock->fd, &v, 1);
530                 if (ret != 1) {
531                         state->finished = true;
532                         state->error = __location__;
533                         return;
534                 }
535                 cur_sock->num_written++;
536                 if (cur_sock->num_written > 0x80000000) {
537                         state->finished = true;
538                         state->error = __location__;
539                         return;
540                 }
541                 return;
542         }
543
544         if (!cur_sock->got_full) {
545                 cur_sock->got_full = true;
546
547                 if (!oth_sock->got_full) {
548                         /*
549                          * cur_sock is full,
550                          * lets wait for oth_sock
551                          * to be filled
552                          */
553                         tevent_fd_set_flags(cur_sock->fde, 0);
554                         return;
555                 }
556
557                 /*
558                  * oth_sock waited for cur_sock,
559                  * lets restart it
560                  */
561                 tevent_fd_set_flags(oth_sock->fde,
562                                     TEVENT_FD_READ|TEVENT_FD_WRITE);
563         }
564
565         ret = read(cur_sock->fd, &v, 1);
566         if (ret != 1) {
567                 state->finished = true;
568                 state->error = __location__;
569                 return;
570         }
571         c = (uint8_t)cur_sock->num_read;
572         if (c != v) {
573                 state->finished = true;
574                 state->error = __location__;
575                 return;
576         }
577         cur_sock->num_read++;
578
579         if (cur_sock->num_read < oth_sock->num_written) {
580                 /* there is more to read */
581                 return;
582         }
583         /*
584          * we read everything, we need to remove TEVENT_FD_WRITE
585          * to avoid spinning
586          */
587         TEVENT_FD_NOT_WRITEABLE(cur_sock->fde);
588
589         if (oth_sock->num_read == cur_sock->num_written) {
590                 /*
591                  * both directions are finished
592                  */
593                 state->finished = true;
594         }
595
596         return;
597 }
598
599 static void test_event_fd2_finished(struct tevent_context *ev_ctx,
600                                     struct tevent_timer *te,
601                                     struct timeval tval,
602                                     void *private_data)
603 {
604         struct test_event_fd2_state *state =
605                 (struct test_event_fd2_state *)private_data;
606
607         /*
608          * this should never be triggered
609          */
610         state->finished = true;
611         state->error = __location__;
612 }
613
614 static bool test_event_fd2(struct torture_context *tctx,
615                            const void *test_data)
616 {
617         struct test_event_fd2_state state;
618         int sock[2];
619         uint8_t c = 0;
620
621         ZERO_STRUCT(state);
622         state.tctx = tctx;
623         state.backend = (const char *)test_data;
624
625         state.ev = tevent_context_init_byname(tctx, state.backend);
626         if (state.ev == NULL) {
627                 torture_skip(tctx, talloc_asprintf(tctx,
628                              "event backend '%s' not supported\n",
629                              state.backend));
630                 return true;
631         }
632
633         tevent_set_debug_stderr(state.ev);
634         torture_comment(tctx, "backend '%s' - %s\n",
635                         state.backend, __FUNCTION__);
636
637         /*
638          * This tests the following
639          *
640          * - We write 1 byte to each socket
641          * - We wait for TEVENT_FD_READ/WRITE on both sockets
642          * - When we get TEVENT_FD_WRITE we write 1 byte
643          *   until both socket buffers are full, which
644          *   means both sockets only get TEVENT_FD_READ.
645          * - Then we read 1 byte until we have consumed
646          *   all bytes the other end has written.
647          */
648         sock[0] = -1;
649         sock[1] = -1;
650         socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
651
652         /*
653          * the timer should never expire
654          */
655         state.te = tevent_add_timer(state.ev, state.ev,
656                                     timeval_current_ofs(600, 0),
657                                     test_event_fd2_finished, &state);
658         state.sock0.state = &state;
659         state.sock0.fd = sock[0];
660         state.sock0.fde = tevent_add_fd(state.ev, state.ev,
661                                         state.sock0.fd,
662                                         TEVENT_FD_READ | TEVENT_FD_WRITE,
663                                         test_event_fd2_sock_handler,
664                                         &state.sock0);
665         state.sock1.state = &state;
666         state.sock1.fd = sock[1];
667         state.sock1.fde = tevent_add_fd(state.ev, state.ev,
668                                         state.sock1.fd,
669                                         TEVENT_FD_READ | TEVENT_FD_WRITE,
670                                         test_event_fd2_sock_handler,
671                                         &state.sock1);
672
673         tevent_fd_set_auto_close(state.sock0.fde);
674         tevent_fd_set_auto_close(state.sock1.fde);
675
676         do_write(state.sock0.fd, &c, 1);
677         state.sock0.num_written++;
678         do_write(state.sock1.fd, &c, 1);
679         state.sock1.num_written++;
680
681         while (!state.finished) {
682                 errno = 0;
683                 if (tevent_loop_once(state.ev) == -1) {
684                         talloc_free(state.ev);
685                         torture_fail(tctx, talloc_asprintf(tctx,
686                                      "Failed event loop %s\n",
687                                      strerror(errno)));
688                 }
689         }
690
691         talloc_free(state.ev);
692
693         torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
694                        "%s", state.error));
695
696         return true;
697 }
698
/*
 * private_data handed to the test_wrapper_* hooks.
 */
struct test_wrapper_state {
        /* torture context the hooks log to via torture_comment() */
        struct torture_context *tctx;
        /* not referenced in the part of the file visible here */
        int num_events;
        /* incremented by every test_wrapper_* hook invocation */
        int num_wrap_handlers;
};
704
705 static bool test_wrapper_before_use(struct tevent_context *wrap_ev,
706                                     void *private_data,
707                                     struct tevent_context *main_ev,
708                                     const char *location)
709 {
710         struct test_wrapper_state *state =
711                 talloc_get_type_abort(private_data,
712                 struct test_wrapper_state);
713
714         torture_comment(state->tctx, "%s\n", __func__);
715         state->num_wrap_handlers++;
716         return true;
717 }
718
719 static void test_wrapper_after_use(struct tevent_context *wrap_ev,
720                                    void *private_data,
721                                    struct tevent_context *main_ev,
722                                    const char *location)
723 {
724         struct test_wrapper_state *state =
725                 talloc_get_type_abort(private_data,
726                 struct test_wrapper_state);
727
728         torture_comment(state->tctx, "%s\n", __func__);
729         state->num_wrap_handlers++;
730 }
731
732 static void test_wrapper_before_fd_handler(struct tevent_context *wrap_ev,
733                                            void *private_data,
734                                            struct tevent_context *main_ev,
735                                            struct tevent_fd *fde,
736                                            uint16_t flags,
737                                            const char *handler_name,
738                                            const char *location)
739 {
740         struct test_wrapper_state *state =
741                 talloc_get_type_abort(private_data,
742                 struct test_wrapper_state);
743
744         torture_comment(state->tctx, "%s\n", __func__);
745         state->num_wrap_handlers++;
746 }
747
748 static void test_wrapper_after_fd_handler(struct tevent_context *wrap_ev,
749                                           void *private_data,
750                                           struct tevent_context *main_ev,
751                                           struct tevent_fd *fde,
752                                           uint16_t flags,
753                                           const char *handler_name,
754                                           const char *location)
755 {
756         struct test_wrapper_state *state =
757                 talloc_get_type_abort(private_data,
758                 struct test_wrapper_state);
759
760         torture_comment(state->tctx, "%s\n", __func__);
761         state->num_wrap_handlers++;
762 }
763
764 static void test_wrapper_before_timer_handler(struct tevent_context *wrap_ev,
765                                               void *private_data,
766                                               struct tevent_context *main_ev,
767                                               struct tevent_timer *te,
768                                               struct timeval requested_time,
769                                               struct timeval trigger_time,
770                                               const char *handler_name,
771                                               const char *location)
772 {
773         struct test_wrapper_state *state =
774                 talloc_get_type_abort(private_data,
775                 struct test_wrapper_state);
776
777         torture_comment(state->tctx, "%s\n", __func__);
778         state->num_wrap_handlers++;
779 }
780
781 static void test_wrapper_after_timer_handler(struct tevent_context *wrap_ev,
782                                              void *private_data,
783                                              struct tevent_context *main_ev,
784                                              struct tevent_timer *te,
785                                              struct timeval requested_time,
786                                              struct timeval trigger_time,
787                                              const char *handler_name,
788                                              const char *location)
789 {
790         struct test_wrapper_state *state =
791                 talloc_get_type_abort(private_data,
792                 struct test_wrapper_state);
793
794         torture_comment(state->tctx, "%s\n", __func__);
795         state->num_wrap_handlers++;
796 }
797
798 static void test_wrapper_before_immediate_handler(struct tevent_context *wrap_ev,
799                                                   void *private_data,
800                                                   struct tevent_context *main_ev,
801                                                   struct tevent_immediate *im,
802                                                   const char *handler_name,
803                                                   const char *location)
804 {
805         struct test_wrapper_state *state =
806                 talloc_get_type_abort(private_data,
807                 struct test_wrapper_state);
808
809         torture_comment(state->tctx, "%s\n", __func__);
810         state->num_wrap_handlers++;
811 }
812
813 static void test_wrapper_after_immediate_handler(struct tevent_context *wrap_ev,
814                                                  void *private_data,
815                                                  struct tevent_context *main_ev,
816                                                  struct tevent_immediate *im,
817                                                  const char *handler_name,
818                                                  const char *location)
819 {
820         struct test_wrapper_state *state =
821                 talloc_get_type_abort(private_data,
822                 struct test_wrapper_state);
823
824         torture_comment(state->tctx, "%s\n", __func__);
825         state->num_wrap_handlers++;
826 }
827
828 static void test_wrapper_before_signal_handler(struct tevent_context *wrap_ev,
829                                                void *private_data,
830                                                struct tevent_context *main_ev,
831                                                struct tevent_signal *se,
832                                                int signum,
833                                                int count,
834                                                void *siginfo,
835                                                const char *handler_name,
836                                                const char *location)
837 {
838         struct test_wrapper_state *state =
839                 talloc_get_type_abort(private_data,
840                 struct test_wrapper_state);
841
842         torture_comment(state->tctx, "%s\n", __func__);
843         state->num_wrap_handlers++;
844 }
845
846 static void test_wrapper_after_signal_handler(struct tevent_context *wrap_ev,
847                                               void *private_data,
848                                               struct tevent_context *main_ev,
849                                               struct tevent_signal *se,
850                                               int signum,
851                                               int count,
852                                               void *siginfo,
853                                               const char *handler_name,
854                                               const char *location)
855 {
856         struct test_wrapper_state *state =
857                 talloc_get_type_abort(private_data,
858                 struct test_wrapper_state);
859
860         torture_comment(state->tctx, "%s\n", __func__);
861         state->num_wrap_handlers++;
862 }
863
864 static const struct tevent_wrapper_ops test_wrapper_ops = {
865         .name                           = "test_wrapper",
866         .before_use                     = test_wrapper_before_use,
867         .after_use                      = test_wrapper_after_use,
868         .before_fd_handler              = test_wrapper_before_fd_handler,
869         .after_fd_handler               = test_wrapper_after_fd_handler,
870         .before_timer_handler           = test_wrapper_before_timer_handler,
871         .after_timer_handler            = test_wrapper_after_timer_handler,
872         .before_immediate_handler       = test_wrapper_before_immediate_handler,
873         .after_immediate_handler        = test_wrapper_after_immediate_handler,
874         .before_signal_handler          = test_wrapper_before_signal_handler,
875         .after_signal_handler           = test_wrapper_after_signal_handler,
876 };
877
878 static void test_wrapper_timer_handler(struct tevent_context *ev,
879                                        struct tevent_timer *te,
880                                        struct timeval tv,
881                                        void *private_data)
882 {
883         struct test_wrapper_state *state =
884                 (struct test_wrapper_state *)private_data;
885
886
887         torture_comment(state->tctx, "timer handler\n");
888
889         state->num_events++;
890         talloc_free(te);
891         return;
892 }
893
894 static void test_wrapper_fd_handler(struct tevent_context *ev,
895                                     struct tevent_fd *fde,
896                                     unsigned short fd_flags,
897                                     void *private_data)
898 {
899         struct test_wrapper_state *state =
900                 (struct test_wrapper_state *)private_data;
901
902         torture_comment(state->tctx, "fd handler\n");
903
904         state->num_events++;
905         talloc_free(fde);
906         return;
907 }
908
909 static void test_wrapper_immediate_handler(struct tevent_context *ev,
910                                            struct tevent_immediate *im,
911                                            void *private_data)
912 {
913         struct test_wrapper_state *state =
914                 (struct test_wrapper_state *)private_data;
915
916         state->num_events++;
917         talloc_free(im);
918
919         torture_comment(state->tctx, "immediate handler\n");
920         return;
921 }
922
923 static void test_wrapper_signal_handler(struct tevent_context *ev,
924                                         struct tevent_signal *se,
925                                         int signum,
926                                         int count,
927                                         void *siginfo,
928                                         void *private_data)
929 {
930         struct test_wrapper_state *state =
931                 (struct test_wrapper_state *)private_data;
932
933         torture_comment(state->tctx, "signal handler\n");
934
935         state->num_events++;
936         talloc_free(se);
937         return;
938 }
939
940 static bool test_wrapper(struct torture_context *tctx,
941                          const void *test_data)
942 {
943         struct test_wrapper_state *state = NULL;
944         int sock[2] = { -1, -1};
945         uint8_t c = 0;
946         const int num_events = 4;
947         const char *backend = (const char *)test_data;
948         struct tevent_context *ev = NULL;
949         struct tevent_context *wrap_ev = NULL;
950         struct tevent_fd *fde = NULL;
951         struct tevent_timer *te = NULL;
952         struct tevent_signal *se = NULL;
953         struct tevent_immediate *im = NULL;
954         int ret;
955         bool ok = false;
956         bool ret2;
957
958         ev = tevent_context_init_byname(tctx, backend);
959         if (ev == NULL) {
960                 torture_skip(tctx, talloc_asprintf(tctx,
961                              "event backend '%s' not supported\n",
962                              backend));
963                 return true;
964         }
965
966         tevent_set_debug_stderr(ev);
967         torture_comment(tctx, "tevent backend '%s'\n", backend);
968
969         wrap_ev = tevent_context_wrapper_create(
970                 ev, ev, &test_wrapper_ops, &state, struct test_wrapper_state);
971         torture_assert_not_null_goto(tctx, wrap_ev, ok, done,
972                                      "tevent_context_wrapper_create failed\n");
973         *state = (struct test_wrapper_state) {
974                 .tctx = tctx,
975         };
976
977         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
978         torture_assert_goto(tctx, ret == 0, ok, done, "socketpair failed\n");
979
980         te = tevent_add_timer(wrap_ev, wrap_ev,
981                               timeval_current_ofs(0, 0),
982                               test_wrapper_timer_handler, state);
983         torture_assert_not_null_goto(tctx, te, ok, done,
984                                      "tevent_add_timer failed\n");
985
986         fde = tevent_add_fd(wrap_ev, wrap_ev,
987                             sock[1],
988                             TEVENT_FD_READ,
989                             test_wrapper_fd_handler,
990                             state);
991         torture_assert_not_null_goto(tctx, fde, ok, done,
992                                      "tevent_add_fd failed\n");
993
994         im = tevent_create_immediate(wrap_ev);
995         torture_assert_not_null_goto(tctx, im, ok, done,
996                                      "tevent_create_immediate failed\n");
997
998         se = tevent_add_signal(wrap_ev, wrap_ev,
999                                SIGUSR1,
1000                                0,
1001                                test_wrapper_signal_handler,
1002                                state);
1003         torture_assert_not_null_goto(tctx, se, ok, done,
1004                                      "tevent_add_signal failed\n");
1005
1006         do_write(sock[0], &c, 1);
1007         kill(getpid(), SIGUSR1);
1008         tevent_schedule_immediate(im,
1009                                   wrap_ev,
1010                                   test_wrapper_immediate_handler,
1011                                   state);
1012
1013         ret2 = tevent_context_push_use(wrap_ev);
1014         torture_assert_goto(tctx, ret2, ok, done, "tevent_context_push_use(wrap_ev) failed\n");
1015         ret2 = tevent_context_push_use(ev);
1016         torture_assert_goto(tctx, ret2, ok, pop_use, "tevent_context_push_use(ev) failed\n");
1017         tevent_context_pop_use(ev);
1018         tevent_context_pop_use(wrap_ev);
1019
1020         ret = tevent_loop_wait(ev);
1021         torture_assert_int_equal_goto(tctx, ret, 0, ok, done, "tevent_loop_wait failed\n");
1022
1023         torture_comment(tctx, "Num events: %d\n", state->num_events);
1024         torture_comment(tctx, "Num wrap handlers: %d\n",
1025                         state->num_wrap_handlers);
1026
1027         torture_assert_int_equal_goto(tctx, state->num_events, num_events, ok, done,
1028                                       "Wrong event count\n");
1029         torture_assert_int_equal_goto(tctx, state->num_wrap_handlers,
1030                                       num_events*2+2,
1031                                       ok, done, "Wrong wrapper count\n");
1032
1033         ok = true;
1034
1035 done:
1036         TALLOC_FREE(wrap_ev);
1037         TALLOC_FREE(ev);
1038
1039         if (sock[0] != -1) {
1040                 close(sock[0]);
1041         }
1042         if (sock[1] != -1) {
1043                 close(sock[1]);
1044         }
1045         return ok;
1046 pop_use:
1047         tevent_context_pop_use(wrap_ev);
1048         goto done;
1049 }
1050
1051 static void test_free_wrapper_signal_handler(struct tevent_context *ev,
1052                                         struct tevent_signal *se,
1053                                         int signum,
1054                                         int count,
1055                                         void *siginfo,
1056                                         void *private_data)
1057 {
1058         struct torture_context *tctx =
1059                 talloc_get_type_abort(private_data,
1060                 struct torture_context);
1061
1062         torture_comment(tctx, "signal handler\n");
1063
1064         talloc_free(se);
1065
1066         /*
1067          * signal handlers have highest priority in tevent, so this signal
1068          * handler will always be started before the other handlers
1069          * below. Freeing the (wrapper) event context here tests that the
1070          * wrapper implementation correclty handles the wrapper ev going away
1071          * with pending events.
1072          */
1073         talloc_free(ev);
1074         return;
1075 }
1076
/*
 * fd handler registered by test_free_wrapper(); reaching it is a test
 * failure.
 */
static void test_free_wrapper_fd_handler(struct tevent_context *ev,
                                         struct tevent_fd *fde,
                                         unsigned short fd_flags,
                                         void *private_data)
{
        /*
         * Must never run: test_free_wrapper_signal_handler() has
         * already destroyed the wrapper tevent_context by the time
         * this event could be dispatched.
         */
        abort();
}
1089
/*
 * Immediate handler registered by test_free_wrapper(); reaching it is a
 * test failure.
 */
static void test_free_wrapper_immediate_handler(struct tevent_context *ev,
                                           struct tevent_immediate *im,
                                           void *private_data)
{
        /*
         * Must never run: test_free_wrapper_signal_handler() has
         * already destroyed the wrapper tevent_context by the time
         * this event could be dispatched.
         */
        abort();
}
1101
/*
 * Timer handler registered by test_free_wrapper(); reaching it is a test
 * failure.
 */
static void test_free_wrapper_timer_handler(struct tevent_context *ev,
                                       struct tevent_timer *te,
                                       struct timeval tv,
                                       void *private_data)
{
        /*
         * Must never run: test_free_wrapper_signal_handler() has
         * already destroyed the wrapper tevent_context by the time
         * this event could be dispatched.
         */
        abort();
}
1114
1115 static bool test_free_wrapper(struct torture_context *tctx,
1116                               const void *test_data)
1117 {
1118         struct test_wrapper_state *state = NULL;
1119         int sock[2] = { -1, -1};
1120         uint8_t c = 0;
1121         const char *backend = (const char *)test_data;
1122         TALLOC_CTX *frame = talloc_stackframe();
1123         struct tevent_context *ev = NULL;
1124         struct tevent_context *wrap_ev = NULL;
1125         struct tevent_fd *fde = NULL;
1126         struct tevent_timer *te = NULL;
1127         struct tevent_signal *se = NULL;
1128         struct tevent_immediate *im = NULL;
1129         int ret;
1130         bool ok = false;
1131
1132         ev = tevent_context_init_byname(frame, backend);
1133         if (ev == NULL) {
1134                 torture_skip(tctx, talloc_asprintf(tctx,
1135                              "event backend '%s' not supported\n",
1136                              backend));
1137                 return true;
1138         }
1139
1140         tevent_set_debug_stderr(ev);
1141         torture_comment(tctx, "tevent backend '%s'\n", backend);
1142
1143         wrap_ev = tevent_context_wrapper_create(
1144                 ev, ev, &test_wrapper_ops, &state, struct test_wrapper_state);
1145         torture_assert_not_null_goto(tctx, wrap_ev, ok, done,
1146                                      "tevent_context_wrapper_create failed\n");
1147         *state = (struct test_wrapper_state) {
1148                 .tctx = tctx,
1149         };
1150
1151         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
1152         torture_assert_goto(tctx, ret == 0, ok, done, "socketpair failed\n");
1153
1154         fde = tevent_add_fd(wrap_ev, frame,
1155                             sock[1],
1156                             TEVENT_FD_READ,
1157                             test_free_wrapper_fd_handler,
1158                             NULL);
1159         torture_assert_not_null_goto(tctx, fde, ok, done,
1160                                      "tevent_add_fd failed\n");
1161
1162         te = tevent_add_timer(wrap_ev, frame,
1163                               timeval_current_ofs(0, 0),
1164                               test_free_wrapper_timer_handler, NULL);
1165         torture_assert_not_null_goto(tctx, te, ok, done,
1166                                      "tevent_add_timer failed\n");
1167
1168         im = tevent_create_immediate(frame);
1169         torture_assert_not_null_goto(tctx, im, ok, done,
1170                                      "tevent_create_immediate failed\n");
1171
1172         se = tevent_add_signal(wrap_ev, frame,
1173                                SIGUSR1,
1174                                0,
1175                                test_free_wrapper_signal_handler,
1176                                tctx);
1177         torture_assert_not_null_goto(tctx, se, ok, done,
1178                                      "tevent_add_signal failed\n");
1179
1180         do_write(sock[0], &c, 1);
1181         kill(getpid(), SIGUSR1);
1182         tevent_schedule_immediate(im,
1183                                   wrap_ev,
1184                                   test_free_wrapper_immediate_handler,
1185                                   NULL);
1186
1187         ret = tevent_loop_wait(ev);
1188         torture_assert_goto(tctx, ret == 0, ok, done, "tevent_loop_wait failed\n");
1189
1190         ok = true;
1191
1192 done:
1193         TALLOC_FREE(frame);
1194
1195         if (sock[0] != -1) {
1196                 close(sock[0]);
1197         }
1198         if (sock[1] != -1) {
1199                 close(sock[1]);
1200         }
1201         return ok;
1202 }
1203
1204 #ifdef HAVE_PTHREAD
1205
/* Lock shared by the poll thread and the main thread of the poll_mt test. */
static pthread_mutex_t threaded_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Set by the main thread to ask the poll thread to leave its loop. */
static bool do_shutdown;
1208
1209 static void test_event_threaded_lock(void)
1210 {
1211         int ret;
1212         ret = pthread_mutex_lock(&threaded_mutex);
1213         assert(ret == 0);
1214 }
1215
1216 static void test_event_threaded_unlock(void)
1217 {
1218         int ret;
1219         ret = pthread_mutex_unlock(&threaded_mutex);
1220         assert(ret == 0);
1221 }
1222
1223 static void test_event_threaded_trace(enum tevent_trace_point point,
1224                                       void *private_data)
1225 {
1226         switch (point) {
1227         case TEVENT_TRACE_BEFORE_WAIT:
1228                 test_event_threaded_unlock();
1229                 break;
1230         case TEVENT_TRACE_AFTER_WAIT:
1231                 test_event_threaded_lock();
1232                 break;
1233         case TEVENT_TRACE_BEFORE_LOOP_ONCE:
1234         case TEVENT_TRACE_AFTER_LOOP_ONCE:
1235                 break;
1236         }
1237 }
1238
/* Timer handler for the poll_mt test; intentionally does nothing. */
static void test_event_threaded_timer(struct tevent_context *ev,
                                      struct tevent_timer *te,
                                      struct timeval current_time,
                                      void *private_data)
{
        /* no-op */
}
1246
1247 static void *test_event_poll_thread(void *private_data)
1248 {
1249         struct tevent_context *ev = (struct tevent_context *)private_data;
1250
1251         test_event_threaded_lock();
1252
1253         while (true) {
1254                 int ret;
1255                 ret = tevent_loop_once(ev);
1256                 assert(ret == 0);
1257                 if (do_shutdown) {
1258                         test_event_threaded_unlock();
1259                         return NULL;
1260                 }
1261         }
1262
1263 }
1264
1265 static void test_event_threaded_read_handler(struct tevent_context *ev,
1266                                              struct tevent_fd *fde,
1267                                              uint16_t flags,
1268                                              void *private_data)
1269 {
1270         int *pfd = (int *)private_data;
1271         char c;
1272         ssize_t nread;
1273
1274         if ((flags & TEVENT_FD_READ) == 0) {
1275                 return;
1276         }
1277
1278         do {
1279                 nread = read(*pfd, &c, 1);
1280         } while ((nread == -1) && (errno == EINTR));
1281
1282         assert(nread == 1);
1283 }
1284
1285 static bool test_event_context_threaded(struct torture_context *test,
1286                                         const void *test_data)
1287 {
1288         struct tevent_context *ev;
1289         struct tevent_timer *te;
1290         struct tevent_fd *fde;
1291         pthread_t poll_thread;
1292         int fds[2];
1293         int ret;
1294         char c = 0;
1295
1296         ev = tevent_context_init_byname(test, "poll_mt");
1297         torture_assert(test, ev != NULL, "poll_mt not supported");
1298
1299         tevent_set_trace_callback(ev, test_event_threaded_trace, NULL);
1300
1301         te = tevent_add_timer(ev, ev, timeval_current_ofs(5, 0),
1302                               test_event_threaded_timer, NULL);
1303         torture_assert(test, te != NULL, "Could not add timer");
1304
1305         ret = pthread_create(&poll_thread, NULL, test_event_poll_thread, ev);
1306         torture_assert(test, ret == 0, "Could not create poll thread");
1307
1308         ret = pipe(fds);
1309         torture_assert(test, ret == 0, "Could not create pipe");
1310
1311         poll(NULL, 0, 100);
1312
1313         test_event_threaded_lock();
1314
1315         fde = tevent_add_fd(ev, ev, fds[0], TEVENT_FD_READ,
1316                             test_event_threaded_read_handler, &fds[0]);
1317         torture_assert(test, fde != NULL, "Could not add fd event");
1318
1319         test_event_threaded_unlock();
1320
1321         poll(NULL, 0, 100);
1322
1323         do_write(fds[1], &c, 1);
1324
1325         poll(NULL, 0, 100);
1326
1327         test_event_threaded_lock();
1328         do_shutdown = true;
1329         test_event_threaded_unlock();
1330
1331         do_write(fds[1], &c, 1);
1332
1333         ret = pthread_join(poll_thread, NULL);
1334         torture_assert(test, ret == 0, "pthread_join failed");
1335
1336         return true;
1337 }
1338
/* Number of worker threads started by each multi-threaded tevent test. */
#define NUM_TEVENT_THREADS 100

/* Ugly, but needed for torture_comment... */
static struct torture_context *thread_test_ctx;
/* Ids of the worker threads, indexed by creation order. */
static pthread_t thread_map[NUM_TEVENT_THREADS];
/* Number of callbacks the master context has processed so far. */
static unsigned int thread_counter;
1345
1346 /* Called in master thread context */
1347 static void callback_nowait(struct tevent_context *ev,
1348                                 struct tevent_immediate *im,
1349                                 void *private_ptr)
1350 {
1351         pthread_t *thread_id_ptr =
1352                 talloc_get_type_abort(private_ptr, pthread_t);
1353         unsigned i;
1354
1355         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1356                 if (pthread_equal(*thread_id_ptr,
1357                                 thread_map[i])) {
1358                         break;
1359                 }
1360         }
1361         torture_comment(thread_test_ctx,
1362                         "Callback %u from thread %u\n",
1363                         thread_counter,
1364                         i);
1365         thread_counter++;
1366 }
1367
1368 /* Blast the master tevent_context with a callback, no waiting. */
1369 static void *thread_fn_nowait(void *private_ptr)
1370 {
1371         struct tevent_thread_proxy *master_tp =
1372                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1373         struct tevent_immediate *im;
1374         pthread_t *thread_id_ptr;
1375
1376         im = tevent_create_immediate(NULL);
1377         if (im == NULL) {
1378                 return NULL;
1379         }
1380         thread_id_ptr = talloc(NULL, pthread_t);
1381         if (thread_id_ptr == NULL) {
1382                 return NULL;
1383         }
1384         *thread_id_ptr = pthread_self();
1385
1386         tevent_thread_proxy_schedule(master_tp,
1387                                 &im,
1388                                 callback_nowait,
1389                                 &thread_id_ptr);
1390         return NULL;
1391 }
1392
1393 static void timeout_fn(struct tevent_context *ev,
1394                         struct tevent_timer *te,
1395                         struct timeval tv, void *p)
1396 {
1397         thread_counter = NUM_TEVENT_THREADS * 10;
1398 }
1399
1400 static bool test_multi_tevent_threaded(struct torture_context *test,
1401                                         const void *test_data)
1402 {
1403         unsigned i;
1404         struct tevent_context *master_ev;
1405         struct tevent_thread_proxy *tp;
1406
1407         talloc_disable_null_tracking();
1408
1409         /* Ugly global stuff. */
1410         thread_test_ctx = test;
1411         thread_counter = 0;
1412
1413         master_ev = tevent_context_init(NULL);
1414         if (master_ev == NULL) {
1415                 return false;
1416         }
1417         tevent_set_debug_stderr(master_ev);
1418
1419         tp = tevent_thread_proxy_create(master_ev);
1420         if (tp == NULL) {
1421                 torture_fail(test,
1422                         talloc_asprintf(test,
1423                                 "tevent_thread_proxy_create failed\n"));
1424                 talloc_free(master_ev);
1425                 return false;
1426         }
1427
1428         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1429                 int ret = pthread_create(&thread_map[i],
1430                                 NULL,
1431                                 thread_fn_nowait,
1432                                 tp);
1433                 if (ret != 0) {
1434                         torture_fail(test,
1435                                 talloc_asprintf(test,
1436                                         "Failed to create thread %i, %d\n",
1437                                         i, ret));
1438                         return false;
1439                 }
1440         }
1441
1442         /* Ensure we don't wait more than 10 seconds. */
1443         tevent_add_timer(master_ev,
1444                         master_ev,
1445                         timeval_current_ofs(10,0),
1446                         timeout_fn,
1447                         NULL);
1448
1449         while (thread_counter < NUM_TEVENT_THREADS) {
1450                 int ret = tevent_loop_once(master_ev);
1451                 torture_assert(test, ret == 0, "tevent_loop_once failed");
1452         }
1453
1454         torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
1455                 "thread_counter fail\n");
1456
1457         talloc_free(master_ev);
1458         return true;
1459 }
1460
/* Request state a worker thread sends to the master and gets back. */
struct reply_state {
        /* Proxy of the worker's own event context, used for the reply. */
        struct tevent_thread_proxy *reply_tp;
        /* Id of the worker thread that sent the request. */
        pthread_t thread_id;
        /* Worker's completion flag: 1 = reply received, 2 = timed out. */
        int *p_finished;
};
1466
/*
 * Watchdog timer handler for a worker thread: marks the completion flag
 * that p points at as "timed out" by storing 2 in it.
 */
static void thread_timeout_fn(struct tevent_context *ev,
                        struct tevent_timer *te,
                        struct timeval tv, void *p)
{
        int *finished_flag = p;

        *finished_flag = 2;
}
1475
1476 /* Called in child-thread context */
1477 static void thread_callback(struct tevent_context *ev,
1478                                 struct tevent_immediate *im,
1479                                 void *private_ptr)
1480 {
1481         struct reply_state *rsp =
1482                 talloc_get_type_abort(private_ptr, struct reply_state);
1483
1484         talloc_steal(ev, rsp);
1485         *rsp->p_finished = 1;
1486 }
1487
1488 /* Called in master thread context */
1489 static void master_callback(struct tevent_context *ev,
1490                                 struct tevent_immediate *im,
1491                                 void *private_ptr)
1492 {
1493         struct reply_state *rsp =
1494                 talloc_get_type_abort(private_ptr, struct reply_state);
1495         unsigned i;
1496
1497         talloc_steal(ev, rsp);
1498
1499         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1500                 if (pthread_equal(rsp->thread_id,
1501                                 thread_map[i])) {
1502                         break;
1503                 }
1504         }
1505         torture_comment(thread_test_ctx,
1506                         "Callback %u from thread %u\n",
1507                         thread_counter,
1508                         i);
1509         /* Now reply to the thread ! */
1510         tevent_thread_proxy_schedule(rsp->reply_tp,
1511                                 &im,
1512                                 thread_callback,
1513                                 &rsp);
1514
1515         thread_counter++;
1516 }
1517
1518 static void *thread_fn_1(void *private_ptr)
1519 {
1520         struct tevent_thread_proxy *master_tp =
1521                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1522         struct tevent_thread_proxy *tp;
1523         struct tevent_immediate *im;
1524         struct tevent_context *ev;
1525         struct reply_state *rsp;
1526         int finished = 0;
1527         int ret;
1528
1529         ev = tevent_context_init(NULL);
1530         if (ev == NULL) {
1531                 return NULL;
1532         }
1533
1534         tp = tevent_thread_proxy_create(ev);
1535         if (tp == NULL) {
1536                 talloc_free(ev);
1537                 return NULL;
1538         }
1539
1540         im = tevent_create_immediate(ev);
1541         if (im == NULL) {
1542                 talloc_free(ev);
1543                 return NULL;
1544         }
1545
1546         rsp = talloc(ev, struct reply_state);
1547         if (rsp == NULL) {
1548                 talloc_free(ev);
1549                 return NULL;
1550         }
1551
1552         rsp->thread_id = pthread_self();
1553         rsp->reply_tp = tp;
1554         rsp->p_finished = &finished;
1555
1556         /* Introduce a little randomness into the mix.. */
1557         usleep(random() % 7000);
1558
1559         tevent_thread_proxy_schedule(master_tp,
1560                                 &im,
1561                                 master_callback,
1562                                 &rsp);
1563
1564         /* Ensure we don't wait more than 10 seconds. */
1565         tevent_add_timer(ev,
1566                         ev,
1567                         timeval_current_ofs(10,0),
1568                         thread_timeout_fn,
1569                         &finished);
1570
1571         while (finished == 0) {
1572                 ret = tevent_loop_once(ev);
1573                 assert(ret == 0);
1574         }
1575
1576         if (finished > 1) {
1577                 /* Timeout ! */
1578                 abort();
1579         }
1580
1581         /*
1582          * NB. We should talloc_free(ev) here, but if we do
1583          * we currently get hit by helgrind Fix #323432
1584          * "When calling pthread_cond_destroy or pthread_mutex_destroy
1585          * with initializers as argument Helgrind (incorrectly) reports errors."
1586          *
1587          * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive-
1588          * with-pthread-mutex-destroy-td47757.html
1589          *
1590          * Helgrind doesn't understand that the request/reply
1591          * messages provide synchronization between the lock/unlock
1592          * in tevent_thread_proxy_schedule(), and the pthread_destroy()
1593          * when the struct tevent_thread_proxy object is talloc_free'd.
1594          *
1595          * As a work-around for now return ev for the parent thread to free.
1596          */
1597         return ev;
1598 }
1599
1600 static bool test_multi_tevent_threaded_1(struct torture_context *test,
1601                                         const void *test_data)
1602 {
1603         unsigned i;
1604         struct tevent_context *master_ev;
1605         struct tevent_thread_proxy *master_tp;
1606         int ret;
1607
1608         talloc_disable_null_tracking();
1609
1610         /* Ugly global stuff. */
1611         thread_test_ctx = test;
1612         thread_counter = 0;
1613
1614         master_ev = tevent_context_init(NULL);
1615         if (master_ev == NULL) {
1616                 return false;
1617         }
1618         tevent_set_debug_stderr(master_ev);
1619
1620         master_tp = tevent_thread_proxy_create(master_ev);
1621         if (master_tp == NULL) {
1622                 torture_fail(test,
1623                         talloc_asprintf(test,
1624                                 "tevent_thread_proxy_create failed\n"));
1625                 talloc_free(master_ev);
1626                 return false;
1627         }
1628
1629         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1630                 ret = pthread_create(&thread_map[i],
1631                                 NULL,
1632                                 thread_fn_1,
1633                                 master_tp);
1634                 if (ret != 0) {
1635                         torture_fail(test,
1636                                 talloc_asprintf(test,
1637                                         "Failed to create thread %i, %d\n",
1638                                         i, ret));
1639                                 return false;
1640                 }
1641         }
1642
1643         while (thread_counter < NUM_TEVENT_THREADS) {
1644                 ret = tevent_loop_once(master_ev);
1645                 torture_assert(test, ret == 0, "tevent_loop_once failed");
1646         }
1647
1648         /* Wait for all the threads to finish - join 'em. */
1649         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1650                 void *retval;
1651                 ret = pthread_join(thread_map[i], &retval);
1652                 torture_assert(test, ret == 0, "pthread_join failed");
1653                 /* Free the child thread event context. */
1654                 talloc_free(retval);
1655         }
1656
1657         talloc_free(master_ev);
1658         return true;
1659 }
1660
/* Per-thread state for test_multi_tevent_threaded_2(). */
struct threaded_test_2 {
        /* Threaded context the worker schedules its immediate event on. */
        struct tevent_threaded_context *tctx;
        /* Immediate event the worker hands to the main context. */
        struct tevent_immediate *im;
        /* Filled in by the worker itself with pthread_self(). */
        pthread_t thread_id;
};
1666
/* Forward declaration: thread_fn_2() schedules this handler. */
static void master_callback_2(struct tevent_context *ev,
                              struct tevent_immediate *im, void *private_data);
1670
1671 static void *thread_fn_2(void *private_data)
1672 {
1673         struct threaded_test_2 *state = private_data;
1674
1675         state->thread_id = pthread_self();
1676
1677         usleep(random() % 7000);
1678
1679         tevent_threaded_schedule_immediate(
1680                 state->tctx, state->im, master_callback_2, state);
1681
1682         return NULL;
1683 }
1684
1685 static void master_callback_2(struct tevent_context *ev,
1686                               struct tevent_immediate *im,
1687                               void *private_data)
1688 {
1689         struct threaded_test_2 *state = private_data;
1690         int i;
1691
1692         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1693                 if (pthread_equal(state->thread_id, thread_map[i])) {
1694                         break;
1695                 }
1696         }
1697         torture_comment(thread_test_ctx,
1698                         "Callback_2 %u from thread %u\n",
1699                         thread_counter,
1700                         i);
1701         thread_counter++;
1702 }
1703
1704 static bool test_multi_tevent_threaded_2(struct torture_context *test,
1705                                          const void *test_data)
1706 {
1707         unsigned i;
1708
1709         struct tevent_context *ev;
1710         struct tevent_threaded_context *tctx;
1711         int ret;
1712
1713         thread_test_ctx = test;
1714         thread_counter = 0;
1715
1716         ev = tevent_context_init(test);
1717         torture_assert(test, ev != NULL, "tevent_context_init failed");
1718
1719         /*
1720          * tevent_re_initialise used to have a bug where it did not
1721          * re-initialise the thread support after taking it
1722          * down. Excercise that code path.
1723          */
1724         ret = tevent_re_initialise(ev);
1725         torture_assert(test, ret == 0, "tevent_re_initialise failed");
1726
1727         tctx = tevent_threaded_context_create(ev, ev);
1728         torture_assert(test, tctx != NULL,
1729                        "tevent_threaded_context_create failed");
1730
1731         for (i=0; i<NUM_TEVENT_THREADS; i++) {
1732                 struct threaded_test_2 *state;
1733
1734                 state = talloc(ev, struct threaded_test_2);
1735                 torture_assert(test, state != NULL, "talloc failed");
1736
1737                 state->tctx = tctx;
1738                 state->im = tevent_create_immediate(state);
1739                 torture_assert(test, state->im != NULL,
1740                                "tevent_create_immediate failed");
1741
1742                 ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
1743                 torture_assert(test, ret == 0, "pthread_create failed");
1744         }
1745
1746         while (thread_counter < NUM_TEVENT_THREADS) {
1747                 ret = tevent_loop_once(ev);
1748                 torture_assert(test, ret == 0, "tevent_loop_once failed");
1749         }
1750
1751         /* Wait for all the threads to finish - join 'em. */
1752         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1753                 void *retval;
1754                 ret = pthread_join(thread_map[i], &retval);
1755                 torture_assert(test, ret == 0, "pthread_join failed");
1756                 /* Free the child thread event context. */
1757         }
1758
1759         talloc_free(tctx);
1760         talloc_free(ev);
1761         return true;
1762 }
1763 #endif
1764
1765 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
1766 {
1767         struct torture_suite *suite = torture_suite_create(mem_ctx, "event");
1768         const char **list = tevent_backend_list(suite);
1769         int i;
1770
1771         for (i=0;list && list[i];i++) {
1772                 struct torture_suite *backend_suite;
1773
1774                 backend_suite = torture_suite_create(mem_ctx, list[i]);
1775
1776                 torture_suite_add_simple_tcase_const(backend_suite,
1777                                                "context",
1778                                                test_event_context,
1779                                                (const void *)list[i]);
1780                 torture_suite_add_simple_tcase_const(backend_suite,
1781                                                "fd1",
1782                                                test_event_fd1,
1783                                                (const void *)list[i]);
1784                 torture_suite_add_simple_tcase_const(backend_suite,
1785                                                "fd2",
1786                                                test_event_fd2,
1787                                                (const void *)list[i]);
1788                 torture_suite_add_simple_tcase_const(backend_suite,
1789                                                "wrapper",
1790                                                test_wrapper,
1791                                                (const void *)list[i]);
1792                 torture_suite_add_simple_tcase_const(backend_suite,
1793                                                "free_wrapper",
1794                                                test_free_wrapper,
1795                                                (const void *)list[i]);
1796
1797                 torture_suite_add_suite(suite, backend_suite);
1798         }
1799
1800 #ifdef HAVE_PTHREAD
1801         torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
1802                                              test_event_context_threaded,
1803                                              NULL);
1804
1805         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
1806                                              test_multi_tevent_threaded,
1807                                              NULL);
1808
1809         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
1810                                              test_multi_tevent_threaded_1,
1811                                              NULL);
1812
1813         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_2",
1814                                              test_multi_tevent_threaded_2,
1815                                              NULL);
1816
1817 #endif
1818
1819         return suite;
1820 }