Release alpha15.
[kai/samba-autobuild/.git] / source3 / lib / pthreadpool / pthreadpool.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * thread pool implementation
4  * Copyright (C) Volker Lendecke 2009
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include <errno.h>
21 #include <stdio.h>
22 #include <unistd.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <assert.h>
28 #include <fcntl.h>
29 #include <sys/time.h>
30
31 #include "pthreadpool.h"
32 #include "lib/util/dlinklist.h"
33
/*
 * One queued unit of work. Jobs form a singly linked FIFO that hangs
 * off struct pthreadpool.
 */
struct pthreadpool_job {
        /* Next job in the queue, NULL for the last one */
        struct pthreadpool_job *next;

        /* Caller-chosen id, written to the signal pipe once fn returned */
        int id;

        /* Work function, run by a worker thread with private_data */
        void (*fn)(void *);

        /* Opaque argument handed to fn */
        void *private_data;
};
40
/*
 * State of one thread pool. Apart from the list linkage, which is
 * covered by the global pthreadpools_mutex, the members are accessed
 * with "mutex" held.
 */
struct pthreadpool {
        /*
         * Linkage in the global list of pools, needed for fork safety
         */
        struct pthreadpool *prev;
        struct pthreadpool *next;

        /*
         * Serializes access to this struct
         */
        pthread_mutex_t mutex;

        /*
         * Idle worker threads block on this condition variable
         */
        pthread_cond_t condvar;

        /*
         * Queue of pending jobs: head and tail
         */
        struct pthreadpool_job *jobs;
        struct pthreadpool_job *last_job;

        /*
         * Pipe used to report finished job ids: workers write to
         * sig_pipe[1], the pool user reads from sig_pipe[0]
         */
        int sig_pipe[2];

        /*
         * Set to non-zero to tell the worker threads to shut down
         */
        int shutdown;

        /*
         * Upper limit for the number of threads, 0 means no limit
         */
        int max_threads;

        /*
         * Current number of threads
         */
        int num_threads;

        /*
         * Number of threads currently waiting for work
         */
        int num_idle;

        /*
         * Threads that have exited and still need a pthread_join().
         * The array is grown by one element per exiting thread.
         */
        int num_exited;
        pthread_t *exited;
};
93
/* Protects the global list of pools below */
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Head of the list of all pools, walked by the fork handlers */
static struct pthreadpool *pthreadpools;

/* Makes sure the fork handlers are registered only once */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);
99
100 /*
101  * Initialize a thread pool
102  */
103
104 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
105 {
106         struct pthreadpool *pool;
107         int ret;
108
109         pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
110         if (pool == NULL) {
111                 return ENOMEM;
112         }
113
114         ret = pipe(pool->sig_pipe);
115         if (ret == -1) {
116                 int err = errno;
117                 free(pool);
118                 return err;
119         }
120
121         ret = pthread_mutex_init(&pool->mutex, NULL);
122         if (ret != 0) {
123                 close(pool->sig_pipe[0]);
124                 close(pool->sig_pipe[1]);
125                 free(pool);
126                 return ret;
127         }
128
129         ret = pthread_cond_init(&pool->condvar, NULL);
130         if (ret != 0) {
131                 pthread_mutex_destroy(&pool->mutex);
132                 close(pool->sig_pipe[0]);
133                 close(pool->sig_pipe[1]);
134                 free(pool);
135                 return ret;
136         }
137
138         pool->shutdown = 0;
139         pool->jobs = pool->last_job = NULL;
140         pool->num_threads = 0;
141         pool->num_exited = 0;
142         pool->exited = NULL;
143         pool->max_threads = max_threads;
144         pool->num_idle = 0;
145
146         ret = pthread_mutex_lock(&pthreadpools_mutex);
147         if (ret != 0) {
148                 pthread_cond_destroy(&pool->condvar);
149                 pthread_mutex_destroy(&pool->mutex);
150                 close(pool->sig_pipe[0]);
151                 close(pool->sig_pipe[1]);
152                 free(pool);
153                 return ret;
154         }
155         DLIST_ADD(pthreadpools, pool);
156
157         ret = pthread_mutex_unlock(&pthreadpools_mutex);
158         assert(ret == 0);
159
160         pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
161
162         *presult = pool;
163
164         return 0;
165 }
166
167 static void pthreadpool_prepare(void)
168 {
169         int ret;
170         struct pthreadpool *pool;
171
172         ret = pthread_mutex_lock(&pthreadpools_mutex);
173         assert(ret == 0);
174
175         pool = pthreadpools;
176
177         while (pool != NULL) {
178                 ret = pthread_mutex_lock(&pool->mutex);
179                 assert(ret == 0);
180                 pool = pool->next;
181         }
182 }
183
184 static void pthreadpool_parent(void)
185 {
186         int ret;
187         struct pthreadpool *pool;
188
189         pool = DLIST_TAIL(pthreadpools);
190
191         while (1) {
192                 ret = pthread_mutex_unlock(&pool->mutex);
193                 assert(ret == 0);
194
195                 if (pool == pthreadpools) {
196                         break;
197                 }
198                 pool = pool->prev;
199         }
200
201         ret = pthread_mutex_unlock(&pthreadpools_mutex);
202         assert(ret == 0);
203 }
204
205 static void pthreadpool_child(void)
206 {
207         int ret;
208         struct pthreadpool *pool;
209
210         pool = DLIST_TAIL(pthreadpools);
211
212         while (1) {
213                 close(pool->sig_pipe[0]);
214                 close(pool->sig_pipe[1]);
215
216                 ret = pipe(pool->sig_pipe);
217                 assert(ret == 0);
218
219                 pool->num_threads = 0;
220
221                 pool->num_exited = 0;
222                 free(pool->exited);
223                 pool->exited = NULL;
224
225                 pool->num_idle = 0;
226
227                 while (pool->jobs != NULL) {
228                         struct pthreadpool_job *job;
229                         job = pool->jobs;
230                         pool->jobs = job->next;
231                         free(job);
232                 }
233                 pool->last_job = NULL;
234
235                 ret = pthread_mutex_unlock(&pool->mutex);
236                 assert(ret == 0);
237
238                 if (pool == pthreadpools) {
239                         break;
240                 }
241                 pool = pool->prev;
242         }
243
244         ret = pthread_mutex_unlock(&pthreadpools_mutex);
245         assert(ret == 0);
246 }
247
248 static void pthreadpool_prep_atfork(void)
249 {
250         pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
251                        pthreadpool_child);
252 }
253
254 /*
255  * Return the file descriptor which becomes readable when a job has
256  * finished
257  */
258
259 int pthreadpool_signal_fd(struct pthreadpool *pool)
260 {
261         return pool->sig_pipe[0];
262 }
263
264 /*
265  * Do a pthread_join() on all children that have exited, pool->mutex must be
266  * locked
267  */
268 static void pthreadpool_join_children(struct pthreadpool *pool)
269 {
270         int i;
271
272         for (i=0; i<pool->num_exited; i++) {
273                 pthread_join(pool->exited[i], NULL);
274         }
275         pool->num_exited = 0;
276
277         /*
278          * Deliberately not free and NULL pool->exited. That will be
279          * re-used by realloc later.
280          */
281 }
282
283 /*
284  * Fetch a finished job number from the signal pipe
285  */
286
287 int pthreadpool_finished_job(struct pthreadpool *pool)
288 {
289         int result;
290         ssize_t nread;
291
292         nread = -1;
293         errno = EINTR;
294
295         while ((nread == -1) && (errno == EINTR)) {
296                 nread = read(pool->sig_pipe[0], &result, sizeof(int));
297         }
298         if (nread == -1) {
299                 return errno;
300         }
301         if (nread != sizeof(int)) {
302                 return EINVAL;
303         }
304         return result;
305 }
306
307 /*
308  * Destroy a thread pool, finishing all threads working for it
309  */
310
311 int pthreadpool_destroy(struct pthreadpool *pool)
312 {
313         int ret, ret1;
314
315         ret = pthread_mutex_lock(&pool->mutex);
316         if (ret != 0) {
317                 return ret;
318         }
319
320         if ((pool->jobs != NULL) || pool->shutdown) {
321                 ret = pthread_mutex_unlock(&pool->mutex);
322                 assert(ret == 0);
323                 return EBUSY;
324         }
325
326         if (pool->num_threads > 0) {
327                 /*
328                  * We have active threads, tell them to finish, wait for that.
329                  */
330
331                 pool->shutdown = 1;
332
333                 if (pool->num_idle > 0) {
334                         /*
335                          * Wake the idle threads. They will find pool->quit to
336                          * be set and exit themselves
337                          */
338                         ret = pthread_cond_broadcast(&pool->condvar);
339                         if (ret != 0) {
340                                 pthread_mutex_unlock(&pool->mutex);
341                                 return ret;
342                         }
343                 }
344
345                 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
346
347                         if (pool->num_exited > 0) {
348                                 pthreadpool_join_children(pool);
349                                 continue;
350                         }
351                         /*
352                          * A thread that shuts down will also signal
353                          * pool->condvar
354                          */
355                         ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
356                         if (ret != 0) {
357                                 pthread_mutex_unlock(&pool->mutex);
358                                 return ret;
359                         }
360                 }
361         }
362
363         ret = pthread_mutex_unlock(&pool->mutex);
364         if (ret != 0) {
365                 return ret;
366         }
367         ret = pthread_mutex_destroy(&pool->mutex);
368         ret1 = pthread_cond_destroy(&pool->condvar);
369
370         if (ret != 0) {
371                 return ret;
372         }
373         if (ret1 != 0) {
374                 return ret1;
375         }
376
377         ret = pthread_mutex_lock(&pthreadpools_mutex);
378         if (ret != 0) {
379                 return ret;
380         }
381         DLIST_REMOVE(pthreadpools, pool);
382         ret = pthread_mutex_unlock(&pthreadpools_mutex);
383         assert(ret == 0);
384
385         close(pool->sig_pipe[0]);
386         pool->sig_pipe[0] = -1;
387
388         close(pool->sig_pipe[1]);
389         pool->sig_pipe[1] = -1;
390
391         free(pool->exited);
392         free(pool);
393
394         return 0;
395 }
396
397 /*
398  * Prepare for pthread_exit(), pool->mutex must be locked
399  */
400 static void pthreadpool_server_exit(struct pthreadpool *pool)
401 {
402         pthread_t *exited;
403
404         pool->num_threads -= 1;
405
406         exited = (pthread_t *)realloc(
407                 pool->exited, sizeof(pthread_t *) * (pool->num_exited + 1));
408
409         if (exited == NULL) {
410                 /* lost a thread status */
411                 return;
412         }
413         pool->exited = exited;
414
415         pool->exited[pool->num_exited] = pthread_self();
416         pool->num_exited += 1;
417 }
418
419 static void *pthreadpool_server(void *arg)
420 {
421         struct pthreadpool *pool = (struct pthreadpool *)arg;
422         int res;
423
424         res = pthread_mutex_lock(&pool->mutex);
425         if (res != 0) {
426                 return NULL;
427         }
428
429         while (1) {
430                 struct timespec ts;
431                 struct pthreadpool_job *job;
432
433                 /*
434                  * idle-wait at most 1 second. If nothing happens in that
435                  * time, exit this thread.
436                  */
437
438                 clock_gettime(CLOCK_REALTIME, &ts);
439                 ts.tv_sec += 1;
440
441                 while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
442
443                         pool->num_idle += 1;
444                         res = pthread_cond_timedwait(
445                                 &pool->condvar, &pool->mutex, &ts);
446                         pool->num_idle -= 1;
447
448                         if (res == ETIMEDOUT) {
449
450                                 if (pool->jobs == NULL) {
451                                         /*
452                                          * we timed out and still no work for
453                                          * us. Exit.
454                                          */
455                                         pthreadpool_server_exit(pool);
456                                         pthread_mutex_unlock(&pool->mutex);
457                                         return NULL;
458                                 }
459
460                                 break;
461                         }
462                         assert(res == 0);
463                 }
464
465                 job = pool->jobs;
466
467                 if (job != NULL) {
468                         ssize_t written;
469
470                         /*
471                          * Ok, there's work for us to do, remove the job from
472                          * the pthreadpool list
473                          */
474                         pool->jobs = job->next;
475                         if (pool->last_job == job) {
476                                 pool->last_job = NULL;
477                         }
478
479                         /*
480                          * Do the work with the mutex unlocked
481                          */
482
483                         res = pthread_mutex_unlock(&pool->mutex);
484                         assert(res == 0);
485
486                         job->fn(job->private_data);
487
488                         written = write(pool->sig_pipe[1], &job->id,
489                                         sizeof(int));
490
491                         free(job);
492
493                         res = pthread_mutex_lock(&pool->mutex);
494                         assert(res == 0);
495
496                         if (written != sizeof(int)) {
497                                 pthreadpool_server_exit(pool);
498                                 pthread_mutex_unlock(&pool->mutex);
499                                 return NULL;
500                         }
501                 }
502
503                 if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
504                         /*
505                          * No more work to do and we're asked to shut down, so
506                          * exit
507                          */
508                         pthreadpool_server_exit(pool);
509
510                         if (pool->num_threads == 0) {
511                                 /*
512                                  * Ping the main thread waiting for all of us
513                                  * workers to have quit.
514                                  */
515                                 pthread_cond_broadcast(&pool->condvar);
516                         }
517
518                         pthread_mutex_unlock(&pool->mutex);
519                         return NULL;
520                 }
521         }
522 }
523
524 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
525                         void (*fn)(void *private_data), void *private_data)
526 {
527         struct pthreadpool_job *job;
528         pthread_t thread_id;
529         int res;
530         sigset_t mask, omask;
531
532         job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
533         if (job == NULL) {
534                 return ENOMEM;
535         }
536
537         job->fn = fn;
538         job->private_data = private_data;
539         job->id = job_id;
540         job->next = NULL;
541
542         res = pthread_mutex_lock(&pool->mutex);
543         if (res != 0) {
544                 free(job);
545                 return res;
546         }
547
548         if (pool->shutdown) {
549                 /*
550                  * Protect against the pool being shut down while
551                  * trying to add a job
552                  */
553                 res = pthread_mutex_unlock(&pool->mutex);
554                 assert(res == 0);
555                 free(job);
556                 return EINVAL;
557         }
558
559         /*
560          * Just some cleanup under the mutex
561          */
562         pthreadpool_join_children(pool);
563
564         /*
565          * Add job to the end of the queue
566          */
567         if (pool->jobs == NULL) {
568                 pool->jobs = job;
569         }
570         else {
571                 pool->last_job->next = job;
572         }
573         pool->last_job = job;
574
575         if (pool->num_idle > 0) {
576                 /*
577                  * We have idle threads, wake one.
578                  */
579                 res = pthread_cond_signal(&pool->condvar);
580                 pthread_mutex_unlock(&pool->mutex);
581                 return res;
582         }
583
584         if ((pool->max_threads != 0) &&
585             (pool->num_threads >= pool->max_threads)) {
586                 /*
587                  * No more new threads, we just queue the request
588                  */
589                 pthread_mutex_unlock(&pool->mutex);
590                 return 0;
591         }
592
593         /*
594          * Create a new worker thread. It should not receive any signals.
595          */
596
597         sigfillset(&mask);
598
599         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
600         if (res != 0) {
601                 pthread_mutex_unlock(&pool->mutex);
602                 return res;
603         }
604
605         res = pthread_create(&thread_id, NULL, pthreadpool_server,
606                                 (void *)pool);
607         if (res == 0) {
608                 pool->num_threads += 1;
609         }
610
611         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
612
613         pthread_mutex_unlock(&pool->mutex);
614         return res;
615 }