#include "system/select.h"
#include "system/network.h"
#include "torture/torture.h"
+#include "torture/local/proto.h"
#ifdef HAVE_PTHREAD
#include <pthread.h>
#include <assert.h>
static int fde_count;
+static void do_read(int fd, void *buf, size_t count)
+{
+ ssize_t ret;
+
+ do {
+ ret = read(fd, buf, count);
+ } while (ret == -1 && errno == EINTR);
+}
+
static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f,
uint16_t flags, void *private_data)
{
#endif
kill(getpid(), SIGALRM);
- read(fd[0], &c, 1);
+ do_read(fd[0], &c, 1);
fde_count++;
}
+static void do_write(int fd, void *buf, size_t count)
+{
+ ssize_t ret;
+
+ do {
+ ret = write(fd, buf, count);
+ } while (ret == -1 && errno == EINTR);
+}
+
static void fde_handler_write(struct tevent_context *ev_ctx, struct tevent_fd *f,
uint16_t flags, void *private_data)
{
int *fd = (int *)private_data;
char c = 0;
- write(fd[1], &c, 1);
+
+ do_write(fd[1], &c, 1);
}
#endif
kill(getpid(), SIGALRM);
- read(fd[1], &c, 1);
+ do_read(fd[1], &c, 1);
fde_count++;
}
{
int *fd = (int *)private_data;
char c = 0;
- write(fd[0], &c, 1);
+ do_write(fd[0], &c, 1);
}
static void finished_handler(struct tevent_context *ev_ctx, struct tevent_timer *te,
#endif
int finished=0;
struct timeval t;
+ int ret;
ev_ctx = tevent_context_init_byname(test, backend);
if (ev_ctx == NULL) {
fde_count = 0;
/* create a pipe */
- pipe(fd);
+ ret = pipe(fd);
+ torture_assert_int_equal(test, ret, 0, "pipe failed");
fde_read = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_READ,
fde_handler_read, fd);
}
}
- talloc_free(fde_read);
- talloc_free(fde_write);
talloc_free(fde_read_1);
talloc_free(fde_write_1);
+ talloc_free(fde_read);
+ talloc_free(fde_write);
while (alarm_count < fde_count+1) {
if (tevent_loop_once(ev_ctx) == -1) {
/*
* we write to the other socket...
*/
- write(state->sock[1], &c, 1);
+ do_write(state->sock[1], &c, 1);
TEVENT_FD_NOT_WRITEABLE(fde);
TEVENT_FD_READABLE(fde);
return;
tevent_fd_set_auto_close(state.sock0.fde);
tevent_fd_set_auto_close(state.sock1.fde);
- write(state.sock0.fd, &c, 1);
+ do_write(state.sock0.fd, &c, 1);
state.sock0.num_written++;
- write(state.sock1.fd, &c, 1);
+ do_write(state.sock1.fd, &c, 1);
state.sock1.num_written++;
while (!state.finished) {
poll(NULL, 0, 100);
- write(fds[1], &c, 1);
+ do_write(fds[1], &c, 1);
poll(NULL, 0, 100);
do_shutdown = true;
test_event_threaded_unlock();
- write(fds[1], &c, 1);
+ do_write(fds[1], &c, 1);
ret = pthread_join(poll_thread, NULL);
torture_assert(test, ret == 0, "pthread_join failed");
return true;
}
+#define NUM_TEVENT_THREADS 100
+
+/* Ugly, but needed for torture_comment... */
+static struct torture_context *thread_test_ctx;
+static pthread_t thread_map[NUM_TEVENT_THREADS];
+static unsigned thread_counter;
+
+/* Called in master thread context */
+static void callback_nowait(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_ptr)
+{
+ pthread_t *thread_id_ptr =
+ talloc_get_type_abort(private_ptr, pthread_t);
+ unsigned i;
+
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ if (pthread_equal(*thread_id_ptr,
+ thread_map[i])) {
+ break;
+ }
+ }
+ torture_comment(thread_test_ctx,
+ "Callback %u from thread %u\n",
+ thread_counter,
+ i);
+ thread_counter++;
+}
+
+/* Blast the master tevent_context with a callback, no waiting. */
+static void *thread_fn_nowait(void *private_ptr)
+{
+ struct tevent_thread_proxy *master_tp =
+ talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+ struct tevent_immediate *im;
+ pthread_t *thread_id_ptr;
+
+ im = tevent_create_immediate(NULL);
+ if (im == NULL) {
+ return NULL;
+ }
+ thread_id_ptr = talloc(NULL, pthread_t);
+ if (thread_id_ptr == NULL) {
+ return NULL;
+ }
+ *thread_id_ptr = pthread_self();
+
+ tevent_thread_proxy_schedule(master_tp,
+ &im,
+ callback_nowait,
+ &thread_id_ptr);
+ return NULL;
+}
+
+static void timeout_fn(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval tv, void *p)
+{
+ thread_counter = NUM_TEVENT_THREADS * 10;
+}
+
+static bool test_multi_tevent_threaded(struct torture_context *test,
+ const void *test_data)
+{
+ unsigned i;
+ struct tevent_context *master_ev;
+ struct tevent_thread_proxy *tp;
+
+ talloc_disable_null_tracking();
+
+ /* Ugly global stuff. */
+ thread_test_ctx = test;
+ thread_counter = 0;
+
+ master_ev = tevent_context_init(NULL);
+ if (master_ev == NULL) {
+ return false;
+ }
+ tevent_set_debug_stderr(master_ev);
+
+ tp = tevent_thread_proxy_create(master_ev);
+ if (tp == NULL) {
+ torture_fail(test,
+ talloc_asprintf(test,
+ "tevent_thread_proxy_create failed\n"));
+ talloc_free(master_ev);
+ return false;
+ }
+
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ int ret = pthread_create(&thread_map[i],
+ NULL,
+ thread_fn_nowait,
+ tp);
+ if (ret != 0) {
+ torture_fail(test,
+ talloc_asprintf(test,
+ "Failed to create thread %i, %d\n",
+ i, ret));
+ return false;
+ }
+ }
+
+ /* Ensure we don't wait more than 10 seconds. */
+ tevent_add_timer(master_ev,
+ master_ev,
+ timeval_current_ofs(10,0),
+ timeout_fn,
+ NULL);
+
+ while (thread_counter < NUM_TEVENT_THREADS) {
+ int ret = tevent_loop_once(master_ev);
+ torture_assert(test, ret == 0, "tevent_loop_once failed");
+ }
+
+ torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
+ "thread_counter fail\n");
+
+ talloc_free(master_ev);
+ return true;
+}
+
+struct reply_state {
+ struct tevent_thread_proxy *reply_tp;
+ pthread_t thread_id;
+ int *p_finished;
+};
+
+static void thread_timeout_fn(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval tv, void *p)
+{
+ int *p_finished = (int *)p;
+
+ *p_finished = 2;
+}
+
+/* Called in child-thread context */
+static void thread_callback(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_ptr)
+{
+ struct reply_state *rsp =
+ talloc_get_type_abort(private_ptr, struct reply_state);
+
+ talloc_steal(ev, rsp);
+ *rsp->p_finished = 1;
+}
+
+/* Called in master thread context */
+static void master_callback(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_ptr)
+{
+ struct reply_state *rsp =
+ talloc_get_type_abort(private_ptr, struct reply_state);
+ unsigned i;
+
+ talloc_steal(ev, rsp);
+
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ if (pthread_equal(rsp->thread_id,
+ thread_map[i])) {
+ break;
+ }
+ }
+ torture_comment(thread_test_ctx,
+ "Callback %u from thread %u\n",
+ thread_counter,
+ i);
+ /* Now reply to the thread ! */
+ tevent_thread_proxy_schedule(rsp->reply_tp,
+ &im,
+ thread_callback,
+ &rsp);
+
+ thread_counter++;
+}
+
+static void *thread_fn_1(void *private_ptr)
+{
+ struct tevent_thread_proxy *master_tp =
+ talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+ struct tevent_thread_proxy *tp;
+ struct tevent_immediate *im;
+ struct tevent_context *ev;
+ struct reply_state *rsp;
+ int finished = 0;
+ int ret;
+
+ ev = tevent_context_init(NULL);
+ if (ev == NULL) {
+ return NULL;
+ }
+
+ tp = tevent_thread_proxy_create(ev);
+ if (tp == NULL) {
+ talloc_free(ev);
+ return NULL;
+ }
+
+ im = tevent_create_immediate(ev);
+ if (im == NULL) {
+ talloc_free(ev);
+ return NULL;
+ }
+
+ rsp = talloc(ev, struct reply_state);
+ if (rsp == NULL) {
+ talloc_free(ev);
+ return NULL;
+ }
+
+ rsp->thread_id = pthread_self();
+ rsp->reply_tp = tp;
+ rsp->p_finished = &finished;
+
+ /* Introduce a little randomness into the mix.. */
+ usleep(random() % 7000);
+
+ tevent_thread_proxy_schedule(master_tp,
+ &im,
+ master_callback,
+ &rsp);
+
+ /* Ensure we don't wait more than 10 seconds. */
+ tevent_add_timer(ev,
+ ev,
+ timeval_current_ofs(10,0),
+ thread_timeout_fn,
+ &finished);
+
+ while (finished == 0) {
+ ret = tevent_loop_once(ev);
+ assert(ret == 0);
+ }
+
+ if (finished > 1) {
+ /* Timeout ! */
+ abort();
+ }
+
+ /*
+ * NB. We should talloc_free(ev) here, but if we do
+ * we currently get hit by helgrind Fix #323432
+ * "When calling pthread_cond_destroy or pthread_mutex_destroy
+ * with initializers as argument Helgrind (incorrectly) reports errors."
+ *
+ * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive-
+ * with-pthread-mutex-destroy-td47757.html
+ *
+ * Helgrind doesn't understand that the request/reply
+ * messages provide synchronization between the lock/unlock
+ * in tevent_thread_proxy_schedule(), and the pthread_destroy()
+ * when the struct tevent_thread_proxy object is talloc_free'd.
+ *
+ * As a work-around for now return ev for the parent thread to free.
+ */
+ return ev;
+}
+
+static bool test_multi_tevent_threaded_1(struct torture_context *test,
+ const void *test_data)
+{
+ unsigned i;
+ struct tevent_context *master_ev;
+ struct tevent_thread_proxy *master_tp;
+ int ret;
+
+ talloc_disable_null_tracking();
+
+ /* Ugly global stuff. */
+ thread_test_ctx = test;
+ thread_counter = 0;
+
+ master_ev = tevent_context_init(NULL);
+ if (master_ev == NULL) {
+ return false;
+ }
+ tevent_set_debug_stderr(master_ev);
+
+ master_tp = tevent_thread_proxy_create(master_ev);
+ if (master_tp == NULL) {
+ torture_fail(test,
+ talloc_asprintf(test,
+ "tevent_thread_proxy_create failed\n"));
+ talloc_free(master_ev);
+ return false;
+ }
+
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ ret = pthread_create(&thread_map[i],
+ NULL,
+ thread_fn_1,
+ master_tp);
+ if (ret != 0) {
+ torture_fail(test,
+ talloc_asprintf(test,
+ "Failed to create thread %i, %d\n",
+ i, ret));
+ return false;
+ }
+ }
+
+ while (thread_counter < NUM_TEVENT_THREADS) {
+ ret = tevent_loop_once(master_ev);
+ torture_assert(test, ret == 0, "tevent_loop_once failed");
+ }
+
+ /* Wait for all the threads to finish - join 'em. */
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ void *retval;
+ ret = pthread_join(thread_map[i], &retval);
+ torture_assert(test, ret == 0, "pthread_join failed");
+ /* Free the child thread event context. */
+ talloc_free(retval);
+ }
+
+ talloc_free(master_ev);
+ return true;
+}
+
+struct threaded_test_2 {
+ struct tevent_threaded_context *tctx;
+ struct tevent_immediate *im;
+ pthread_t thread_id;
+};
+
+static void master_callback_2(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_data);
+
+static void *thread_fn_2(void *private_data)
+{
+ struct threaded_test_2 *state = private_data;
+
+ state->thread_id = pthread_self();
+
+ usleep(random() % 7000);
+
+ tevent_threaded_schedule_immediate(
+ state->tctx, state->im, master_callback_2, state);
+
+ return NULL;
+}
+
+static void master_callback_2(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_data)
+{
+ struct threaded_test_2 *state = private_data;
+ int i;
+
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ if (pthread_equal(state->thread_id, thread_map[i])) {
+ break;
+ }
+ }
+ torture_comment(thread_test_ctx,
+ "Callback_2 %u from thread %u\n",
+ thread_counter,
+ i);
+ thread_counter++;
+}
+
+static bool test_multi_tevent_threaded_2(struct torture_context *test,
+ const void *test_data)
+{
+ unsigned i;
+
+ struct tevent_context *ev;
+ struct tevent_threaded_context *tctx;
+ int ret;
+
+ thread_test_ctx = test;
+ thread_counter = 0;
+
+ ev = tevent_context_init(test);
+ torture_assert(test, ev != NULL, "tevent_context_init failed");
+
+ tctx = tevent_threaded_context_create(ev, ev);
+ torture_assert(test, tctx != NULL,
+ "tevent_threaded_context_create failed");
+
+ for (i=0; i<NUM_TEVENT_THREADS; i++) {
+ struct threaded_test_2 *state;
+
+ state = talloc(ev, struct threaded_test_2);
+ torture_assert(test, state != NULL, "talloc failed");
+
+ state->tctx = tctx;
+ state->im = tevent_create_immediate(state);
+ torture_assert(test, state->im != NULL,
+ "tevent_create_immediate failed");
+
+ ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
+ torture_assert(test, ret == 0, "pthread_create failed");
+ }
+
+ while (thread_counter < NUM_TEVENT_THREADS) {
+ ret = tevent_loop_once(ev);
+ torture_assert(test, ret == 0, "tevent_loop_once failed");
+ }
+
+ /* Wait for all the threads to finish - join 'em. */
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ void *retval;
+ ret = pthread_join(thread_map[i], &retval);
+ torture_assert(test, ret == 0, "pthread_join failed");
+ /* Free the child thread event context. */
+ }
+
+ talloc_free(tctx);
+ talloc_free(ev);
+ return true;
+}
#endif
struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
test_event_context_threaded,
NULL);
+
+ torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
+ test_multi_tevent_threaded,
+ NULL);
+
+ torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
+ test_multi_tevent_threaded_1,
+ NULL);
+
+ torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_2",
+ test_multi_tevent_threaded_2,
+ NULL);
+
#endif
return suite;