23885aa6d116e2690674455e626e92fc0c0411f1
[samba.git] / lib / pthreadpool / pthreadpool.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * thread pool implementation
4  * Copyright (C) Volker Lendecke 2009
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "pthreadpool.h"
25 #include "lib/util/dlinklist.h"
26
27 #ifdef NDEBUG
28 #undef NDEBUG
29 #endif
30
31 #include <assert.h>
32
/*
 * One queued unit of work.
 *
 * "fn" is called by a worker thread with "private_data" as its only
 * argument. "id" is the caller-chosen job id; it is passed on to the
 * pool's signal_fn once "fn" has returned.
 */
struct pthreadpool_job {
        int id;
        void (*fn)(void *arg);
        void *private_data;
};
38
/*
 * State of one thread pool. Apart from the list linkage, members are
 * read and written with "mutex" held.
 */
struct pthreadpool {
        /* Linkage in the global "pthreadpools" list (fork safety) */
        struct pthreadpool *prev;
        struct pthreadpool *next;

        /* Serializes access to this struct */
        pthread_mutex_t mutex;

        /* Idle worker threads block on this while waiting for work */
        pthread_cond_t condvar;

        /*
         * Job queue: a ring buffer of jobs_array_len slots. "head" is
         * the index of the oldest queued job, "num_jobs" the number
         * of jobs currently queued.
         */
        size_t jobs_array_len;
        struct pthreadpool_job *jobs;

        size_t head;
        size_t num_jobs;

        /*
         * Job completion callback and the opaque pointer passed as
         * its last argument
         */
        int (*signal_fn)(int jobid,
                         void (*job_fn)(void *private_data),
                         void *job_fn_private_data,
                         void *private_data);
        void *signal_fn_private_data;

        /* Set once the pool is being destroyed; workers then exit */
        bool shutdown;

        /* Upper bound on worker threads */
        int max_threads;

        /* Worker threads currently existing */
        int num_threads;

        /* Worker threads currently blocked on "condvar" */
        int num_idle;

        /*
         * Non-NULL while fork() preparation is draining idle
         * threads: a worker that wakes up and sees this set signals
         * it and exits, so that nobody is left waiting on "condvar".
         */
        pthread_cond_t *prefork_cond;
};
100
/*
 * All pools of this process, linked through their prev/next members
 * and protected by pthreadpools_mutex. The fork handlers walk this
 * list.
 */
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct pthreadpool *pthreadpools = NULL;

/* Makes sure the fork handlers are registered only once */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);
106
107 /*
108  * Initialize a thread pool
109  */
110
111 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
112                      int (*signal_fn)(int jobid,
113                                       void (*job_fn)(void *private_data),
114                                       void *job_fn_private_data,
115                                       void *private_data),
116                      void *signal_fn_private_data)
117 {
118         struct pthreadpool *pool;
119         int ret;
120
121         pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
122         if (pool == NULL) {
123                 return ENOMEM;
124         }
125         pool->signal_fn = signal_fn;
126         pool->signal_fn_private_data = signal_fn_private_data;
127
128         pool->jobs_array_len = 4;
129         pool->jobs = calloc(
130                 pool->jobs_array_len, sizeof(struct pthreadpool_job));
131
132         if (pool->jobs == NULL) {
133                 free(pool);
134                 return ENOMEM;
135         }
136
137         pool->head = pool->num_jobs = 0;
138
139         ret = pthread_mutex_init(&pool->mutex, NULL);
140         if (ret != 0) {
141                 free(pool->jobs);
142                 free(pool);
143                 return ret;
144         }
145
146         ret = pthread_cond_init(&pool->condvar, NULL);
147         if (ret != 0) {
148                 pthread_mutex_destroy(&pool->mutex);
149                 free(pool->jobs);
150                 free(pool);
151                 return ret;
152         }
153
154         pool->shutdown = false;
155         pool->num_threads = 0;
156         pool->max_threads = max_threads;
157         pool->num_idle = 0;
158         pool->prefork_cond = NULL;
159
160         ret = pthread_mutex_lock(&pthreadpools_mutex);
161         if (ret != 0) {
162                 pthread_cond_destroy(&pool->condvar);
163                 pthread_mutex_destroy(&pool->mutex);
164                 free(pool->jobs);
165                 free(pool);
166                 return ret;
167         }
168         DLIST_ADD(pthreadpools, pool);
169
170         ret = pthread_mutex_unlock(&pthreadpools_mutex);
171         assert(ret == 0);
172
173         pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
174
175         *presult = pool;
176
177         return 0;
178 }
179
180 static void pthreadpool_prepare_pool(struct pthreadpool *pool)
181 {
182         pthread_cond_t prefork_cond = PTHREAD_COND_INITIALIZER;
183         int ret;
184
185         ret = pthread_mutex_lock(&pool->mutex);
186         assert(ret == 0);
187
188         while (pool->num_idle != 0) {
189                 /*
190                  * Exit all idle threads, which are all blocked in
191                  * pool->condvar. In the child we can destroy the
192                  * pool, which would result in undefined behaviour in
193                  * the pthread_cond_destroy(pool->condvar). glibc just
194                  * blocks here.
195                  */
196                 pool->prefork_cond = &prefork_cond;
197
198                 ret = pthread_cond_signal(&pool->condvar);
199                 assert(ret == 0);
200
201                 ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
202                 assert(ret == 0);
203
204                 pool->prefork_cond = NULL;
205         }
206
207         ret = pthread_cond_destroy(&prefork_cond);
208         assert(ret == 0);
209
210         /*
211          * Probably it's well-defined somewhere: What happens to
212          * condvars after a fork? The rationale of pthread_atfork only
213          * writes about mutexes. So better be safe than sorry and
214          * destroy/reinit pool->condvar across a fork.
215          */
216
217         ret = pthread_cond_destroy(&pool->condvar);
218         assert(ret == 0);
219 }
220
221 static void pthreadpool_prepare(void)
222 {
223         int ret;
224         struct pthreadpool *pool;
225
226         ret = pthread_mutex_lock(&pthreadpools_mutex);
227         assert(ret == 0);
228
229         pool = pthreadpools;
230
231         while (pool != NULL) {
232                 pthreadpool_prepare_pool(pool);
233                 pool = pool->next;
234         }
235 }
236
237 static void pthreadpool_parent(void)
238 {
239         int ret;
240         struct pthreadpool *pool;
241
242         for (pool = DLIST_TAIL(pthreadpools);
243              pool != NULL;
244              pool = DLIST_PREV(pool)) {
245                 ret = pthread_cond_init(&pool->condvar, NULL);
246                 assert(ret == 0);
247                 ret = pthread_mutex_unlock(&pool->mutex);
248                 assert(ret == 0);
249         }
250
251         ret = pthread_mutex_unlock(&pthreadpools_mutex);
252         assert(ret == 0);
253 }
254
255 static void pthreadpool_child(void)
256 {
257         int ret;
258         struct pthreadpool *pool;
259
260         for (pool = DLIST_TAIL(pthreadpools);
261              pool != NULL;
262              pool = DLIST_PREV(pool)) {
263
264                 pool->num_threads = 0;
265                 pool->num_idle = 0;
266                 pool->head = 0;
267                 pool->num_jobs = 0;
268
269                 ret = pthread_cond_init(&pool->condvar, NULL);
270                 assert(ret == 0);
271                 ret = pthread_mutex_unlock(&pool->mutex);
272                 assert(ret == 0);
273         }
274
275         ret = pthread_mutex_unlock(&pthreadpools_mutex);
276         assert(ret == 0);
277 }
278
279 static void pthreadpool_prep_atfork(void)
280 {
281         pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
282                        pthreadpool_child);
283 }
284
285 static int pthreadpool_free(struct pthreadpool *pool)
286 {
287         int ret, ret1;
288
289         ret = pthread_mutex_lock(&pthreadpools_mutex);
290         if (ret != 0) {
291                 return ret;
292         }
293         DLIST_REMOVE(pthreadpools, pool);
294         ret = pthread_mutex_unlock(&pthreadpools_mutex);
295         assert(ret == 0);
296
297         ret = pthread_mutex_destroy(&pool->mutex);
298         ret1 = pthread_cond_destroy(&pool->condvar);
299
300         if (ret != 0) {
301                 return ret;
302         }
303         if (ret1 != 0) {
304                 return ret1;
305         }
306
307         free(pool->jobs);
308         free(pool);
309
310         return 0;
311 }
312
313 /*
314  * Destroy a thread pool. Wake up all idle threads for exit. The last
315  * one will free the pool.
316  */
317
318 int pthreadpool_destroy(struct pthreadpool *pool)
319 {
320         int ret, ret1;
321
322         ret = pthread_mutex_lock(&pool->mutex);
323         if (ret != 0) {
324                 return ret;
325         }
326
327         if (pool->shutdown) {
328                 ret = pthread_mutex_unlock(&pool->mutex);
329                 assert(ret == 0);
330                 return EBUSY;
331         }
332
333         pool->shutdown = true;
334
335         if (pool->num_threads == 0) {
336                 ret = pthread_mutex_unlock(&pool->mutex);
337                 assert(ret == 0);
338
339                 ret = pthreadpool_free(pool);
340                 return ret;
341         }
342
343         /*
344          * We have active threads, tell them to finish.
345          */
346
347         ret = pthread_cond_broadcast(&pool->condvar);
348
349         ret1 = pthread_mutex_unlock(&pool->mutex);
350         assert(ret1 == 0);
351
352         return ret;
353 }
354
355 /*
356  * Prepare for pthread_exit(), pool->mutex must be locked and will be
357  * unlocked here. This is a bit of a layering violation, but here we
358  * also take care of removing the pool if we're the last thread.
359  */
360 static void pthreadpool_server_exit(struct pthreadpool *pool)
361 {
362         int ret;
363         bool free_it;
364
365         pool->num_threads -= 1;
366
367         free_it = (pool->shutdown && (pool->num_threads == 0));
368
369         ret = pthread_mutex_unlock(&pool->mutex);
370         assert(ret == 0);
371
372         if (free_it) {
373                 pthreadpool_free(pool);
374         }
375 }
376
377 static bool pthreadpool_get_job(struct pthreadpool *p,
378                                 struct pthreadpool_job *job)
379 {
380         if (p->num_jobs == 0) {
381                 return false;
382         }
383         *job = p->jobs[p->head];
384         p->head = (p->head+1) % p->jobs_array_len;
385         p->num_jobs -= 1;
386         return true;
387 }
388
389 static bool pthreadpool_put_job(struct pthreadpool *p,
390                                 int id,
391                                 void (*fn)(void *private_data),
392                                 void *private_data)
393 {
394         struct pthreadpool_job *job;
395
396         if (p->num_jobs == p->jobs_array_len) {
397                 struct pthreadpool_job *tmp;
398                 size_t new_len = p->jobs_array_len * 2;
399
400                 tmp = realloc(
401                         p->jobs, sizeof(struct pthreadpool_job) * new_len);
402                 if (tmp == NULL) {
403                         return false;
404                 }
405                 p->jobs = tmp;
406
407                 /*
408                  * We just doubled the jobs array. The array implements a FIFO
409                  * queue with a modulo-based wraparound, so we have to memcpy
410                  * the jobs that are logically at the queue end but physically
411                  * before the queue head into the reallocated area. The new
412                  * space starts at the current jobs_array_len, and we have to
413                  * copy everything before the current head job into the new
414                  * area.
415                  */
416                 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
417                        sizeof(struct pthreadpool_job) * p->head);
418
419                 p->jobs_array_len = new_len;
420         }
421
422         job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
423         job->id = id;
424         job->fn = fn;
425         job->private_data = private_data;
426
427         p->num_jobs += 1;
428
429         return true;
430 }
431
432 static void *pthreadpool_server(void *arg)
433 {
434         struct pthreadpool *pool = (struct pthreadpool *)arg;
435         int res;
436
437         res = pthread_mutex_lock(&pool->mutex);
438         if (res != 0) {
439                 return NULL;
440         }
441
442         while (1) {
443                 struct timespec ts;
444                 struct pthreadpool_job job;
445
446                 /*
447                  * idle-wait at most 1 second. If nothing happens in that
448                  * time, exit this thread.
449                  */
450
451                 clock_gettime(CLOCK_REALTIME, &ts);
452                 ts.tv_sec += 1;
453
454                 while ((pool->num_jobs == 0) && !pool->shutdown) {
455
456                         pool->num_idle += 1;
457                         res = pthread_cond_timedwait(
458                                 &pool->condvar, &pool->mutex, &ts);
459                         pool->num_idle -= 1;
460
461                         if (pool->prefork_cond != NULL) {
462                                 /*
463                                  * Me must allow fork() to continue
464                                  * without anybody waiting on
465                                  * &pool->condvar.
466                                  */
467                                 pthread_cond_signal(pool->prefork_cond);
468                                 pthreadpool_server_exit(pool);
469                                 return NULL;
470                         }
471
472                         if (res == ETIMEDOUT) {
473
474                                 if (pool->num_jobs == 0) {
475                                         /*
476                                          * we timed out and still no work for
477                                          * us. Exit.
478                                          */
479                                         pthreadpool_server_exit(pool);
480                                         return NULL;
481                                 }
482
483                                 break;
484                         }
485                         assert(res == 0);
486                 }
487
488                 if (pthreadpool_get_job(pool, &job)) {
489                         int ret;
490
491                         /*
492                          * Do the work with the mutex unlocked
493                          */
494
495                         res = pthread_mutex_unlock(&pool->mutex);
496                         assert(res == 0);
497
498                         job.fn(job.private_data);
499
500                         ret = pool->signal_fn(job.id,
501                                               job.fn, job.private_data,
502                                               pool->signal_fn_private_data);
503
504                         res = pthread_mutex_lock(&pool->mutex);
505                         assert(res == 0);
506
507                         if (ret != 0) {
508                                 pthreadpool_server_exit(pool);
509                                 return NULL;
510                         }
511                 }
512
513                 if ((pool->num_jobs == 0) && pool->shutdown) {
514                         /*
515                          * No more work to do and we're asked to shut down, so
516                          * exit
517                          */
518                         pthreadpool_server_exit(pool);
519                         return NULL;
520                 }
521         }
522 }
523
524 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
525                         void (*fn)(void *private_data), void *private_data)
526 {
527         pthread_attr_t thread_attr;
528         pthread_t thread_id;
529         int res;
530         sigset_t mask, omask;
531
532         res = pthread_mutex_lock(&pool->mutex);
533         if (res != 0) {
534                 return res;
535         }
536
537         if (pool->shutdown) {
538                 /*
539                  * Protect against the pool being shut down while
540                  * trying to add a job
541                  */
542                 res = pthread_mutex_unlock(&pool->mutex);
543                 assert(res == 0);
544                 return EINVAL;
545         }
546
547         /*
548          * Add job to the end of the queue
549          */
550         if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
551                 pthread_mutex_unlock(&pool->mutex);
552                 return ENOMEM;
553         }
554
555         if (pool->num_idle > 0) {
556                 /*
557                  * We have idle threads, wake one.
558                  */
559                 res = pthread_cond_signal(&pool->condvar);
560                 pthread_mutex_unlock(&pool->mutex);
561                 return res;
562         }
563
564         if ((pool->max_threads != 0) &&
565             (pool->num_threads >= pool->max_threads)) {
566                 /*
567                  * No more new threads, we just queue the request
568                  */
569                 pthread_mutex_unlock(&pool->mutex);
570                 return 0;
571         }
572
573         /*
574          * Create a new worker thread. It should not receive any signals.
575          */
576
577         sigfillset(&mask);
578
579         res = pthread_attr_init(&thread_attr);
580         if (res != 0) {
581                 pthread_mutex_unlock(&pool->mutex);
582                 return res;
583         }
584
585         res = pthread_attr_setdetachstate(
586                 &thread_attr, PTHREAD_CREATE_DETACHED);
587         if (res != 0) {
588                 pthread_attr_destroy(&thread_attr);
589                 pthread_mutex_unlock(&pool->mutex);
590                 return res;
591         }
592
593         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
594         if (res != 0) {
595                 pthread_attr_destroy(&thread_attr);
596                 pthread_mutex_unlock(&pool->mutex);
597                 return res;
598         }
599
600         res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
601                              (void *)pool);
602         if (res == 0) {
603                 pool->num_threads += 1;
604         }
605
606         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
607
608         pthread_attr_destroy(&thread_attr);
609
610         pthread_mutex_unlock(&pool->mutex);
611         return res;
612 }