pthreadpool: Simplify the logic in add_job a bit
[samba.git] / lib / pthreadpool / tests.c
1 #include <stdio.h>
2 #include <string.h>
3 #include <poll.h>
4 #include <errno.h>
5 #include <stdlib.h>
6 #include <pthread.h>
7 #include <unistd.h>
8 #include <sys/types.h>
9 #include <sys/wait.h>
10 #include <signal.h>
11 #include "pthreadpool_pipe.h"
12 #include "pthreadpool_tevent.h"
13
14 static int test_init(void)
15 {
16         struct pthreadpool_pipe *p;
17         int ret;
18
19         ret = pthreadpool_pipe_init(1, &p);
20         if (ret != 0) {
21                 fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
22                         strerror(ret));
23                 return -1;
24         }
25         ret = pthreadpool_pipe_destroy(p);
26         if (ret != 0) {
27                 fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
28                         strerror(ret));
29                 return -1;
30         }
31         return 0;
32 }
33
34 static void test_sleep(void *ptr)
35 {
36         int *ptimeout = (int *)ptr;
37         int ret;
38         ret = poll(NULL, 0, *ptimeout);
39         if (ret != 0) {
40                 fprintf(stderr, "poll returned %d (%s)\n",
41                         ret, strerror(errno));
42         }
43 }
44
45 static int test_jobs(int num_threads, int num_jobs)
46 {
47         char *finished;
48         struct pthreadpool_pipe *p;
49         int timeout = 1;
50         int i, ret;
51
52         finished = (char *)calloc(1, num_jobs);
53         if (finished == NULL) {
54                 fprintf(stderr, "calloc failed\n");
55                 return -1;
56         }
57
58         ret = pthreadpool_pipe_init(num_threads, &p);
59         if (ret != 0) {
60                 fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
61                         strerror(ret));
62                 return -1;
63         }
64
65         for (i=0; i<num_jobs; i++) {
66                 ret = pthreadpool_pipe_add_job(p, i, test_sleep, &timeout);
67                 if (ret != 0) {
68                         fprintf(stderr, "pthreadpool_pipe_add_job failed: "
69                                 "%s\n", strerror(ret));
70                         return -1;
71                 }
72         }
73
74         for (i=0; i<num_jobs; i++) {
75                 int jobid = -1;
76                 ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
77                 if (ret < 0) {
78                         fprintf(stderr, "pthreadpool_pipe_finished_jobs "
79                                 "failed: %s\n", strerror(-ret));
80                         return -1;
81                 }
82                 if ((ret != 1) || (jobid >= num_jobs)) {
83                         fprintf(stderr, "invalid job number %d\n", jobid);
84                         return -1;
85                 }
86                 finished[jobid] += 1;
87         }
88
89         for (i=0; i<num_jobs; i++) {
90                 if (finished[i] != 1) {
91                         fprintf(stderr, "finished[%d] = %d\n",
92                                 i, finished[i]);
93                         return -1;
94                 }
95         }
96
97         ret = pthreadpool_pipe_destroy(p);
98         if (ret != 0) {
99                 fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
100                         strerror(ret));
101                 return -1;
102         }
103
104         free(finished);
105         return 0;
106 }
107
108 static int test_busydestroy(void)
109 {
110         struct pthreadpool_pipe *p;
111         int timeout = 50;
112         struct pollfd pfd;
113         int ret, jobid;
114
115         ret = pthreadpool_pipe_init(1, &p);
116         if (ret != 0) {
117                 fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
118                         strerror(ret));
119                 return -1;
120         }
121         ret = pthreadpool_pipe_add_job(p, 1, test_sleep, &timeout);
122         if (ret != 0) {
123                 fprintf(stderr, "pthreadpool_pipe_add_job failed: %s\n",
124                         strerror(ret));
125                 return -1;
126         }
127         ret = pthreadpool_pipe_destroy(p);
128         if (ret != EBUSY) {
129                 fprintf(stderr, "Could destroy a busy pool\n");
130                 return -1;
131         }
132
133         pfd.fd = pthreadpool_pipe_signal_fd(p);
134         pfd.events = POLLIN|POLLERR;
135
136         do {
137                 ret = poll(&pfd, 1, -1);
138         } while ((ret == -1) && (errno == EINTR));
139
140         ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
141         if (ret < 0) {
142                 fprintf(stderr, "pthreadpool_pipe_finished_jobs failed: %s\n",
143                         strerror(-ret));
144                 return -1;
145         }
146
147         ret = pthreadpool_pipe_destroy(p);
148         if (ret != 0) {
149                 fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
150                         strerror(ret));
151                 return -1;
152         }
153         return 0;
154 }
155
156 static int test_fork(void)
157 {
158         struct pthreadpool_pipe *p;
159         pid_t child, waited;
160         int status, ret;
161
162         ret = pthreadpool_pipe_init(1, &p);
163         if (ret != 0) {
164                 fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
165                         strerror(ret));
166                 return -1;
167         }
168         ret = pthreadpool_pipe_destroy(p);
169         if (ret != 0) {
170                 fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
171                         strerror(ret));
172                 return -1;
173         }
174
175         child = fork();
176         if (child < 0) {
177                 perror("fork failed");
178                 return -1;
179         }
180         if (child == 0) {
181                 exit(0);
182         }
183         waited = wait(&status);
184         if (waited == -1) {
185                 perror("wait failed");
186                 return -1;
187         }
188         if (waited != child) {
189                 fprintf(stderr, "expected child %d, got %d\n",
190                         (int)child, (int)waited);
191                 return -1;
192         }
193         return 0;
194 }
195
196 static void busyfork_job(void *private_data)
197 {
198         return;
199 }
200
201 static int test_busyfork(void)
202 {
203         struct pthreadpool_pipe *p;
204         int fds[2];
205         struct pollfd pfd;
206         pid_t child, waitret;
207         int ret, jobnum, wstatus;
208
209         ret = pipe(fds);
210         if (ret == -1) {
211                 perror("pipe failed");
212                 return -1;
213         }
214
215         ret = pthreadpool_pipe_init(1, &p);
216         if (ret != 0) {
217                 fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
218                         strerror(ret));
219                 return -1;
220         }
221
222         ret = pthreadpool_pipe_add_job(p, 1, busyfork_job, NULL);
223         if (ret != 0) {
224                 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
225                         strerror(ret));
226                 return -1;
227         }
228
229         ret = pthreadpool_pipe_finished_jobs(p, &jobnum, 1);
230         if (ret != 1) {
231                 fprintf(stderr, "pthreadpool_pipe_finished_jobs failed\n");
232                 return -1;
233         }
234
235         ret = poll(NULL, 0, 200);
236         if (ret == -1) {
237                 perror("poll failed");
238                 return -1;
239         }
240
241         child = fork();
242         if (child < 0) {
243                 perror("fork failed");
244                 return -1;
245         }
246
247         if (child == 0) {
248                 ret = pthreadpool_pipe_destroy(p);
249                 if (ret != 0) {
250                         fprintf(stderr, "pthreadpool_pipe_destroy failed: "
251                                 "%s\n", strerror(ret));
252                         exit(1);
253                 }
254                 exit(0);
255         }
256
257         ret = close(fds[1]);
258         if (ret == -1) {
259                 perror("close failed");
260                 return -1;
261         }
262
263         pfd = (struct pollfd) { .fd = fds[0], .events = POLLIN };
264
265         ret = poll(&pfd, 1, 5000);
266         if (ret == -1) {
267                 perror("poll failed");
268                 return -1;
269         }
270         if (ret == 0) {
271                 fprintf(stderr, "Child did not exit for 5 seconds\n");
272                 /*
273                  * The child might hang forever in
274                  * pthread_cond_destroy for example. Be kind to the
275                  * system and kill it.
276                  */
277                 kill(child, SIGTERM);
278                 return -1;
279         }
280         if (ret != 1) {
281                 fprintf(stderr, "poll returned %d -- huh??\n", ret);
282                 return -1;
283         }
284
285         ret = poll(NULL, 0, 200);
286         if (ret == -1) {
287                 perror("poll failed");
288                 return -1;
289         }
290
291         waitret = waitpid(child, &wstatus, WNOHANG);
292         if (waitret != child) {
293                 fprintf(stderr, "waitpid returned %d\n", (int)waitret);
294                 return -1;
295         }
296
297         if (!WIFEXITED(wstatus)) {
298                 fprintf(stderr, "child did not properly exit\n");
299                 return -1;
300         }
301
302         ret = WEXITSTATUS(wstatus);
303         if (ret != 0) {
304                 fprintf(stderr, "child returned %d\n", ret);
305                 return -1;
306         }
307
308         return 0;
309 }
310
311 static int test_busyfork2(void)
312 {
313         struct pthreadpool_pipe *p;
314         pid_t child;
315         int ret, jobnum;
316         struct pollfd pfd;
317
318         ret = pthreadpool_pipe_init(1, &p);
319         if (ret != 0) {
320                 fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
321                         strerror(ret));
322                 return -1;
323         }
324
325         ret = pthreadpool_pipe_add_job(p, 1, busyfork_job, NULL);
326         if (ret != 0) {
327                 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
328                         strerror(ret));
329                 return -1;
330         }
331
332         ret = pthreadpool_pipe_finished_jobs(p, &jobnum, 1);
333         if (ret != 1) {
334                 fprintf(stderr, "pthreadpool_pipe_finished_jobs failed\n");
335                 return -1;
336         }
337
338         ret = poll(NULL, 0, 10);
339         if (ret == -1) {
340                 perror("poll failed");
341                 return -1;
342         }
343
344         ret = pthreadpool_pipe_add_job(p, 1, busyfork_job, NULL);
345         if (ret != 0) {
346                 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
347                         strerror(ret));
348                 return -1;
349         }
350
351         /*
352          * Do the fork right after the add_job. This tests a race
353          * where the atfork prepare handler gets all idle threads off
354          * the condvar. If we are faster doing the fork than the
355          * existing idle thread could get out of idle and take the
356          * job, after the fork we end up with no threads to take care
357          * of the job.
358          */
359
360         child = fork();
361         if (child < 0) {
362                 perror("fork failed");
363                 return -1;
364         }
365
366         if (child == 0) {
367                 exit(0);
368         }
369
370         pfd = (struct pollfd) {
371                 .fd = pthreadpool_pipe_signal_fd(p),
372                 .events = POLLIN|POLLERR
373         };
374
375         do {
376                 ret = poll(&pfd, 1, 5000);
377         } while ((ret == -1) && (errno == EINTR));
378
379         if (ret == 0) {
380                 fprintf(stderr, "job unfinished after 5 seconds\n");
381                 return -1;
382         }
383
384         return 0;
385 }
386
387 static void test_tevent_wait(void *private_data)
388 {
389         int *timeout = private_data;
390         poll(NULL, 0, *timeout);
391 }
392
393 static int test_tevent_1(void)
394 {
395         struct tevent_context *ev;
396         struct pthreadpool_tevent *pool;
397         struct tevent_req *req1, *req2;
398         int timeout10 = 10;
399         int timeout100 = 100;
400         int ret;
401         bool ok;
402
403         ev = tevent_context_init(NULL);
404         if (ev == NULL) {
405                 ret = errno;
406                 fprintf(stderr, "tevent_context_init failed: %s\n",
407                         strerror(ret));
408                 return ret;
409         }
410         ret = pthreadpool_tevent_init(ev, 0, &pool);
411         if (ret != 0) {
412                 fprintf(stderr, "pthreadpool_tevent_init failed: %s\n",
413                         strerror(ret));
414                 TALLOC_FREE(ev);
415                 return ret;
416         }
417         req1 = pthreadpool_tevent_job_send(
418                 ev, ev, pool, test_tevent_wait, &timeout10);
419         if (req1 == NULL) {
420                 fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
421                 TALLOC_FREE(ev);
422                 return ENOMEM;
423         }
424         req2 = pthreadpool_tevent_job_send(
425                 ev, ev, pool, test_tevent_wait, &timeout100);
426         if (req2 == NULL) {
427                 fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
428                 TALLOC_FREE(ev);
429                 return ENOMEM;
430         }
431         ok = tevent_req_poll(req2, ev);
432         if (!ok) {
433                 ret = errno;
434                 fprintf(stderr, "tevent_req_poll failed: %s\n",
435                         strerror(ret));
436                 TALLOC_FREE(ev);
437                 return ret;
438         }
439         ret = pthreadpool_tevent_job_recv(req1);
440         TALLOC_FREE(req1);
441         if (ret != 0) {
442                 fprintf(stderr, "tevent_req_poll failed: %s\n",
443                         strerror(ret));
444                 TALLOC_FREE(ev);
445                 return ret;
446         }
447
448         TALLOC_FREE(req2);
449
450         ret = tevent_loop_wait(ev);
451         if (ret != 0) {
452                 fprintf(stderr, "tevent_loop_wait failed\n");
453                 return ret;
454         }
455
456         TALLOC_FREE(pool);
457         TALLOC_FREE(ev);
458         return 0;
459 }
460
461 int main(void)
462 {
463         int ret;
464
465         ret = test_tevent_1();
466         if (ret != 0) {
467                 fprintf(stderr, "test_event_1 failed: %s\n",
468                         strerror(ret));
469                 return 1;
470         }
471
472         ret = test_init();
473         if (ret != 0) {
474                 fprintf(stderr, "test_init failed\n");
475                 return 1;
476         }
477
478         ret = test_fork();
479         if (ret != 0) {
480                 fprintf(stderr, "test_fork failed\n");
481                 return 1;
482         }
483
484         ret = test_jobs(10, 10000);
485         if (ret != 0) {
486                 fprintf(stderr, "test_jobs failed\n");
487                 return 1;
488         }
489
490         ret = test_busydestroy();
491         if (ret != 0) {
492                 fprintf(stderr, "test_busydestroy failed\n");
493                 return 1;
494         }
495
496         ret = test_busyfork();
497         if (ret != 0) {
498                 fprintf(stderr, "test_busyfork failed\n");
499                 return 1;
500         }
501
502         ret = test_busyfork2();
503         if (ret != 0) {
504                 fprintf(stderr, "test_busyfork2 failed\n");
505                 return 1;
506         }
507
508         printf("success\n");
509         return 0;
510 }