lib:replace: Add getprogname()
[samba.git] / lib / pthreadpool / tests_cmocka.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * cmocka tests for thread pool implementation
4  * Copyright (C) Christof Schmitt 2017
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "config.h"
21 #include <errno.h>
22 #include <pthread.h>
23 #include <setjmp.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <limits.h>
27 #include <unistd.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30
31 #include <talloc.h>
32 #include <tevent.h>
33 #include <pthreadpool_tevent.h>
34
35 #include <cmocka.h>
36 #include <poll.h>
37
38 #ifdef HAVE_VALGRIND_HELGRIND_H
39 #include <valgrind/helgrind.h>
40 #endif
41 #ifndef ANNOTATE_BENIGN_RACE_SIZED
42 #define ANNOTATE_BENIGN_RACE_SIZED(address, size, describtion)
43 #endif
44
45 struct pthreadpool_tevent_test {
46         struct tevent_context *ev;
47         struct pthreadpool_tevent *upool;
48         struct pthreadpool_tevent *spool;
49         struct pthreadpool_tevent *opool;
50 };
51
52 static int setup_pthreadpool_tevent(void **state)
53 {
54         struct pthreadpool_tevent_test *t;
55         int ret;
56         size_t max_threads;
57
58         t = talloc_zero(NULL, struct pthreadpool_tevent_test);
59         assert_non_null(t);
60
61         t->ev = tevent_context_init(t);
62         assert_non_null(t->ev);
63
64         ret = pthreadpool_tevent_init(t->ev, UINT_MAX, &t->upool);
65         assert_int_equal(ret, 0);
66
67         max_threads = pthreadpool_tevent_max_threads(t->upool);
68         assert_int_equal(max_threads, UINT_MAX);
69
70         ret = pthreadpool_tevent_init(t->ev, 1, &t->opool);
71         assert_int_equal(ret, 0);
72
73         max_threads = pthreadpool_tevent_max_threads(t->opool);
74         assert_int_equal(max_threads, 1);
75
76         ret = pthreadpool_tevent_init(t->ev, 0, &t->spool);
77         assert_int_equal(ret, 0);
78
79         max_threads = pthreadpool_tevent_max_threads(t->spool);
80         assert_int_equal(max_threads, 0);
81
82         *state = t;
83
84         return 0;
85 }
86
87 static int teardown_pthreadpool_tevent(void **state)
88 {
89         struct pthreadpool_tevent_test *t = *state;
90
91         TALLOC_FREE(t);
92
93         return 0;
94 }
95
96 int __wrap_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
97                           void *(*start_routine) (void *), void *arg);
98 int __real_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
99                           void *(*start_routine) (void *),  void *arg);
100
101 int __wrap_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
102                           void *(*start_routine) (void *), void *arg)
103 {
104         int error;
105
106         error = mock_type(int);
107         if (error != 0) {
108                 return error;
109         }
110
111         return __real_pthread_create(thread, attr, start_routine, arg);
112 }
113
114 static void test_job_threadid(void *ptr)
115 {
116         pthread_t *threadid = ptr;
117
118         *threadid = pthread_self();
119 }
120
121 static int test_create_do(struct tevent_context *ev,
122                           struct pthreadpool_tevent *pool,
123                           bool *executed,
124                           bool *in_main_thread)
125 {
126         struct tevent_req *req;
127         pthread_t zero_thread;
128         pthread_t main_thread;
129         pthread_t worker_thread;
130         bool ok;
131         int ret;
132
133         *executed = false;
134         *in_main_thread = false;
135
136         memset(&zero_thread, 0, sizeof(zero_thread));
137         main_thread = pthread_self();
138         worker_thread = zero_thread;
139
140         req = pthreadpool_tevent_job_send(
141                 ev, ev, pool, test_job_threadid, &worker_thread);
142         if (req == NULL) {
143                 fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
144                 return ENOMEM;
145         }
146
147         ok = tevent_req_poll(req, ev);
148         if (!ok) {
149                 ret = errno;
150                 fprintf(stderr, "tevent_req_poll failed: %s\n",
151                         strerror(ret));
152                 *executed = !pthread_equal(worker_thread, zero_thread);
153                 *in_main_thread = pthread_equal(worker_thread, main_thread);
154                 return ret;
155         }
156
157
158         ret = pthreadpool_tevent_job_recv(req);
159         TALLOC_FREE(req);
160         *executed = !pthread_equal(worker_thread, zero_thread);
161         *in_main_thread = pthread_equal(worker_thread, main_thread);
162         if (ret != 0) {
163                 fprintf(stderr, "tevent_req_recv failed: %s\n",
164                         strerror(ret));
165                 return ret;
166         }
167
168         return 0;
169 }
170
171 static void test_create(void **state)
172 {
173         struct pthreadpool_tevent_test *t = *state;
174         bool executed;
175         bool in_main_thread;
176         int ret;
177
178         /*
179          * When pthreadpool cannot create the first worker thread,
180          * this job will run in the sync fallback in the main thread.
181          */
182         will_return(__wrap_pthread_create, EAGAIN);
183         ret = test_create_do(t->ev, t->upool, &executed, &in_main_thread);
184         assert_int_equal(ret, EAGAIN);
185         assert_false(executed);
186         assert_false(in_main_thread);
187
188         /*
189          * The sync pool won't trigger pthread_create()
190          * It will be triggered by the one pool.
191          */
192         will_return(__wrap_pthread_create, EAGAIN);
193
194         ret = test_create_do(t->ev, t->spool, &executed, &in_main_thread);
195         assert_int_equal(ret, 0);
196         assert_true(executed);
197         assert_true(in_main_thread);
198
199         ret = test_create_do(t->ev, t->opool, &executed, &in_main_thread);
200         assert_int_equal(ret, EAGAIN);
201         assert_false(executed);
202         assert_false(in_main_thread);
203
204         /*
205          * When a thread can be created, the job will run in the worker thread.
206          */
207         will_return(__wrap_pthread_create, 0);
208         ret = test_create_do(t->ev, t->upool, &executed, &in_main_thread);
209         assert_int_equal(ret, 0);
210         assert_true(executed);
211         assert_false(in_main_thread);
212
213         poll(NULL, 0, 10);
214
215         /*
216          * Workerthread will still be active for a second; immediately
217          * running another job will also use the worker thread, even
218          * if a new thread cannot be created.
219          */
220         ret = test_create_do(t->ev, t->upool, &executed, &in_main_thread);
221         assert_int_equal(ret, 0);
222         assert_true(executed);
223         assert_false(in_main_thread);
224
225         /*
226          * When a thread can be created, the job will run in the worker thread.
227          */
228         will_return(__wrap_pthread_create, 0);
229         ret = test_create_do(t->ev, t->opool, &executed, &in_main_thread);
230         assert_int_equal(ret, 0);
231         assert_true(executed);
232         assert_false(in_main_thread);
233
234         poll(NULL, 0, 10);
235
236         /*
237          * Workerthread will still be active for a second; immediately
238          * running another job will also use the worker thread, even
239          * if a new thread cannot be created.
240          */
241         ret = test_create_do(t->ev, t->opool, &executed, &in_main_thread);
242         assert_int_equal(ret, 0);
243         assert_true(executed);
244         assert_false(in_main_thread);
245 }
246
247 static void test_per_thread_cwd_job(void *ptr)
248 {
249         const bool *per_thread_cwd_ptr = ptr;
250         bool per_thread_cwd;
251         char cwdbuf[PATH_MAX] = {0,};
252         char *cwdstr = NULL;
253         int ret;
254
255         /*
256          * This needs to be consistent.
257          */
258         per_thread_cwd = pthreadpool_tevent_current_job_per_thread_cwd();
259         assert_int_equal(per_thread_cwd, *per_thread_cwd_ptr);
260
261         if (!per_thread_cwd) {
262                 return;
263         }
264
265         /*
266          * Check we're not already in "/".
267          */
268         cwdstr = getcwd(cwdbuf, sizeof(cwdbuf));
269         assert_non_null(cwdstr);
270         assert_string_not_equal(cwdstr, "/");
271
272         ret = chdir("/");
273         assert_int_equal(ret, 0);
274
275         /*
276          * Check we're in "/" now.
277          */
278         cwdstr = getcwd(cwdbuf, sizeof(cwdbuf));
279         assert_non_null(cwdstr);
280         assert_string_equal(cwdstr, "/");
281 }
282
283 static int test_per_thread_cwd_do(struct tevent_context *ev,
284                                   struct pthreadpool_tevent *pool)
285 {
286         struct tevent_req *req;
287         bool per_thread_cwd;
288         bool ok;
289         int ret;
290         per_thread_cwd = pthreadpool_tevent_per_thread_cwd(pool);
291
292         req = pthreadpool_tevent_job_send(
293                 ev, ev, pool, test_per_thread_cwd_job, &per_thread_cwd);
294         if (req == NULL) {
295                 fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
296                 return ENOMEM;
297         }
298
299         ok = tevent_req_poll(req, ev);
300         if (!ok) {
301                 ret = errno;
302                 fprintf(stderr, "tevent_req_poll failed: %s\n",
303                         strerror(ret));
304                 return ret;
305         }
306
307         ret = pthreadpool_tevent_job_recv(req);
308         TALLOC_FREE(req);
309         if (ret != 0) {
310                 fprintf(stderr, "tevent_req_recv failed: %s\n",
311                         strerror(ret));
312                 return ret;
313         }
314
315         return 0;
316 }
317
318 static void test_per_thread_cwd(void **state)
319 {
320         struct pthreadpool_tevent_test *t = *state;
321         int ret;
322         bool per_thread_cwd_u;
323         bool per_thread_cwd_o;
324         bool per_thread_cwd_s;
325         char cwdbuf1[PATH_MAX] = {0,};
326         char *cwdstr1 = NULL;
327         char cwdbuf2[PATH_MAX] = {0,};
328         char *cwdstr2 = NULL;
329
330         /*
331          * The unlimited and one pools
332          * should be consistent.
333          *
334          * We can't enforce this as some constraint
335          * container environments disable unshare()
336          * completely, even just with CLONE_FS.
337          */
338         per_thread_cwd_u = pthreadpool_tevent_per_thread_cwd(t->upool);
339         per_thread_cwd_o = pthreadpool_tevent_per_thread_cwd(t->opool);
340         assert_int_equal(per_thread_cwd_u, per_thread_cwd_o);
341
342         /*
343          * The sync pool should never support this.
344          */
345         per_thread_cwd_s = pthreadpool_tevent_per_thread_cwd(t->spool);
346         assert_false(per_thread_cwd_s);
347
348         /*
349          * Check we're not already in "/".
350          */
351         cwdstr1 = getcwd(cwdbuf1, sizeof(cwdbuf1));
352         assert_non_null(cwdstr1);
353         assert_string_not_equal(cwdstr1, "/");
354
355         will_return(__wrap_pthread_create, 0);
356         ret = test_per_thread_cwd_do(t->ev, t->upool);
357         assert_int_equal(ret, 0);
358
359         /*
360          * Check we're still in the same directory.
361          */
362         cwdstr2 = getcwd(cwdbuf2, sizeof(cwdbuf2));
363         assert_non_null(cwdstr2);
364         assert_string_equal(cwdstr2, cwdstr1);
365
366         will_return(__wrap_pthread_create, 0);
367         ret = test_per_thread_cwd_do(t->ev, t->opool);
368         assert_int_equal(ret, 0);
369
370         /*
371          * Check we're still in the same directory.
372          */
373         cwdstr2 = getcwd(cwdbuf2, sizeof(cwdbuf2));
374         assert_non_null(cwdstr2);
375         assert_string_equal(cwdstr2, cwdstr1);
376
377         ret = test_per_thread_cwd_do(t->ev, t->spool);
378         assert_int_equal(ret, 0);
379
380         /*
381          * Check we're still in the same directory.
382          */
383         cwdstr2 = getcwd(cwdbuf2, sizeof(cwdbuf2));
384         assert_non_null(cwdstr2);
385         assert_string_equal(cwdstr2, cwdstr1);
386 }
387
388 struct test_cancel_job {
389         int fdm; /* the main end of socketpair */
390         int fdj; /* the job end of socketpair */
391         bool started;
392         bool canceled;
393         bool orphaned;
394         bool finished;
395         size_t polls;
396         size_t timeouts;
397         int sleep_msec;
398         struct tevent_req *req;
399         bool completed;
400         int ret;
401 };
402
403 static void test_cancel_job_done(struct tevent_req *req);
404
405 static int test_cancel_job_destructor(struct test_cancel_job *job)
406 {
407         ANNOTATE_BENIGN_RACE_SIZED(&job->started,
408                                    sizeof(job->started),
409                                    "protected by pthreadpool_tevent code");
410         if (job->started) {
411                 ANNOTATE_BENIGN_RACE_SIZED(&job->finished,
412                                            sizeof(job->finished),
413                                            "protected by pthreadpool_tevent code");
414                 assert_true(job->finished);
415         }
416
417         ANNOTATE_BENIGN_RACE_SIZED(&job->fdj,
418                                    sizeof(job->fdj),
419                                    "protected by pthreadpool_tevent code");
420
421         if (job->fdm != -1) {
422                 close(job->fdm);
423                 job->fdm = -1;
424         }
425         if (job->fdj != -1) {
426                 close(job->fdj);
427                 job->fdj = -1;
428         }
429
430         return 0;
431 }
432
433 static struct test_cancel_job *test_cancel_job_create(TALLOC_CTX *mem_ctx)
434 {
435         struct test_cancel_job *job = NULL;
436
437         job = talloc(mem_ctx, struct test_cancel_job);
438         if (job == NULL) {
439                 return NULL;
440         }
441         *job = (struct test_cancel_job) {
442                 .fdm = -1,
443                 .fdj = -1,
444                 .sleep_msec = 50,
445         };
446
447         talloc_set_destructor(job, test_cancel_job_destructor);
448         return job;
449 }
450
451 static void test_cancel_job_fn(void *ptr)
452 {
453         struct test_cancel_job *job = (struct test_cancel_job *)ptr;
454         int fdj = -1;
455         char c = 0;
456         int ret;
457
458         assert_non_null(job); /* make sure we abort without a job pointer */
459
460         job->started = true;
461         fdj = job->fdj;
462         job->fdj = -1;
463
464         if (!pthreadpool_tevent_current_job_continue()) {
465                 job->canceled = pthreadpool_tevent_current_job_canceled();
466                 job->orphaned = pthreadpool_tevent_current_job_orphaned();
467                 job->finished = true;
468                 close(fdj);
469                 return;
470         }
471
472         /*
473          * Notify that we main thread
474          *
475          * write of 1 byte should always work!
476          */
477         ret = write(fdj, &c, 1);
478         assert_int_equal(ret, 1);
479
480         /*
481          * loop until the job was tried to
482          * be canceled or becomes orphaned.
483          *
484          * If there's some activity on the fd
485          * we directly finish.
486          */
487         do {
488                 struct pollfd pfd = {
489                         .fd = fdj,
490                         .events = POLLIN,
491                 };
492
493                 job->polls += 1;
494
495                 ret = poll(&pfd, 1, job->sleep_msec);
496                 if (ret == 1) {
497                         job->finished = true;
498                         close(fdj);
499                         return;
500                 }
501                 assert_int_equal(ret, 0);
502
503                 job->timeouts += 1;
504
505         } while (pthreadpool_tevent_current_job_continue());
506
507         job->canceled = pthreadpool_tevent_current_job_canceled();
508         job->orphaned = pthreadpool_tevent_current_job_orphaned();
509         job->finished = true;
510         close(fdj);
511 }
512
513 static void test_cancel_job_done(struct tevent_req *req)
514 {
515         struct test_cancel_job *job =
516                 tevent_req_callback_data(req,
517                 struct test_cancel_job);
518
519         job->ret = pthreadpool_tevent_job_recv(job->req);
520         TALLOC_FREE(job->req);
521         job->completed = true;
522 }
523
524 static void test_cancel_job_wait(struct test_cancel_job *job,
525                                  struct tevent_context *ev)
526 {
527         /*
528          * We have to keep looping until
529          * test_cancel_job_done was triggered
530          */
531         while (!job->completed) {
532                 int ret;
533
534                 ret = tevent_loop_once(ev);
535                 assert_int_equal(ret, 0);
536         }
537 }
538
539 struct test_cancel_state {
540         struct test_cancel_job *job1;
541         struct test_cancel_job *job2;
542         struct test_cancel_job *job3;
543         struct test_cancel_job *job4;
544         struct test_cancel_job *job5;
545         struct test_cancel_job *job6;
546         struct test_cancel_job *job7;
547         struct test_cancel_job *job8;
548         struct test_cancel_job *job9;
549 };
550
551 static void test_cancel_job(void **private_data)
552 {
553         struct pthreadpool_tevent_test *t = *private_data;
554         struct tevent_context *ev = t->ev;
555         struct pthreadpool_tevent *pool = t->opool;
556         struct test_cancel_state *state = NULL;
557         int ret;
558         bool ok;
559         int fdpair[2] = { -1, -1 };
560         char c = 0;
561
562         state = talloc_zero(t, struct test_cancel_state);
563         assert_non_null(state);
564         state->job1 = test_cancel_job_create(state);
565         assert_non_null(state->job1);
566         state->job2 = test_cancel_job_create(state);
567         assert_non_null(state->job2);
568         state->job3 = test_cancel_job_create(state);
569         assert_non_null(state->job3);
570
571         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
572         assert_int_equal(ret, 0);
573
574         state->job1->fdm = fdpair[0];
575         state->job1->fdj = fdpair[1];
576
577         assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
578
579         will_return(__wrap_pthread_create, 0);
580         state->job1->req = pthreadpool_tevent_job_send(
581                 state->job1, ev, pool, test_cancel_job_fn, state->job1);
582         assert_non_null(state->job1->req);
583         tevent_req_set_callback(state->job1->req,
584                                 test_cancel_job_done,
585                                 state->job1);
586
587         state->job2->req = pthreadpool_tevent_job_send(
588                 state->job2, ev, pool, test_cancel_job_fn, NULL);
589         assert_non_null(state->job2->req);
590         tevent_req_set_callback(state->job2->req,
591                                 test_cancel_job_done,
592                                 state->job2);
593
594         state->job3->req = pthreadpool_tevent_job_send(
595                 state->job3, ev, pool, test_cancel_job_fn, NULL);
596         assert_non_null(state->job3->req);
597         tevent_req_set_callback(state->job3->req,
598                                 test_cancel_job_done,
599                                 state->job3);
600
601         /*
602          * Wait for the job 1 to start.
603          */
604         ret = read(state->job1->fdm, &c, 1);
605         assert_int_equal(ret, 1);
606
607         /*
608          * We cancel job 3 and destroy job2.
609          * Both should never be executed.
610          */
611         assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 2);
612         TALLOC_FREE(state->job2->req);
613         assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);
614         ok = tevent_req_cancel(state->job3->req);
615         assert_true(ok);
616         assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
617
618         /*
619          * Job 3 should complete as canceled, while
620          * job 1 is still running.
621          */
622         test_cancel_job_wait(state->job3, ev);
623         assert_int_equal(state->job3->ret, ECANCELED);
624         assert_null(state->job3->req);
625         assert_false(state->job3->started);
626
627         /*
628          * Now job1 is canceled while it's running,
629          * this should let it stop it's loop.
630          */
631         ok = tevent_req_cancel(state->job1->req);
632         assert_false(ok);
633
634         /*
635          * Job 1 completes, It got at least one sleep
636          * timeout loop and has state->job1->canceled set.
637          */
638         test_cancel_job_wait(state->job1, ev);
639         assert_int_equal(state->job1->ret, 0);
640         assert_null(state->job1->req);
641         assert_true(state->job1->started);
642         assert_true(state->job1->finished);
643         assert_true(state->job1->canceled);
644         assert_false(state->job1->orphaned);
645         assert_in_range(state->job1->polls, 1, 100);
646         assert_int_equal(state->job1->timeouts, state->job1->polls);
647
648         /*
649          * Now we create jobs 4 and 5
650          * Both should execute.
651          * Job 4 is orphaned while running by a TALLOC_FREE()
652          * This should stop job 4 and let job 5 start.
653          * We do a "normal" exit in job 5 by creating some activity
654          * on the socketpair.
655          */
656
657         state->job4 = test_cancel_job_create(state);
658         assert_non_null(state->job4);
659
660         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
661         assert_int_equal(ret, 0);
662
663         state->job4->fdm = fdpair[0];
664         state->job4->fdj = fdpair[1];
665
666         state->job4->req = pthreadpool_tevent_job_send(
667                 state->job4, ev, pool, test_cancel_job_fn, state->job4);
668         assert_non_null(state->job4->req);
669         tevent_req_set_callback(state->job4->req,
670                                 test_cancel_job_done,
671                                 state->job4);
672
673         state->job5 = test_cancel_job_create(state);
674         assert_non_null(state->job5);
675
676         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
677         assert_int_equal(ret, 0);
678
679         state->job5->fdm = fdpair[0];
680         state->job5->fdj = fdpair[1];
681
682         state->job5->req = pthreadpool_tevent_job_send(
683                 state->job5, ev, pool, test_cancel_job_fn, state->job5);
684         assert_non_null(state->job5->req);
685         tevent_req_set_callback(state->job5->req,
686                                 test_cancel_job_done,
687                                 state->job5);
688
689         /*
690          * Make sure job 5 can exit as soon as possible.
691          * It will never get a sleep/poll timeout.
692          */
693         ret = write(state->job5->fdm, &c, 1);
694         assert_int_equal(ret, 1);
695
696         /*
697          * Wait for the job 4 to start
698          */
699         ret = read(state->job4->fdm, &c, 1);
700         assert_int_equal(ret, 1);
701
702         assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);
703
704         /*
705          * destroy the request so that it's marked
706          * as orphaned.
707          */
708         TALLOC_FREE(state->job4->req);
709
710         /*
711          * Job 5 completes, It got no sleep timeout loop.
712          */
713         test_cancel_job_wait(state->job5, ev);
714         assert_int_equal(state->job5->ret, 0);
715         assert_null(state->job5->req);
716         assert_true(state->job5->started);
717         assert_true(state->job5->finished);
718         assert_false(state->job5->canceled);
719         assert_false(state->job5->orphaned);
720         assert_int_equal(state->job5->polls, 1);
721         assert_int_equal(state->job5->timeouts, 0);
722
723         assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
724
725         /*
726          * Job 2 is still not executed as we did a TALLOC_FREE()
727          * before is was scheduled.
728          */
729         assert_false(state->job2->completed);
730         assert_false(state->job2->started);
731
732         /*
733          * Job 4 is still wasn't completed as we did a TALLOC_FREE()
734          * while it is was running. but it was started and has
735          * orphaned set
736          */
737         assert_false(state->job4->completed);
738         assert_true(state->job4->started);
739         assert_true(state->job4->finished);
740         assert_false(state->job4->canceled);
741         assert_true(state->job4->orphaned);
742         assert_in_range(state->job4->polls, 1, 100);
743         assert_int_equal(state->job4->timeouts, state->job4->polls);
744
745         /*
746          * Now we create jobs 6
747          * We destroy the pool while it's executing.
748          */
749
750         state->job6 = test_cancel_job_create(state);
751         assert_non_null(state->job6);
752
753         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
754         assert_int_equal(ret, 0);
755
756         state->job6->fdm = fdpair[0];
757         state->job6->fdj = fdpair[1];
758
759         state->job6->req = pthreadpool_tevent_job_send(
760                 state->job6, ev, pool, test_cancel_job_fn, state->job6);
761         assert_non_null(state->job6->req);
762         tevent_req_set_callback(state->job6->req,
763                                 test_cancel_job_done,
764                                 state->job6);
765
766         /*
767          * Wait for the job 6 to start
768          */
769         ret = read(state->job6->fdm, &c, 1);
770         assert_int_equal(ret, 1);
771
772         assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
773
774         /*
775          * destroy the request so that it's marked
776          * as orphaned.
777          */
778         pool = NULL;
779         TALLOC_FREE(t->opool);
780
781         /*
782          * Wait until the job finished.
783          */
784         ret = read(state->job6->fdm, &c, 1);
785         assert_int_equal(ret, 0);
786
787         /*
788          * Job 6 is still dangling arround.
789          *
790          * We need to convince valgrind --tool={drd,helgrind}
791          * that the read above is good enough to be
792          * sure the job is finished and closed the other end of
793          * the socketpair.
794          */
795         ANNOTATE_BENIGN_RACE_SIZED(state->job6,
796                                    sizeof(*state->job6),
797                                    "protected by thread fence");
798         assert_non_null(state->job6->req);
799         assert_true(tevent_req_is_in_progress(state->job6->req));
800         assert_false(state->job6->completed);
801         assert_true(state->job6->started);
802         assert_true(state->job6->finished);
803         assert_false(state->job6->canceled);
804         assert_true(state->job6->orphaned);
805         assert_in_range(state->job6->polls, 1, 100);
806         assert_int_equal(state->job6->timeouts, state->job4->polls);
807
808         TALLOC_FREE(state);
809 }
810
811 struct test_pthreadpool_tevent_wrap_tp_stats {
812         unsigned num_before;
813         unsigned num_after;
814         bool destroyed;
815 };
816
817 struct test_pthreadpool_tevent_wrap_tp_state {
818         struct test_pthreadpool_tevent_wrap_tp_state **selfptr;
819         struct test_pthreadpool_tevent_wrap_tp_stats *stats;
820 };
821
822 static int test_pthreadpool_tevent_wrap_tp_state_destructor(
823         struct test_pthreadpool_tevent_wrap_tp_state *state)
824 {
825         state->stats->destroyed = true;
826         *state->selfptr = NULL;
827
828         return 0;
829 }
830
831 static bool test_pthreadpool_tevent_tp_before(struct pthreadpool_tevent *wrap,
832                                               void *private_data,
833                                               struct pthreadpool_tevent *main,
834                                               const char *location)
835 {
836         struct test_pthreadpool_tevent_wrap_tp_state *state =
837                 talloc_get_type_abort(private_data,
838                 struct test_pthreadpool_tevent_wrap_tp_state);
839
840         state->stats->num_before++;
841
842         return true;
843 }
844
845 static bool test_pthreadpool_tevent_tp_after(struct pthreadpool_tevent *wrap,
846                                              void *private_data,
847                                              struct pthreadpool_tevent *main,
848                                              const char *location)
849 {
850         struct test_pthreadpool_tevent_wrap_tp_state *state =
851                 talloc_get_type_abort(private_data,
852                 struct test_pthreadpool_tevent_wrap_tp_state);
853
854         state->stats->num_after++;
855
856         return true;
857 }
858
859 static const struct pthreadpool_tevent_wrapper_ops test_tp_ops = {
860         .name           = "test_pthreadpool_tevent_tp",
861         .before_job     = test_pthreadpool_tevent_tp_before,
862         .after_job      = test_pthreadpool_tevent_tp_after,
863 };
864
865 static void test_wrap_cancel_job(void **private_data)
866 {
867         struct pthreadpool_tevent_test *t = *private_data;
868         struct tevent_context *ev = t->ev;
869         struct pthreadpool_tevent *poolw1 = NULL;
870         struct test_pthreadpool_tevent_wrap_tp_state *poolw1_state = NULL;
871         struct test_pthreadpool_tevent_wrap_tp_stats poolw1_stats = {
872                 .destroyed = false,
873         };
874         struct pthreadpool_tevent *poolw2 = NULL;
875         struct test_pthreadpool_tevent_wrap_tp_state *poolw2_state = NULL;
876         struct test_pthreadpool_tevent_wrap_tp_stats poolw2_stats = {
877                 .destroyed = false,
878         };
879         size_t max_threads_o;
880         size_t max_threads_w1;
881         size_t max_threads_w2;
882         bool per_thread_cwd_o;
883         bool per_thread_cwd_w1;
884         bool per_thread_cwd_w2;
885         struct test_cancel_state *state = NULL;
886         int ret;
887         bool ok;
888         int fdpair[2] = { -1, -1 };
889         char c = 0;
890
891         max_threads_o = pthreadpool_tevent_max_threads(t->opool);
892         per_thread_cwd_o = pthreadpool_tevent_per_thread_cwd(t->opool);
893
894         poolw1 = pthreadpool_tevent_wrapper_create(
895                 t->opool, t, &test_tp_ops, &poolw1_state,
896                 struct test_pthreadpool_tevent_wrap_tp_state);
897         assert_non_null(poolw1);
898         poolw1_state->selfptr = &poolw1_state;
899         ANNOTATE_BENIGN_RACE_SIZED(&poolw1_stats,
900                                    sizeof(poolw1_stats),
901                                    "protected by pthreadpool_tevent code");
902         poolw1_state->stats = &poolw1_stats;
903         talloc_set_destructor(poolw1_state,
904                               test_pthreadpool_tevent_wrap_tp_state_destructor);
905
906         poolw2 = pthreadpool_tevent_wrapper_create(
907                 t->opool, t, &test_tp_ops, &poolw2_state,
908                 struct test_pthreadpool_tevent_wrap_tp_state);
909         assert_non_null(poolw2);
910         poolw2_state->selfptr = &poolw2_state;
911         ANNOTATE_BENIGN_RACE_SIZED(&poolw2_stats,
912                                    sizeof(poolw2_stats),
913                                    "protected by pthreadpool_tevent code");
914         poolw2_state->stats = &poolw2_stats;
915         talloc_set_destructor(poolw2_state,
916                               test_pthreadpool_tevent_wrap_tp_state_destructor);
917
918         assert_false(poolw1_stats.destroyed);
919         assert_int_equal(poolw1_stats.num_before, 0);
920         assert_int_equal(poolw1_stats.num_after, 0);
921         max_threads_w1 = pthreadpool_tevent_max_threads(poolw1);
922         assert_int_equal(max_threads_w1, max_threads_o);
923         per_thread_cwd_w1 = pthreadpool_tevent_per_thread_cwd(poolw1);
924         assert_int_equal(per_thread_cwd_w1, per_thread_cwd_o);
925
926         assert_false(poolw2_stats.destroyed);
927         assert_int_equal(poolw2_stats.num_before, 0);
928         assert_int_equal(poolw2_stats.num_after, 0);
929         max_threads_w2 = pthreadpool_tevent_max_threads(poolw2);
930         assert_int_equal(max_threads_w2, max_threads_o);
931         per_thread_cwd_w2 = pthreadpool_tevent_per_thread_cwd(poolw2);
932         assert_int_equal(per_thread_cwd_w2, per_thread_cwd_o);
933
934         state = talloc_zero(t, struct test_cancel_state);
935         assert_non_null(state);
936         state->job1 = test_cancel_job_create(state);
937         assert_non_null(state->job1);
938         state->job2 = test_cancel_job_create(state);
939         assert_non_null(state->job2);
940         state->job3 = test_cancel_job_create(state);
941         assert_non_null(state->job3);
942
943         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
944         assert_int_equal(ret, 0);
945
946         state->job1->fdm = fdpair[0];
947         state->job1->fdj = fdpair[1];
948
949         assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
950         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
951         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
952
953         will_return(__wrap_pthread_create, 0);
954         state->job1->req = pthreadpool_tevent_job_send(
955                 state->job1, ev, poolw1, test_cancel_job_fn, state->job1);
956         assert_non_null(state->job1->req);
957         tevent_req_set_callback(state->job1->req,
958                                 test_cancel_job_done,
959                                 state->job1);
960
961         state->job2->req = pthreadpool_tevent_job_send(
962                 state->job2, ev, poolw1, test_cancel_job_fn, NULL);
963         assert_non_null(state->job2->req);
964         tevent_req_set_callback(state->job2->req,
965                                 test_cancel_job_done,
966                                 state->job2);
967
968         state->job3->req = pthreadpool_tevent_job_send(
969                 state->job3, ev, poolw1, test_cancel_job_fn, NULL);
970         assert_non_null(state->job3->req);
971         tevent_req_set_callback(state->job3->req,
972                                 test_cancel_job_done,
973                                 state->job3);
974
975         /*
976          * Wait for the job 1 to start.
977          */
978         ret = read(state->job1->fdm, &c, 1);
979         assert_int_equal(ret, 1);
980
981         /*
982          * We cancel job 3 and destroy job2.
983          * Both should never be executed.
984          */
985         assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 2);
986         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 2);
987         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 2);
988         TALLOC_FREE(state->job2->req);
989         assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1);
990         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1);
991         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1);
992         ok = tevent_req_cancel(state->job3->req);
993         assert_true(ok);
994         assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
995         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
996         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
997
998         /*
999          * Job 3 should complete as canceled, while
1000          * job 1 is still running.
1001          */
1002         test_cancel_job_wait(state->job3, ev);
1003         assert_int_equal(state->job3->ret, ECANCELED);
1004         assert_null(state->job3->req);
1005         assert_false(state->job3->started);
1006
1007         /*
1008          * Now job1 is canceled while it's running,
1009          * this should let it stop it's loop.
1010          */
1011         ok = tevent_req_cancel(state->job1->req);
1012         assert_false(ok);
1013
1014         /*
1015          * Job 1 completes, It got at least one sleep
1016          * timeout loop and has state->job1->canceled set.
1017          */
1018         test_cancel_job_wait(state->job1, ev);
1019         assert_int_equal(state->job1->ret, 0);
1020         assert_null(state->job1->req);
1021         assert_true(state->job1->started);
1022         assert_true(state->job1->finished);
1023         assert_true(state->job1->canceled);
1024         assert_false(state->job1->orphaned);
1025         assert_in_range(state->job1->polls, 1, 100);
1026         assert_int_equal(state->job1->timeouts, state->job1->polls);
1027
1028         assert_false(poolw1_stats.destroyed);
1029         assert_int_equal(poolw1_stats.num_before, 1);
1030         assert_int_equal(poolw1_stats.num_after, 1);
1031
1032         assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
1033         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
1034         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
1035
1036         /*
1037          * Now we create jobs 4 and 5
1038          * Both should execute.
1039          * Job 4 is orphaned while running by a TALLOC_FREE()
1040          * This should stop job 4 and let job 5 start.
1041          * We do a "normal" exit in job 5 by creating some activity
1042          * on the socketpair.
1043          */
1044
1045         state->job4 = test_cancel_job_create(state);
1046         assert_non_null(state->job4);
1047
1048         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
1049         assert_int_equal(ret, 0);
1050
1051         state->job4->fdm = fdpair[0];
1052         state->job4->fdj = fdpair[1];
1053
1054         state->job4->req = pthreadpool_tevent_job_send(
1055                 state->job4, ev, poolw1, test_cancel_job_fn, state->job4);
1056         assert_non_null(state->job4->req);
1057         tevent_req_set_callback(state->job4->req,
1058                                 test_cancel_job_done,
1059                                 state->job4);
1060
1061         state->job5 = test_cancel_job_create(state);
1062         assert_non_null(state->job5);
1063
1064         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
1065         assert_int_equal(ret, 0);
1066
1067         state->job5->fdm = fdpair[0];
1068         state->job5->fdj = fdpair[1];
1069
1070         state->job5->req = pthreadpool_tevent_job_send(
1071                 state->job5, ev, poolw1, test_cancel_job_fn, state->job5);
1072         assert_non_null(state->job5->req);
1073         tevent_req_set_callback(state->job5->req,
1074                                 test_cancel_job_done,
1075                                 state->job5);
1076
1077         /*
1078          * Make sure job 5 can exit as soon as possible.
1079          * It will never get a sleep/poll timeout.
1080          */
1081         ret = write(state->job5->fdm, &c, 1);
1082         assert_int_equal(ret, 1);
1083
1084         /*
1085          * Wait for the job 4 to start
1086          */
1087         ret = read(state->job4->fdm, &c, 1);
1088         assert_int_equal(ret, 1);
1089
1090         assert_false(poolw1_stats.destroyed);
1091         assert_int_equal(poolw1_stats.num_before, 2);
1092         assert_int_equal(poolw1_stats.num_after, 1);
1093
1094         assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1);
1095         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1);
1096         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1);
1097
1098         /*
1099          * destroy the request so that it's marked
1100          * as orphaned.
1101          *
1102          * As we're in the wrapper mode, the
1103          * job thread will exit and try to create
1104          * a new thread.
1105          */
1106         TALLOC_FREE(state->job4->req);
1107
1108         /*
1109          * Job 5 completes, It got no sleep timeout loop.
1110          */
1111         will_return(__wrap_pthread_create, 0);
1112         test_cancel_job_wait(state->job5, ev);
1113         assert_int_equal(state->job5->ret, 0);
1114         assert_null(state->job5->req);
1115         assert_true(state->job5->started);
1116         assert_true(state->job5->finished);
1117         assert_false(state->job5->canceled);
1118         assert_false(state->job5->orphaned);
1119         assert_int_equal(state->job5->polls, 1);
1120         assert_int_equal(state->job5->timeouts, 0);
1121
1122         assert_false(poolw1_stats.destroyed);
1123         assert_int_equal(poolw1_stats.num_before, 3);
1124         assert_int_equal(poolw1_stats.num_after, 2);
1125
1126         assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
1127         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
1128         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
1129
1130         /*
1131          * Job 2 is still not executed as we did a TALLOC_FREE()
1132          * before is was scheduled.
1133          */
1134         assert_false(state->job2->completed);
1135         assert_false(state->job2->started);
1136
1137         /*
1138          * Job 4 is still wasn't completed as we did a TALLOC_FREE()
1139          * while it is was running. but it was started and has
1140          * orphaned set
1141          */
1142         assert_false(state->job4->completed);
1143         assert_true(state->job4->started);
1144         assert_true(state->job4->finished);
1145         assert_false(state->job4->canceled);
1146         assert_true(state->job4->orphaned);
1147         assert_in_range(state->job4->polls, 1, 100);
1148         assert_int_equal(state->job4->timeouts, state->job4->polls);
1149
1150         assert_false(poolw1_stats.destroyed);
1151         assert_int_equal(poolw1_stats.num_before, 3);
1152         assert_int_equal(poolw1_stats.num_after, 2);
1153
1154         /*
1155          * Now we create jobs 6 and 7
1156          * Both should execute.
1157          * We destroy the pool wrapper (1) while job 6 is executing.
1158          * This should stop job 6 and let job 7 start.
1159          * Job 7 runs on the pool wrapper (2).
1160          * We do a "normal" exit in job 7 by creating some activity
1161          * on the socketpair.
1162          */
1163
1164         state->job6 = test_cancel_job_create(state);
1165         assert_non_null(state->job6);
1166
1167         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
1168         assert_int_equal(ret, 0);
1169
1170         state->job6->fdm = fdpair[0];
1171         state->job6->fdj = fdpair[1];
1172
1173         state->job6->req = pthreadpool_tevent_job_send(
1174                 state->job6, ev, poolw1, test_cancel_job_fn, state->job6);
1175         assert_non_null(state->job6->req);
1176         tevent_req_set_callback(state->job6->req,
1177                                 test_cancel_job_done,
1178                                 state->job6);
1179
1180         state->job7 = test_cancel_job_create(state);
1181         assert_non_null(state->job7);
1182
1183         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
1184         assert_int_equal(ret, 0);
1185
1186         state->job7->fdm = fdpair[0];
1187         state->job7->fdj = fdpair[1];
1188
1189         state->job7->req = pthreadpool_tevent_job_send(
1190                 state->job7, ev, poolw2, test_cancel_job_fn, state->job7);
1191         assert_non_null(state->job7->req);
1192         tevent_req_set_callback(state->job7->req,
1193                                 test_cancel_job_done,
1194                                 state->job7);
1195
1196         /*
1197          * Make sure job 7 can exit as soon as possible.
1198          * It will never get a sleep/poll timeout.
1199          */
1200         ret = write(state->job7->fdm, &c, 1);
1201         assert_int_equal(ret, 1);
1202
1203         /*
1204          * Wait for the job 6 to start
1205          */
1206         ret = read(state->job6->fdm, &c, 1);
1207         assert_int_equal(ret, 1);
1208
1209         assert_non_null(poolw1_state);
1210         assert_false(poolw1_stats.destroyed);
1211         assert_int_equal(poolw1_stats.num_before, 4);
1212         assert_int_equal(poolw1_stats.num_after, 2);
1213
1214         assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1);
1215         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1);
1216         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1);
1217
1218         /*
1219          * destroy the request so that it's marked
1220          * as orphaned.
1221          */
1222         TALLOC_FREE(poolw1);
1223
1224         /*
1225          * Wait until the job finished.
1226          */
1227         ret = read(state->job6->fdm, &c, 1);
1228         assert_int_equal(ret, 0);
1229
1230         assert_null(poolw1_state);
1231         assert_true(poolw1_stats.destroyed);
1232         assert_int_equal(poolw1_stats.num_before, 4);
1233         assert_int_equal(poolw1_stats.num_after, 2);
1234
1235         /*
1236          * Job 6 is still dangling arround.
1237          *
1238          * We need to convince valgrind --tool={drd,helgrind}
1239          * that the read above is good enough to be
1240          * sure the job is finished and closed the other end of
1241          * the socketpair.
1242          */
1243         ANNOTATE_BENIGN_RACE_SIZED(state->job6,
1244                                    sizeof(*state->job6),
1245                                    "protected by thread fence");
1246         assert_non_null(state->job6->req);
1247         assert_true(tevent_req_is_in_progress(state->job6->req));
1248         assert_false(state->job6->completed);
1249         assert_true(state->job6->started);
1250         assert_true(state->job6->finished);
1251         assert_false(state->job6->canceled);
1252         assert_true(state->job6->orphaned);
1253         assert_in_range(state->job6->polls, 1, 100);
1254         assert_int_equal(state->job6->timeouts, state->job4->polls);
1255
1256         /*
1257          * Job 7 completes, It got no sleep timeout loop.
1258          */
1259         will_return(__wrap_pthread_create, 0);
1260         test_cancel_job_wait(state->job7, ev);
1261         assert_int_equal(state->job7->ret, 0);
1262         assert_null(state->job7->req);
1263         assert_true(state->job7->started);
1264         assert_true(state->job7->finished);
1265         assert_false(state->job7->canceled);
1266         assert_false(state->job7->orphaned);
1267         assert_int_equal(state->job7->polls, 1);
1268         assert_int_equal(state->job7->timeouts, 0);
1269
1270         assert_non_null(poolw2_state);
1271         assert_false(poolw2_stats.destroyed);
1272         assert_int_equal(poolw2_stats.num_before, 1);
1273         assert_int_equal(poolw2_stats.num_after, 1);
1274
1275         assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
1276         assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
1277
1278         /*
1279          * Now we create jobs 8
1280          * On a new wrapper pool
1281          * We destroy the main pool while it's executing.
1282          */
1283
1284         state->job8 = test_cancel_job_create(state);
1285         assert_non_null(state->job8);
1286
1287         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
1288         assert_int_equal(ret, 0);
1289
1290         state->job8->fdm = fdpair[0];
1291         state->job8->fdj = fdpair[1];
1292
1293         state->job8->req = pthreadpool_tevent_job_send(
1294                 state->job8, ev, poolw2, test_cancel_job_fn, state->job8);
1295         assert_non_null(state->job8->req);
1296         tevent_req_set_callback(state->job8->req,
1297                                 test_cancel_job_done,
1298                                 state->job8);
1299
1300         /*
1301          * Wait for the job 8 to start
1302          */
1303         ret = read(state->job8->fdm, &c, 1);
1304         assert_int_equal(ret, 1);
1305
1306         assert_false(poolw2_stats.destroyed);
1307         assert_int_equal(poolw2_stats.num_before, 2);
1308         assert_int_equal(poolw2_stats.num_after, 1);
1309
1310         /*
1311          * destroy the request so that it's marked
1312          * as orphaned.
1313          */
1314         TALLOC_FREE(t->opool);
1315
1316         /*
1317          * Wait until the job finished.
1318          */
1319         ret = read(state->job8->fdm, &c, 1);
1320         assert_int_equal(ret, 0);
1321
1322         assert_null(poolw2_state);
1323         assert_true(poolw2_stats.destroyed);
1324         assert_int_equal(poolw2_stats.num_before, 2);
1325         assert_int_equal(poolw2_stats.num_after, 1);
1326
1327         /*
1328          * Job 8 is still dangling arround.
1329          *
1330          * We need to convince valgrind --tool={drd,helgrind}
1331          * that the read above is good enough to be
1332          * sure the job is finished and closed the other end of
1333          * the socketpair.
1334          */
1335         ANNOTATE_BENIGN_RACE_SIZED(state->job8,
1336                                    sizeof(*state->job8),
1337                                    "protected by thread fence");
1338         assert_non_null(state->job8->req);
1339         assert_true(tevent_req_is_in_progress(state->job8->req));
1340         assert_false(state->job8->completed);
1341         assert_true(state->job8->started);
1342         assert_true(state->job8->finished);
1343         assert_false(state->job8->canceled);
1344         assert_true(state->job8->orphaned);
1345         assert_in_range(state->job8->polls, 1, 100);
1346         assert_int_equal(state->job8->timeouts, state->job4->polls);
1347
1348         /*
1349          * Now check that adding a new job to the dangling
1350          * wrapper gives an error.
1351          */
1352         state->job9 = test_cancel_job_create(state);
1353         assert_non_null(state->job9);
1354
1355         state->job9->req = pthreadpool_tevent_job_send(
1356                 state->job9, ev, poolw2, test_cancel_job_fn, state->job9);
1357         assert_non_null(state->job9->req);
1358         tevent_req_set_callback(state->job9->req,
1359                                 test_cancel_job_done,
1360                                 state->job9);
1361
1362         /*
1363          * Job 9 completes, But with a failure.
1364          */
1365         test_cancel_job_wait(state->job9, ev);
1366         assert_int_equal(state->job9->ret, EINVAL);
1367         assert_null(state->job9->req);
1368         assert_false(state->job9->started);
1369         assert_false(state->job9->finished);
1370         assert_false(state->job9->canceled);
1371         assert_false(state->job9->orphaned);
1372         assert_int_equal(state->job9->polls, 0);
1373         assert_int_equal(state->job9->timeouts, 0);
1374
1375         TALLOC_FREE(state);
1376 }
1377
1378 int main(int argc, char **argv)
1379 {
1380         const struct CMUnitTest tests[] = {
1381                 cmocka_unit_test_setup_teardown(test_create,
1382                                                 setup_pthreadpool_tevent,
1383                                                 teardown_pthreadpool_tevent),
1384                 cmocka_unit_test_setup_teardown(test_per_thread_cwd,
1385                                                 setup_pthreadpool_tevent,
1386                                                 teardown_pthreadpool_tevent),
1387                 cmocka_unit_test_setup_teardown(test_cancel_job,
1388                                                 setup_pthreadpool_tevent,
1389                                                 teardown_pthreadpool_tevent),
1390                 cmocka_unit_test_setup_teardown(test_wrap_cancel_job,
1391                                                 setup_pthreadpool_tevent,
1392                                                 teardown_pthreadpool_tevent),
1393         };
1394
1395         cmocka_set_message_output(CM_OUTPUT_SUBUNIT);
1396
1397         return cmocka_run_group_tests(tests, NULL, NULL);
1398 }