pthreadpool: use strict sync processing only with max_threads=0
[samba.git] lib/pthreadpool/pthreadpool.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * thread pool implementation
4  * Copyright (C) Volker Lendecke 2009
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "pthreadpool.h"
25 #include "lib/util/dlinklist.h"
26
27 #ifdef NDEBUG
28 #undef NDEBUG
29 #endif
30
31 #include <assert.h>
32
33 struct pthreadpool_job {
34         int id;
35         void (*fn)(void *private_data);
36         void *private_data;
37 };
38
39 struct pthreadpool {
40         /*
41          * List pthreadpools for fork safety
42          */
43         struct pthreadpool *prev, *next;
44
45         /*
46          * Control access to this struct
47          */
48         pthread_mutex_t mutex;
49
50         /*
51          * Threads waiting for work do so here
52          */
53         pthread_cond_t condvar;
54
55         /*
56          * Array of jobs
57          */
58         size_t jobs_array_len;
59         struct pthreadpool_job *jobs;
60
61         size_t head;
62         size_t num_jobs;
63
64         /*
65          * Indicate job completion
66          */
67         int (*signal_fn)(int jobid,
68                          void (*job_fn)(void *private_data),
69                          void *job_fn_private_data,
70                          void *private_data);
71         void *signal_fn_private_data;
72
73         /*
74          * indicator to worker threads that they should shut down
75          */
76         bool shutdown;
77
78         /*
79          * maximum number of threads
80          * 0 means no real threads, only strict sync processing.
81          */
82         unsigned max_threads;
83
84         /*
85          * Number of threads
86          */
87         unsigned num_threads;
88
89         /*
90          * Number of idle threads
91          */
92         unsigned num_idle;
93
94         /*
95          * Condition variable indicating that helper threads should
96          * quickly go away, making way for fork() without anybody
97          * waiting on pool->condvar.
98          */
99         pthread_cond_t *prefork_cond;
100
101         /*
102          * Waiting position for helper threads while fork is
103          * running. The forking thread will have locked it, and all
104          * idle helper threads will sit here until after the fork,
105          * when the forking thread will unlock it again.
106          */
107         pthread_mutex_t fork_mutex;
108 };
109
110 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
111 static struct pthreadpool *pthreadpools = NULL;
112 static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
113
114 static void pthreadpool_prep_atfork(void);
115
116 /*
117  * Initialize a thread pool
118  */
119
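/*
 * Minimal usage sketch (illustrative only; example_job, example_signal_fn,
 * notify_fd and job_state are made-up caller-side names, not part of this
 * API):
 *
 *   static void example_job(void *private_data) { ...blocking work... }
 *
 *   static int example_signal_fn(int jobid,
 *                                void (*job_fn)(void *private_data),
 *                                void *job_private_data,
 *                                void *private_data)
 *   {
 *           int *notify_fd = (int *)private_data;
 *           ssize_t n = write(*notify_fd, &jobid, sizeof(jobid));
 *           return (n == -1) ? errno : 0;
 *   }
 *
 *   struct pthreadpool *pool = NULL;
 *   int ret = pthreadpool_init(4, &pool, example_signal_fn, &notify_fd);
 *   if (ret == 0) {
 *           ret = pthreadpool_add_job(pool, 42, example_job, job_state);
 *   }
 *   ...
 *   ret = pthreadpool_destroy(pool);
 *
 * signal_fn runs in the worker thread right after the job function has
 * returned (or inline in the caller when max_threads is 0), so it should
 * only do cheap notification work, e.g. write the job id to a pipe the
 * main loop watches; a non-zero return value makes the worker thread exit.
 */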
120 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
121                      int (*signal_fn)(int jobid,
122                                       void (*job_fn)(void *private_data),
123                                       void *job_fn_private_data,
124                                       void *private_data),
125                      void *signal_fn_private_data)
126 {
127         struct pthreadpool *pool;
128         int ret;
129
130         pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
131         if (pool == NULL) {
132                 return ENOMEM;
133         }
134         pool->signal_fn = signal_fn;
135         pool->signal_fn_private_data = signal_fn_private_data;
136
137         pool->jobs_array_len = 4;
138         pool->jobs = calloc(
139                 pool->jobs_array_len, sizeof(struct pthreadpool_job));
140
141         if (pool->jobs == NULL) {
142                 free(pool);
143                 return ENOMEM;
144         }
145
146         pool->head = pool->num_jobs = 0;
147
148         ret = pthread_mutex_init(&pool->mutex, NULL);
149         if (ret != 0) {
150                 free(pool->jobs);
151                 free(pool);
152                 return ret;
153         }
154
155         ret = pthread_cond_init(&pool->condvar, NULL);
156         if (ret != 0) {
157                 pthread_mutex_destroy(&pool->mutex);
158                 free(pool->jobs);
159                 free(pool);
160                 return ret;
161         }
162
163         ret = pthread_mutex_init(&pool->fork_mutex, NULL);
164         if (ret != 0) {
165                 pthread_cond_destroy(&pool->condvar);
166                 pthread_mutex_destroy(&pool->mutex);
167                 free(pool->jobs);
168                 free(pool);
169                 return ret;
170         }
171
172         pool->shutdown = false;
173         pool->num_threads = 0;
174         pool->max_threads = max_threads;
175         pool->num_idle = 0;
176         pool->prefork_cond = NULL;
177
178         ret = pthread_mutex_lock(&pthreadpools_mutex);
179         if (ret != 0) {
180                 pthread_mutex_destroy(&pool->fork_mutex);
181                 pthread_cond_destroy(&pool->condvar);
182                 pthread_mutex_destroy(&pool->mutex);
183                 free(pool->jobs);
184                 free(pool);
185                 return ret;
186         }
187         DLIST_ADD(pthreadpools, pool);
188
189         ret = pthread_mutex_unlock(&pthreadpools_mutex);
190         assert(ret == 0);
191
192         pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
193
194         *presult = pool;
195
196         return 0;
197 }
198
199 static void pthreadpool_prepare_pool(struct pthreadpool *pool)
200 {
201         int ret;
202
203         ret = pthread_mutex_lock(&pool->fork_mutex);
204         assert(ret == 0);
205
206         ret = pthread_mutex_lock(&pool->mutex);
207         assert(ret == 0);
208
209         while (pool->num_idle != 0) {
210                 unsigned num_idle = pool->num_idle;
211                 pthread_cond_t prefork_cond;
212
213                 ret = pthread_cond_init(&prefork_cond, NULL);
214                 assert(ret == 0);
215
216                 /*
217                  * Push all idle threads off pool->condvar. The child
218                  * may destroy the pool, and calling
219                  * pthread_cond_destroy() on a condvar that still has
220                  * waiters is undefined behaviour; glibc just blocks
221                  * in that case.
222                  */
223                 pool->prefork_cond = &prefork_cond;
224
225                 ret = pthread_cond_signal(&pool->condvar);
226                 assert(ret == 0);
227
228                 while (pool->num_idle == num_idle) {
229                         ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
230                         assert(ret == 0);
231                 }
232
233                 pool->prefork_cond = NULL;
234
235                 ret = pthread_cond_destroy(&prefork_cond);
236                 assert(ret == 0);
237         }
238
239         /*
240          * What happens to condvars across a fork may well be defined
241          * somewhere, but the pthread_atfork() rationale only talks
242          * about mutexes. So better be safe than sorry and
243          * destroy/reinit pool->condvar across a fork.
244          */
245
246         ret = pthread_cond_destroy(&pool->condvar);
247         assert(ret == 0);
248 }
249
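/*
 * fork() safety: pthreadpool_prepare() runs as the pthread_atfork()
 * "prepare" handler. It locks the global pool list and then, for every
 * pool, takes fork_mutex and mutex, uses prefork_cond to push all idle
 * worker threads off pool->condvar (they park on fork_mutex instead)
 * and destroys pool->condvar. After the fork the "parent" and "child"
 * handlers below re-create pool->condvar and drop the locks again; the
 * child additionally forgets all worker threads and queued jobs, since
 * only the forking thread survives into the child.
 */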
250 static void pthreadpool_prepare(void)
251 {
252         int ret;
253         struct pthreadpool *pool;
254
255         ret = pthread_mutex_lock(&pthreadpools_mutex);
256         assert(ret == 0);
257
258         pool = pthreadpools;
259
260         while (pool != NULL) {
261                 pthreadpool_prepare_pool(pool);
262                 pool = pool->next;
263         }
264 }
265
266 static void pthreadpool_parent(void)
267 {
268         int ret;
269         struct pthreadpool *pool;
270
271         for (pool = DLIST_TAIL(pthreadpools);
272              pool != NULL;
273              pool = DLIST_PREV(pool)) {
274                 ret = pthread_cond_init(&pool->condvar, NULL);
275                 assert(ret == 0);
276                 ret = pthread_mutex_unlock(&pool->mutex);
277                 assert(ret == 0);
278                 ret = pthread_mutex_unlock(&pool->fork_mutex);
279                 assert(ret == 0);
280         }
281
282         ret = pthread_mutex_unlock(&pthreadpools_mutex);
283         assert(ret == 0);
284 }
285
286 static void pthreadpool_child(void)
287 {
288         int ret;
289         struct pthreadpool *pool;
290
291         for (pool = DLIST_TAIL(pthreadpools);
292              pool != NULL;
293              pool = DLIST_PREV(pool)) {
294
295                 pool->num_threads = 0;
296                 pool->num_idle = 0;
297                 pool->head = 0;
298                 pool->num_jobs = 0;
299
300                 ret = pthread_cond_init(&pool->condvar, NULL);
301                 assert(ret == 0);
302
303                 ret = pthread_mutex_unlock(&pool->mutex);
304                 assert(ret == 0);
305
306                 ret = pthread_mutex_unlock(&pool->fork_mutex);
307                 assert(ret == 0);
308         }
309
310         ret = pthread_mutex_unlock(&pthreadpools_mutex);
311         assert(ret == 0);
312 }
313
314 static void pthreadpool_prep_atfork(void)
315 {
316         pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
317                        pthreadpool_child);
318 }
319
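/*
 * Unlink the pool from the global list and release its resources. This
 * runs either from pthreadpool_destroy() when no worker threads exist,
 * or from the last worker thread exiting via pthreadpool_server_exit().
 */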
320 static int pthreadpool_free(struct pthreadpool *pool)
321 {
322         int ret, ret1, ret2;
323
324         ret = pthread_mutex_lock(&pthreadpools_mutex);
325         if (ret != 0) {
326                 return ret;
327         }
328         DLIST_REMOVE(pthreadpools, pool);
329         ret = pthread_mutex_unlock(&pthreadpools_mutex);
330         assert(ret == 0);
331
332         ret = pthread_mutex_destroy(&pool->mutex);
333         ret1 = pthread_cond_destroy(&pool->condvar);
334         ret2 = pthread_mutex_destroy(&pool->fork_mutex);
335
336         if (ret != 0) {
337                 return ret;
338         }
339         if (ret1 != 0) {
340                 return ret1;
341         }
342         if (ret2 != 0) {
343                 return ret2;
344         }
345
346         free(pool->jobs);
347         free(pool);
348
349         return 0;
350 }
351
352 /*
353  * Destroy a thread pool. Wake up all idle threads for exit. The last
354  * one will free the pool.
355  */
356
357 int pthreadpool_destroy(struct pthreadpool *pool)
358 {
359         int ret, ret1;
360
361         ret = pthread_mutex_lock(&pool->mutex);
362         if (ret != 0) {
363                 return ret;
364         }
365
366         if (pool->shutdown) {
367                 ret = pthread_mutex_unlock(&pool->mutex);
368                 assert(ret == 0);
369                 return EBUSY;
370         }
371
372         pool->shutdown = true;
373
374         if (pool->num_threads == 0) {
375                 ret = pthread_mutex_unlock(&pool->mutex);
376                 assert(ret == 0);
377
378                 ret = pthreadpool_free(pool);
379                 return ret;
380         }
381
382         /*
383          * We have active threads, tell them to finish.
384          */
385
386         ret = pthread_cond_broadcast(&pool->condvar);
387
388         ret1 = pthread_mutex_unlock(&pool->mutex);
389         assert(ret1 == 0);
390
391         return ret;
392 }
393
394 /*
395  * Prepare for pthread_exit(); pool->mutex must be locked and will be
396  * unlocked here. This is a bit of a layering violation, but here we
397  * also take care of removing the pool if we're the last thread.
398  */
399 static void pthreadpool_server_exit(struct pthreadpool *pool)
400 {
401         int ret;
402         bool free_it;
403
404         pool->num_threads -= 1;
405
406         free_it = (pool->shutdown && (pool->num_threads == 0));
407
408         ret = pthread_mutex_unlock(&pool->mutex);
409         assert(ret == 0);
410
411         if (free_it) {
412                 pthreadpool_free(pool);
413         }
414 }
415
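/*
 * The job queue is a circular buffer over pool->jobs: the oldest job
 * sits at index "head", the i-th queued job at (head + i) % jobs_array_len,
 * and "num_jobs" slots are currently in use.
 */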
416 static bool pthreadpool_get_job(struct pthreadpool *p,
417                                 struct pthreadpool_job *job)
418 {
419         if (p->num_jobs == 0) {
420                 return false;
421         }
422         *job = p->jobs[p->head];
423         p->head = (p->head+1) % p->jobs_array_len;
424         p->num_jobs -= 1;
425         return true;
426 }
427
428 static bool pthreadpool_put_job(struct pthreadpool *p,
429                                 int id,
430                                 void (*fn)(void *private_data),
431                                 void *private_data)
432 {
433         struct pthreadpool_job *job;
434
435         if (p->num_jobs == p->jobs_array_len) {
436                 struct pthreadpool_job *tmp;
437                 size_t new_len = p->jobs_array_len * 2;
438
439                 tmp = realloc(
440                         p->jobs, sizeof(struct pthreadpool_job) * new_len);
441                 if (tmp == NULL) {
442                         return false;
443                 }
444                 p->jobs = tmp;
445
446                 /*
447                  * We just doubled the jobs array. The array implements a FIFO
448                  * queue with a modulo-based wraparound, so we have to memcpy
449                  * the jobs that are logically at the queue end but physically
450                  * before the queue head into the reallocated area. The new
451                  * space starts at the current jobs_array_len, and we have to
452                  * copy everything before the current head job into the new
453                  * area.
454                  */
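                /*
                 * Worked example (illustrative numbers): with
                 * jobs_array_len == 4, head == 2 and num_jobs == 4 the
                 * logical queue order is slots 2,3,0,1. After doubling
                 * to 8 slots, copying slots 0..head-1 into slots 4..5
                 * makes the queue occupy slots 2..5, so the modulo
                 * indexing stays valid.
                 */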
455                 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
456                        sizeof(struct pthreadpool_job) * p->head);
457
458                 p->jobs_array_len = new_len;
459         }
460
461         job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
462         job->id = id;
463         job->fn = fn;
464         job->private_data = private_data;
465
466         p->num_jobs += 1;
467
468         return true;
469 }
470
471 static void pthreadpool_undo_put_job(struct pthreadpool *p)
472 {
473         p->num_jobs -= 1;
474 }
475
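/*
 * Worker thread main loop: wait on pool->condvar for work for at most a
 * second at a time, run one job with the pool mutex dropped, report
 * completion through pool->signal_fn, and exit when idle for too long,
 * when signal_fn fails, or when the pool shuts down and the queue is
 * empty. While a fork() is being prepared, the thread steps aside and
 * parks on pool->fork_mutex instead of waiting on the condvar.
 */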
476 static void *pthreadpool_server(void *arg)
477 {
478         struct pthreadpool *pool = (struct pthreadpool *)arg;
479         int res;
480
481         res = pthread_mutex_lock(&pool->mutex);
482         if (res != 0) {
483                 return NULL;
484         }
485
486         while (1) {
487                 struct timespec ts;
488                 struct pthreadpool_job job;
489
490                 /*
491                  * idle-wait at most 1 second. If nothing happens in that
492                  * time, exit this thread.
493                  */
494
495                 clock_gettime(CLOCK_REALTIME, &ts);
496                 ts.tv_sec += 1;
497
498                 while ((pool->num_jobs == 0) && !pool->shutdown) {
499
500                         pool->num_idle += 1;
501                         res = pthread_cond_timedwait(
502                                 &pool->condvar, &pool->mutex, &ts);
503                         pool->num_idle -= 1;
504
505                         if (pool->prefork_cond != NULL) {
506                                 /*
507                                  * We must allow fork() to continue
508                                  * without anybody waiting on
509                                  * &pool->condvar. Tell
510                                  * pthreadpool_prepare_pool that we
511                                  * got that message.
512                                  */
513
514                                 res = pthread_cond_signal(pool->prefork_cond);
515                                 assert(res == 0);
516
517                                 res = pthread_mutex_unlock(&pool->mutex);
518                                 assert(res == 0);
519
520                                 /*
521                                  * pthreadpool_prepare_pool has
522                                  * already locked this mutex across
523                                  * the fork. This makes us wait
524                                  * without sitting in a condvar.
525                                  */
526                                 res = pthread_mutex_lock(&pool->fork_mutex);
527                                 assert(res == 0);
528                                 res = pthread_mutex_unlock(&pool->fork_mutex);
529                                 assert(res == 0);
530
531                                 res = pthread_mutex_lock(&pool->mutex);
532                                 assert(res == 0);
533                         }
534
535                         if (res == ETIMEDOUT) {
536
537                                 if (pool->num_jobs == 0) {
538                                         /*
539                                          * we timed out and still no work for
540                                          * us. Exit.
541                                          */
542                                         pthreadpool_server_exit(pool);
543                                         return NULL;
544                                 }
545
546                                 break;
547                         }
548                         assert(res == 0);
549                 }
550
551                 if (pthreadpool_get_job(pool, &job)) {
552                         int ret;
553
554                         /*
555                          * Do the work with the mutex unlocked
556                          */
557
558                         res = pthread_mutex_unlock(&pool->mutex);
559                         assert(res == 0);
560
561                         job.fn(job.private_data);
562
563                         ret = pool->signal_fn(job.id,
564                                               job.fn, job.private_data,
565                                               pool->signal_fn_private_data);
566
567                         res = pthread_mutex_lock(&pool->mutex);
568                         assert(res == 0);
569
570                         if (ret != 0) {
571                                 pthreadpool_server_exit(pool);
572                                 return NULL;
573                         }
574                 }
575
576                 if ((pool->num_jobs == 0) && pool->shutdown) {
577                         /*
578                          * No more work to do and we're asked to shut down, so
579                          * exit
580                          */
581                         pthreadpool_server_exit(pool);
582                         return NULL;
583                 }
584         }
585 }
586
587 static int pthreadpool_create_thread(struct pthreadpool *pool)
588 {
589         pthread_attr_t thread_attr;
590         pthread_t thread_id;
591         int res;
592         sigset_t mask, omask;
593
594         /*
595          * Create a new worker thread. It should not receive any signals.
596          */
597
598         sigfillset(&mask);
599
600         res = pthread_attr_init(&thread_attr);
601         if (res != 0) {
602                 return res;
603         }
604
605         res = pthread_attr_setdetachstate(
606                 &thread_attr, PTHREAD_CREATE_DETACHED);
607         if (res != 0) {
608                 pthread_attr_destroy(&thread_attr);
609                 return res;
610         }
611
612         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
613         if (res != 0) {
614                 pthread_attr_destroy(&thread_attr);
615                 return res;
616         }
617
618         res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
619                              (void *)pool);
620
621         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
622
623         pthread_attr_destroy(&thread_attr);
624
625         if (res == 0) {
626                 pool->num_threads += 1;
627         }
628
629         return res;
630 }
631
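/*
 * Queue a job. With max_threads == 0 the job and the signal_fn run
 * synchronously in the caller. Otherwise the job is queued; an idle
 * worker is woken if one exists, and a new worker thread is created if
 * none is idle and the thread limit has not been reached. Should thread
 * creation fail while other workers are still running, the job simply
 * stays queued for them; if no worker exists at all, the job is removed
 * from the queue again and the error is returned to the caller.
 */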
632 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
633                         void (*fn)(void *private_data), void *private_data)
634 {
635         int res;
636         int unlock_res;
637
638         res = pthread_mutex_lock(&pool->mutex);
639         if (res != 0) {
640                 return res;
641         }
642
643         if (pool->shutdown) {
644                 /*
645                  * Protect against the pool being shut down while
646                  * trying to add a job
647                  */
648                 unlock_res = pthread_mutex_unlock(&pool->mutex);
649                 assert(unlock_res == 0);
650                 return EINVAL;
651         }
652
653         if (pool->max_threads == 0) {
654                 unlock_res = pthread_mutex_unlock(&pool->mutex);
655                 assert(unlock_res == 0);
656
657                 /*
658                  * If no threads are allowed we do strict sync processing.
659                  */
660                 fn(private_data);
661                 res = pool->signal_fn(job_id, fn, private_data,
662                                       pool->signal_fn_private_data);
663                 return res;
664         }
665
666         /*
667          * Add job to the end of the queue
668          */
669         if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
670                 unlock_res = pthread_mutex_unlock(&pool->mutex);
671                 assert(unlock_res == 0);
672                 return ENOMEM;
673         }
674
675         if (pool->num_idle > 0) {
676                 /*
677                  * We have idle threads, wake one.
678                  */
679                 res = pthread_cond_signal(&pool->condvar);
680                 if (res != 0) {
681                         pthreadpool_undo_put_job(pool);
682                 }
683                 unlock_res = pthread_mutex_unlock(&pool->mutex);
684                 assert(unlock_res == 0);
685                 return res;
686         }
687
688         if (pool->num_threads >= pool->max_threads) {
689                 /*
690                  * No more new threads, we just queue the request
691                  */
692                 unlock_res = pthread_mutex_unlock(&pool->mutex);
693                 assert(unlock_res == 0);
694                 return 0;
695         }
696
697         res = pthreadpool_create_thread(pool);
698         if (res == 0) {
699                 unlock_res = pthread_mutex_unlock(&pool->mutex);
700                 assert(unlock_res == 0);
701                 return 0;
702         }
703
704         if (pool->num_threads != 0) {
705                 /*
706                  * At least one thread is still available, let
707                  * that one run the queued job.
708                  */
709                 unlock_res = pthread_mutex_unlock(&pool->mutex);
710                 assert(unlock_res == 0);
711                 return 0;
712         }
713
714         /*
715          * No thread could be created to run the job. Unqueue it
716          * again and return the error to the caller.
717          */
718         pthreadpool_undo_put_job(pool);
719
720         unlock_res = pthread_mutex_unlock(&pool->mutex);
721         assert(unlock_res == 0);
722
723         return res;
724 }