talloc: use the system pytalloc-util for python3 as well
[sfrench/samba-autobuild/.git] / lib / tevent / testsuite.c
index 7851c6c92db92e6ebf710b4f5633abf8d9be5b23..4783ab41153687180b5683004dd10c80092c7c63 100644 (file)
@@ -30,6 +30,7 @@
 #include "system/select.h"
 #include "system/network.h"
 #include "torture/torture.h"
+#include "torture/local/proto.h"
 #ifdef HAVE_PTHREAD
 #include <pthread.h>
 #include <assert.h>
 
 static int fde_count;
 
+static void do_read(int fd, void *buf, size_t count)
+{
+       ssize_t ret;
+
+       do {
+               ret = read(fd, buf, count);
+       } while (ret == -1 && errno == EINTR);
+}
+
 static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f,
                        uint16_t flags, void *private_data)
 {
@@ -47,16 +57,26 @@ static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f,
 #endif
        kill(getpid(), SIGALRM);
 
-       read(fd[0], &c, 1);
+       do_read(fd[0], &c, 1);
        fde_count++;
 }
 
+static void do_write(int fd, void *buf, size_t count)
+{
+       ssize_t ret;
+
+       do {
+               ret = write(fd, buf, count);
+       } while (ret == -1 && errno == EINTR);
+}
+
 static void fde_handler_write(struct tevent_context *ev_ctx, struct tevent_fd *f,
                        uint16_t flags, void *private_data)
 {
        int *fd = (int *)private_data;
        char c = 0;
-       write(fd[1], &c, 1);
+
+       do_write(fd[1], &c, 1);
 }
 
 
@@ -71,7 +91,7 @@ static void fde_handler_read_1(struct tevent_context *ev_ctx, struct tevent_fd *
 #endif
        kill(getpid(), SIGALRM);
 
-       read(fd[1], &c, 1);
+       do_read(fd[1], &c, 1);
        fde_count++;
 }
 
@@ -81,7 +101,7 @@ static void fde_handler_write_1(struct tevent_context *ev_ctx, struct tevent_fd
 {
        int *fd = (int *)private_data;
        char c = 0;
-       write(fd[0], &c, 1);
+       do_write(fd[0], &c, 1);
 }
 
 static void finished_handler(struct tevent_context *ev_ctx, struct tevent_timer *te,
@@ -120,6 +140,7 @@ static bool test_event_context(struct torture_context *test,
 #endif
        int finished=0;
        struct timeval t;
+       int ret;
 
        ev_ctx = tevent_context_init_byname(test, backend);
        if (ev_ctx == NULL) {
@@ -134,7 +155,8 @@ static bool test_event_context(struct torture_context *test,
        fde_count = 0;
 
        /* create a pipe */
-       pipe(fd);
+       ret = pipe(fd);
+       torture_assert_int_equal(test, ret, 0, "pipe failed");
 
        fde_read = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_READ,
                            fde_handler_read, fd);
@@ -174,10 +196,10 @@ static bool test_event_context(struct torture_context *test,
                }
        }
 
-       talloc_free(fde_read);
-       talloc_free(fde_write);
        talloc_free(fde_read_1);
        talloc_free(fde_write_1);
+       talloc_free(fde_read);
+       talloc_free(fde_write);
 
        while (alarm_count < fde_count+1) {
                if (tevent_loop_once(ev_ctx) == -1) {
@@ -278,7 +300,7 @@ static void test_event_fd1_fde_handler(struct tevent_context *ev_ctx,
                /*
                 * we write to the other socket...
                 */
-               write(state->sock[1], &c, 1);
+               do_write(state->sock[1], &c, 1);
                TEVENT_FD_NOT_WRITEABLE(fde);
                TEVENT_FD_READABLE(fde);
                return;
@@ -647,9 +669,9 @@ static bool test_event_fd2(struct torture_context *tctx,
        tevent_fd_set_auto_close(state.sock0.fde);
        tevent_fd_set_auto_close(state.sock1.fde);
 
-       write(state.sock0.fd, &c, 1);
+       do_write(state.sock0.fd, &c, 1);
        state.sock0.num_written++;
-       write(state.sock1.fd, &c, 1);
+       do_write(state.sock1.fd, &c, 1);
        state.sock1.num_written++;
 
        while (!state.finished) {
@@ -789,7 +811,7 @@ static bool test_event_context_threaded(struct torture_context *test,
 
        poll(NULL, 0, 100);
 
-       write(fds[1], &c, 1);
+       do_write(fds[1], &c, 1);
 
        poll(NULL, 0, 100);
 
@@ -797,7 +819,7 @@ static bool test_event_context_threaded(struct torture_context *test,
        do_shutdown = true;
        test_event_threaded_unlock();
 
-       write(fds[1], &c, 1);
+       do_write(fds[1], &c, 1);
 
        ret = pthread_join(poll_thread, NULL);
        torture_assert(test, ret == 0, "pthread_join failed");
@@ -805,6 +827,422 @@ static bool test_event_context_threaded(struct torture_context *test,
        return true;
 }
 
+#define NUM_TEVENT_THREADS 100
+
+/* Ugly, but needed for torture_comment... */
+static struct torture_context *thread_test_ctx;
+static pthread_t thread_map[NUM_TEVENT_THREADS];
+static unsigned thread_counter;
+
+/* Called in master thread context */
+static void callback_nowait(struct tevent_context *ev,
+                               struct tevent_immediate *im,
+                               void *private_ptr)
+{
+       pthread_t *thread_id_ptr =
+               talloc_get_type_abort(private_ptr, pthread_t);
+       unsigned i;
+
+       for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+               if (pthread_equal(*thread_id_ptr,
+                               thread_map[i])) {
+                       break;
+               }
+       }
+       torture_comment(thread_test_ctx,
+                       "Callback %u from thread %u\n",
+                       thread_counter,
+                       i);
+       thread_counter++;
+}
+
+/* Blast the master tevent_context with a callback, no waiting. */
+static void *thread_fn_nowait(void *private_ptr)
+{
+       struct tevent_thread_proxy *master_tp =
+               talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+       struct tevent_immediate *im;
+       pthread_t *thread_id_ptr;
+
+       im = tevent_create_immediate(NULL);
+       if (im == NULL) {
+               return NULL;
+       }
+       thread_id_ptr = talloc(NULL, pthread_t);
+       if (thread_id_ptr == NULL) {
+               return NULL;
+       }
+       *thread_id_ptr = pthread_self();
+
+       tevent_thread_proxy_schedule(master_tp,
+                               &im,
+                               callback_nowait,
+                               &thread_id_ptr);
+       return NULL;
+}
+
+static void timeout_fn(struct tevent_context *ev,
+                       struct tevent_timer *te,
+                       struct timeval tv, void *p)
+{
+       thread_counter = NUM_TEVENT_THREADS * 10;
+}
+
+static bool test_multi_tevent_threaded(struct torture_context *test,
+                                       const void *test_data)
+{
+       unsigned i;
+       struct tevent_context *master_ev;
+       struct tevent_thread_proxy *tp;
+
+       talloc_disable_null_tracking();
+
+       /* Ugly global stuff. */
+       thread_test_ctx = test;
+       thread_counter = 0;
+
+       master_ev = tevent_context_init(NULL);
+       if (master_ev == NULL) {
+               return false;
+       }
+       tevent_set_debug_stderr(master_ev);
+
+       tp = tevent_thread_proxy_create(master_ev);
+       if (tp == NULL) {
+               torture_fail(test,
+                       talloc_asprintf(test,
+                               "tevent_thread_proxy_create failed\n"));
+               talloc_free(master_ev);
+               return false;
+       }
+
+       for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+               int ret = pthread_create(&thread_map[i],
+                               NULL,
+                               thread_fn_nowait,
+                               tp);
+               if (ret != 0) {
+                       torture_fail(test,
+                               talloc_asprintf(test,
+                                       "Failed to create thread %i, %d\n",
+                                       i, ret));
+                       return false;
+               }
+       }
+
+       /* Ensure we don't wait more than 10 seconds. */
+       tevent_add_timer(master_ev,
+                       master_ev,
+                       timeval_current_ofs(10,0),
+                       timeout_fn,
+                       NULL);
+
+       while (thread_counter < NUM_TEVENT_THREADS) {
+               int ret = tevent_loop_once(master_ev);
+               torture_assert(test, ret == 0, "tevent_loop_once failed");
+       }
+
+       torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
+               "thread_counter fail\n");
+
+       talloc_free(master_ev);
+       return true;
+}
+
+struct reply_state {
+       struct tevent_thread_proxy *reply_tp;
+       pthread_t thread_id;
+       int *p_finished;
+};
+
+static void thread_timeout_fn(struct tevent_context *ev,
+                       struct tevent_timer *te,
+                       struct timeval tv, void *p)
+{
+       int *p_finished = (int *)p;
+
+       *p_finished = 2;
+}
+
+/* Called in child-thread context */
+static void thread_callback(struct tevent_context *ev,
+                               struct tevent_immediate *im,
+                               void *private_ptr)
+{
+       struct reply_state *rsp =
+               talloc_get_type_abort(private_ptr, struct reply_state);
+
+       talloc_steal(ev, rsp);
+       *rsp->p_finished = 1;
+}
+
+/* Called in master thread context */
+static void master_callback(struct tevent_context *ev,
+                               struct tevent_immediate *im,
+                               void *private_ptr)
+{
+       struct reply_state *rsp =
+               talloc_get_type_abort(private_ptr, struct reply_state);
+       unsigned i;
+
+       talloc_steal(ev, rsp);
+
+       for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+               if (pthread_equal(rsp->thread_id,
+                               thread_map[i])) {
+                       break;
+               }
+       }
+       torture_comment(thread_test_ctx,
+                       "Callback %u from thread %u\n",
+                       thread_counter,
+                       i);
+       /* Now reply to the thread ! */
+       tevent_thread_proxy_schedule(rsp->reply_tp,
+                               &im,
+                               thread_callback,
+                               &rsp);
+
+       thread_counter++;
+}
+
+static void *thread_fn_1(void *private_ptr)
+{
+       struct tevent_thread_proxy *master_tp =
+               talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+       struct tevent_thread_proxy *tp;
+       struct tevent_immediate *im;
+       struct tevent_context *ev;
+       struct reply_state *rsp;
+       int finished = 0;
+       int ret;
+
+       ev = tevent_context_init(NULL);
+       if (ev == NULL) {
+               return NULL;
+       }
+
+       tp = tevent_thread_proxy_create(ev);
+       if (tp == NULL) {
+               talloc_free(ev);
+               return NULL;
+       }
+
+       im = tevent_create_immediate(ev);
+       if (im == NULL) {
+               talloc_free(ev);
+               return NULL;
+       }
+
+       rsp = talloc(ev, struct reply_state);
+       if (rsp == NULL) {
+               talloc_free(ev);
+               return NULL;
+       }
+
+       rsp->thread_id = pthread_self();
+       rsp->reply_tp = tp;
+       rsp->p_finished = &finished;
+
+       /* Introduce a little randomness into the mix.. */
+       usleep(random() % 7000);
+
+       tevent_thread_proxy_schedule(master_tp,
+                               &im,
+                               master_callback,
+                               &rsp);
+
+       /* Ensure we don't wait more than 10 seconds. */
+       tevent_add_timer(ev,
+                       ev,
+                       timeval_current_ofs(10,0),
+                       thread_timeout_fn,
+                       &finished);
+
+       while (finished == 0) {
+               ret = tevent_loop_once(ev);
+               assert(ret == 0);
+       }
+
+       if (finished > 1) {
+               /* Timeout ! */
+               abort();
+       }
+
+       /*
+        * NB. We should talloc_free(ev) here, but if we do
+        * we currently get hit by helgrind Fix #323432
+        * "When calling pthread_cond_destroy or pthread_mutex_destroy
+        * with initializers as argument Helgrind (incorrectly) reports errors."
+        *
+        * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive-
+        * with-pthread-mutex-destroy-td47757.html
+        *
+        * Helgrind doesn't understand that the request/reply
+        * messages provide synchronization between the lock/unlock
+        * in tevent_thread_proxy_schedule(), and the pthread_destroy()
+        * when the struct tevent_thread_proxy object is talloc_free'd.
+        *
+        * As a work-around for now return ev for the parent thread to free.
+        */
+       return ev;
+}
+
+static bool test_multi_tevent_threaded_1(struct torture_context *test,
+                                       const void *test_data)
+{
+       unsigned i;
+       struct tevent_context *master_ev;
+       struct tevent_thread_proxy *master_tp;
+       int ret;
+
+       talloc_disable_null_tracking();
+
+       /* Ugly global stuff. */
+       thread_test_ctx = test;
+       thread_counter = 0;
+
+       master_ev = tevent_context_init(NULL);
+       if (master_ev == NULL) {
+               return false;
+       }
+       tevent_set_debug_stderr(master_ev);
+
+       master_tp = tevent_thread_proxy_create(master_ev);
+       if (master_tp == NULL) {
+               torture_fail(test,
+                       talloc_asprintf(test,
+                               "tevent_thread_proxy_create failed\n"));
+               talloc_free(master_ev);
+               return false;
+       }
+
+       for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+               ret = pthread_create(&thread_map[i],
+                               NULL,
+                               thread_fn_1,
+                               master_tp);
+               if (ret != 0) {
+                       torture_fail(test,
+                               talloc_asprintf(test,
+                                       "Failed to create thread %i, %d\n",
+                                       i, ret));
+                               return false;
+               }
+       }
+
+       while (thread_counter < NUM_TEVENT_THREADS) {
+               ret = tevent_loop_once(master_ev);
+               torture_assert(test, ret == 0, "tevent_loop_once failed");
+       }
+
+       /* Wait for all the threads to finish - join 'em. */
+       for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+               void *retval;
+               ret = pthread_join(thread_map[i], &retval);
+               torture_assert(test, ret == 0, "pthread_join failed");
+               /* Free the child thread event context. */
+               talloc_free(retval);
+       }
+
+       talloc_free(master_ev);
+       return true;
+}
+
+struct threaded_test_2 {
+       struct tevent_threaded_context *tctx;
+       struct tevent_immediate *im;
+       pthread_t thread_id;
+};
+
+static void master_callback_2(struct tevent_context *ev,
+                             struct tevent_immediate *im,
+                             void *private_data);
+
+static void *thread_fn_2(void *private_data)
+{
+       struct threaded_test_2 *state = private_data;
+
+       state->thread_id = pthread_self();
+
+       usleep(random() % 7000);
+
+       tevent_threaded_schedule_immediate(
+               state->tctx, state->im, master_callback_2, state);
+
+       return NULL;
+}
+
+static void master_callback_2(struct tevent_context *ev,
+                             struct tevent_immediate *im,
+                             void *private_data)
+{
+       struct threaded_test_2 *state = private_data;
+       int i;
+
+       for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+               if (pthread_equal(state->thread_id, thread_map[i])) {
+                       break;
+               }
+       }
+       torture_comment(thread_test_ctx,
+                       "Callback_2 %u from thread %u\n",
+                       thread_counter,
+                       i);
+       thread_counter++;
+}
+
+static bool test_multi_tevent_threaded_2(struct torture_context *test,
+                                        const void *test_data)
+{
+       unsigned i;
+
+       struct tevent_context *ev;
+       struct tevent_threaded_context *tctx;
+       int ret;
+
+       thread_test_ctx = test;
+       thread_counter = 0;
+
+       ev = tevent_context_init(test);
+       torture_assert(test, ev != NULL, "tevent_context_init failed");
+
+       tctx = tevent_threaded_context_create(ev, ev);
+       torture_assert(test, tctx != NULL,
+                      "tevent_threaded_context_create failed");
+
+       for (i=0; i<NUM_TEVENT_THREADS; i++) {
+               struct threaded_test_2 *state;
+
+               state = talloc(ev, struct threaded_test_2);
+               torture_assert(test, state != NULL, "talloc failed");
+
+               state->tctx = tctx;
+               state->im = tevent_create_immediate(state);
+               torture_assert(test, state->im != NULL,
+                              "tevent_create_immediate failed");
+
+               ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
+               torture_assert(test, ret == 0, "pthread_create failed");
+       }
+
+       while (thread_counter < NUM_TEVENT_THREADS) {
+               ret = tevent_loop_once(ev);
+               torture_assert(test, ret == 0, "tevent_loop_once failed");
+       }
+
+       /* Wait for all the threads to finish - join 'em. */
+       for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+               void *retval;
+               ret = pthread_join(thread_map[i], &retval);
+               torture_assert(test, ret == 0, "pthread_join failed");
+               /* Free the child thread event context. */
+       }
+
+       talloc_free(tctx);
+       talloc_free(ev);
+       return true;
+}
 #endif
 
 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
@@ -838,6 +1276,19 @@ struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
        torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
                                             test_event_context_threaded,
                                             NULL);
+
+       torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
+                                            test_multi_tevent_threaded,
+                                            NULL);
+
+       torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
+                                            test_multi_tevent_threaded_1,
+                                            NULL);
+
+       torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_2",
+                                            test_multi_tevent_threaded_2,
+                                            NULL);
+
 #endif
 
        return suite;