pthreadpool: Add some asserts
[nivanova/samba-autobuild/.git] / lib / pthreadpool / pthreadpool.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * thread pool implementation
4  * Copyright (C) Volker Lendecke 2009
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "pthreadpool.h"
25 #include "lib/util/dlinklist.h"
26
27 #ifdef NDEBUG
28 #undef NDEBUG
29 #endif
30
31 #include <assert.h>
32
/*
 * One queued unit of work: a caller-supplied job id, the function to
 * run and the opaque argument that is passed to that function.
 */
struct pthreadpool_job {
	int id;
	void (*fn)(void *private_data);
	void *private_data;
};
38
/*
 * State of one thread pool. Apart from the list linkage, which is
 * guarded by pthreadpools_mutex, the members are accessed with "mutex"
 * held.
 */
struct pthreadpool {
	/* Linkage in the global list of pools, walked by the fork handlers */
	struct pthreadpool *prev;
	struct pthreadpool *next;

	/* Serializes access to this struct */
	pthread_mutex_t mutex;

	/* Idle worker threads sleep on this until work arrives */
	pthread_cond_t condvar;

	/*
	 * Job queue: a ring buffer of jobs_array_len slots. "head" is
	 * the index of the oldest queued job, "num_jobs" the number of
	 * jobs currently queued.
	 */
	size_t jobs_array_len;
	struct pthreadpool_job *jobs;

	size_t head;
	size_t num_jobs;

	/* Callback used to report that a job function has returned */
	int (*signal_fn)(int jobid,
			 void (*job_fn)(void *private_data),
			 void *job_fn_private_data,
			 void *private_data);
	void *signal_fn_private_data;

	/* Set once the pool is being destroyed; workers then exit */
	bool shutdown;

	/* Upper bound for num_threads, 0 disables the limit */
	int max_threads;

	/* Number of worker threads that currently exist */
	int num_threads;

	/* Number of worker threads currently waiting on condvar */
	int num_idle;

	/*
	 * Non-NULL while a fork() is being prepared. Idle helper
	 * threads that wake up and find it set signal it and move off
	 * pool->condvar, so that fork() can proceed without anybody
	 * waiting there.
	 */
	pthread_cond_t *prefork_cond;

	/*
	 * Parking spot for helper threads during fork(). The forking
	 * thread holds it locked across the fork; idle helpers block
	 * on it until the forking thread unlocks it afterwards.
	 */
	pthread_mutex_t fork_mutex;
};
108
/* Guards the global list of pools below */
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Every pool created by pthreadpool_init() and not yet freed */
static struct pthreadpool *pthreadpools = NULL;

/* Makes sure the atfork handlers are registered only once */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);
114
115 /*
116  * Initialize a thread pool
117  */
118
119 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
120                      int (*signal_fn)(int jobid,
121                                       void (*job_fn)(void *private_data),
122                                       void *job_fn_private_data,
123                                       void *private_data),
124                      void *signal_fn_private_data)
125 {
126         struct pthreadpool *pool;
127         int ret;
128
129         pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
130         if (pool == NULL) {
131                 return ENOMEM;
132         }
133         pool->signal_fn = signal_fn;
134         pool->signal_fn_private_data = signal_fn_private_data;
135
136         pool->jobs_array_len = 4;
137         pool->jobs = calloc(
138                 pool->jobs_array_len, sizeof(struct pthreadpool_job));
139
140         if (pool->jobs == NULL) {
141                 free(pool);
142                 return ENOMEM;
143         }
144
145         pool->head = pool->num_jobs = 0;
146
147         ret = pthread_mutex_init(&pool->mutex, NULL);
148         if (ret != 0) {
149                 free(pool->jobs);
150                 free(pool);
151                 return ret;
152         }
153
154         ret = pthread_cond_init(&pool->condvar, NULL);
155         if (ret != 0) {
156                 pthread_mutex_destroy(&pool->mutex);
157                 free(pool->jobs);
158                 free(pool);
159                 return ret;
160         }
161
162         ret = pthread_mutex_init(&pool->fork_mutex, NULL);
163         if (ret != 0) {
164                 pthread_cond_destroy(&pool->condvar);
165                 pthread_mutex_destroy(&pool->mutex);
166                 free(pool->jobs);
167                 free(pool);
168                 return ret;
169         }
170
171         pool->shutdown = false;
172         pool->num_threads = 0;
173         pool->max_threads = max_threads;
174         pool->num_idle = 0;
175         pool->prefork_cond = NULL;
176
177         ret = pthread_mutex_lock(&pthreadpools_mutex);
178         if (ret != 0) {
179                 pthread_mutex_destroy(&pool->fork_mutex);
180                 pthread_cond_destroy(&pool->condvar);
181                 pthread_mutex_destroy(&pool->mutex);
182                 free(pool->jobs);
183                 free(pool);
184                 return ret;
185         }
186         DLIST_ADD(pthreadpools, pool);
187
188         ret = pthread_mutex_unlock(&pthreadpools_mutex);
189         assert(ret == 0);
190
191         pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
192
193         *presult = pool;
194
195         return 0;
196 }
197
198 static void pthreadpool_prepare_pool(struct pthreadpool *pool)
199 {
200         int ret;
201
202         ret = pthread_mutex_lock(&pool->fork_mutex);
203         assert(ret == 0);
204
205         ret = pthread_mutex_lock(&pool->mutex);
206         assert(ret == 0);
207
208         while (pool->num_idle != 0) {
209                 int num_idle = pool->num_idle;
210                 pthread_cond_t prefork_cond;
211
212                 ret = pthread_cond_init(&prefork_cond, NULL);
213                 assert(ret == 0);
214
215                 /*
216                  * Push all idle threads off pool->condvar. In the
217                  * child we can destroy the pool, which would result
218                  * in undefined behaviour in the
219                  * pthread_cond_destroy(pool->condvar). glibc just
220                  * blocks here.
221                  */
222                 pool->prefork_cond = &prefork_cond;
223
224                 ret = pthread_cond_signal(&pool->condvar);
225                 assert(ret == 0);
226
227                 while (pool->num_idle == num_idle) {
228                         ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
229                         assert(ret == 0);
230                 }
231
232                 pool->prefork_cond = NULL;
233
234                 ret = pthread_cond_destroy(&prefork_cond);
235                 assert(ret == 0);
236         }
237
238         /*
239          * Probably it's well-defined somewhere: What happens to
240          * condvars after a fork? The rationale of pthread_atfork only
241          * writes about mutexes. So better be safe than sorry and
242          * destroy/reinit pool->condvar across a fork.
243          */
244
245         ret = pthread_cond_destroy(&pool->condvar);
246         assert(ret == 0);
247 }
248
249 static void pthreadpool_prepare(void)
250 {
251         int ret;
252         struct pthreadpool *pool;
253
254         ret = pthread_mutex_lock(&pthreadpools_mutex);
255         assert(ret == 0);
256
257         pool = pthreadpools;
258
259         while (pool != NULL) {
260                 pthreadpool_prepare_pool(pool);
261                 pool = pool->next;
262         }
263 }
264
265 static void pthreadpool_parent(void)
266 {
267         int ret;
268         struct pthreadpool *pool;
269
270         for (pool = DLIST_TAIL(pthreadpools);
271              pool != NULL;
272              pool = DLIST_PREV(pool)) {
273                 ret = pthread_cond_init(&pool->condvar, NULL);
274                 assert(ret == 0);
275                 ret = pthread_mutex_unlock(&pool->mutex);
276                 assert(ret == 0);
277                 ret = pthread_mutex_unlock(&pool->fork_mutex);
278                 assert(ret == 0);
279         }
280
281         ret = pthread_mutex_unlock(&pthreadpools_mutex);
282         assert(ret == 0);
283 }
284
285 static void pthreadpool_child(void)
286 {
287         int ret;
288         struct pthreadpool *pool;
289
290         for (pool = DLIST_TAIL(pthreadpools);
291              pool != NULL;
292              pool = DLIST_PREV(pool)) {
293
294                 pool->num_threads = 0;
295                 pool->num_idle = 0;
296                 pool->head = 0;
297                 pool->num_jobs = 0;
298
299                 ret = pthread_cond_init(&pool->condvar, NULL);
300                 assert(ret == 0);
301
302                 ret = pthread_mutex_unlock(&pool->mutex);
303                 assert(ret == 0);
304
305                 ret = pthread_mutex_unlock(&pool->fork_mutex);
306                 assert(ret == 0);
307         }
308
309         ret = pthread_mutex_unlock(&pthreadpools_mutex);
310         assert(ret == 0);
311 }
312
313 static void pthreadpool_prep_atfork(void)
314 {
315         pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
316                        pthreadpool_child);
317 }
318
319 static int pthreadpool_free(struct pthreadpool *pool)
320 {
321         int ret, ret1, ret2;
322
323         ret = pthread_mutex_lock(&pthreadpools_mutex);
324         if (ret != 0) {
325                 return ret;
326         }
327         DLIST_REMOVE(pthreadpools, pool);
328         ret = pthread_mutex_unlock(&pthreadpools_mutex);
329         assert(ret == 0);
330
331         ret = pthread_mutex_destroy(&pool->mutex);
332         ret1 = pthread_cond_destroy(&pool->condvar);
333         ret2 = pthread_mutex_destroy(&pool->fork_mutex);
334
335         if (ret != 0) {
336                 return ret;
337         }
338         if (ret1 != 0) {
339                 return ret1;
340         }
341         if (ret2 != 0) {
342                 return ret2;
343         }
344
345         free(pool->jobs);
346         free(pool);
347
348         return 0;
349 }
350
351 /*
352  * Destroy a thread pool. Wake up all idle threads for exit. The last
353  * one will free the pool.
354  */
355
356 int pthreadpool_destroy(struct pthreadpool *pool)
357 {
358         int ret, ret1;
359
360         ret = pthread_mutex_lock(&pool->mutex);
361         if (ret != 0) {
362                 return ret;
363         }
364
365         if (pool->shutdown) {
366                 ret = pthread_mutex_unlock(&pool->mutex);
367                 assert(ret == 0);
368                 return EBUSY;
369         }
370
371         pool->shutdown = true;
372
373         if (pool->num_threads == 0) {
374                 ret = pthread_mutex_unlock(&pool->mutex);
375                 assert(ret == 0);
376
377                 ret = pthreadpool_free(pool);
378                 return ret;
379         }
380
381         /*
382          * We have active threads, tell them to finish.
383          */
384
385         ret = pthread_cond_broadcast(&pool->condvar);
386
387         ret1 = pthread_mutex_unlock(&pool->mutex);
388         assert(ret1 == 0);
389
390         return ret;
391 }
392
393 /*
394  * Prepare for pthread_exit(), pool->mutex must be locked and will be
395  * unlocked here. This is a bit of a layering violation, but here we
396  * also take care of removing the pool if we're the last thread.
397  */
398 static void pthreadpool_server_exit(struct pthreadpool *pool)
399 {
400         int ret;
401         bool free_it;
402
403         pool->num_threads -= 1;
404
405         free_it = (pool->shutdown && (pool->num_threads == 0));
406
407         ret = pthread_mutex_unlock(&pool->mutex);
408         assert(ret == 0);
409
410         if (free_it) {
411                 pthreadpool_free(pool);
412         }
413 }
414
415 static bool pthreadpool_get_job(struct pthreadpool *p,
416                                 struct pthreadpool_job *job)
417 {
418         if (p->num_jobs == 0) {
419                 return false;
420         }
421         *job = p->jobs[p->head];
422         p->head = (p->head+1) % p->jobs_array_len;
423         p->num_jobs -= 1;
424         return true;
425 }
426
427 static bool pthreadpool_put_job(struct pthreadpool *p,
428                                 int id,
429                                 void (*fn)(void *private_data),
430                                 void *private_data)
431 {
432         struct pthreadpool_job *job;
433
434         if (p->num_jobs == p->jobs_array_len) {
435                 struct pthreadpool_job *tmp;
436                 size_t new_len = p->jobs_array_len * 2;
437
438                 tmp = realloc(
439                         p->jobs, sizeof(struct pthreadpool_job) * new_len);
440                 if (tmp == NULL) {
441                         return false;
442                 }
443                 p->jobs = tmp;
444
445                 /*
446                  * We just doubled the jobs array. The array implements a FIFO
447                  * queue with a modulo-based wraparound, so we have to memcpy
448                  * the jobs that are logically at the queue end but physically
449                  * before the queue head into the reallocated area. The new
450                  * space starts at the current jobs_array_len, and we have to
451                  * copy everything before the current head job into the new
452                  * area.
453                  */
454                 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
455                        sizeof(struct pthreadpool_job) * p->head);
456
457                 p->jobs_array_len = new_len;
458         }
459
460         job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
461         job->id = id;
462         job->fn = fn;
463         job->private_data = private_data;
464
465         p->num_jobs += 1;
466
467         return true;
468 }
469
470 static void pthreadpool_undo_put_job(struct pthreadpool *p)
471 {
472         p->num_jobs -= 1;
473 }
474
475 static void *pthreadpool_server(void *arg)
476 {
477         struct pthreadpool *pool = (struct pthreadpool *)arg;
478         int res;
479
480         res = pthread_mutex_lock(&pool->mutex);
481         if (res != 0) {
482                 return NULL;
483         }
484
485         while (1) {
486                 struct timespec ts;
487                 struct pthreadpool_job job;
488
489                 /*
490                  * idle-wait at most 1 second. If nothing happens in that
491                  * time, exit this thread.
492                  */
493
494                 clock_gettime(CLOCK_REALTIME, &ts);
495                 ts.tv_sec += 1;
496
497                 while ((pool->num_jobs == 0) && !pool->shutdown) {
498
499                         pool->num_idle += 1;
500                         res = pthread_cond_timedwait(
501                                 &pool->condvar, &pool->mutex, &ts);
502                         pool->num_idle -= 1;
503
504                         if (pool->prefork_cond != NULL) {
505                                 /*
506                                  * Me must allow fork() to continue
507                                  * without anybody waiting on
508                                  * &pool->condvar. Tell
509                                  * pthreadpool_prepare_pool that we
510                                  * got that message.
511                                  */
512
513                                 res = pthread_cond_signal(pool->prefork_cond);
514                                 assert(res == 0);
515
516                                 res = pthread_mutex_unlock(&pool->mutex);
517                                 assert(res == 0);
518
519                                 /*
520                                  * pthreadpool_prepare_pool has
521                                  * already locked this mutex across
522                                  * the fork. This makes us wait
523                                  * without sitting in a condvar.
524                                  */
525                                 res = pthread_mutex_lock(&pool->fork_mutex);
526                                 assert(res == 0);
527                                 res = pthread_mutex_unlock(&pool->fork_mutex);
528                                 assert(res == 0);
529
530                                 res = pthread_mutex_lock(&pool->mutex);
531                                 assert(res == 0);
532                         }
533
534                         if (res == ETIMEDOUT) {
535
536                                 if (pool->num_jobs == 0) {
537                                         /*
538                                          * we timed out and still no work for
539                                          * us. Exit.
540                                          */
541                                         pthreadpool_server_exit(pool);
542                                         return NULL;
543                                 }
544
545                                 break;
546                         }
547                         assert(res == 0);
548                 }
549
550                 if (pthreadpool_get_job(pool, &job)) {
551                         int ret;
552
553                         /*
554                          * Do the work with the mutex unlocked
555                          */
556
557                         res = pthread_mutex_unlock(&pool->mutex);
558                         assert(res == 0);
559
560                         job.fn(job.private_data);
561
562                         ret = pool->signal_fn(job.id,
563                                               job.fn, job.private_data,
564                                               pool->signal_fn_private_data);
565
566                         res = pthread_mutex_lock(&pool->mutex);
567                         assert(res == 0);
568
569                         if (ret != 0) {
570                                 pthreadpool_server_exit(pool);
571                                 return NULL;
572                         }
573                 }
574
575                 if ((pool->num_jobs == 0) && pool->shutdown) {
576                         /*
577                          * No more work to do and we're asked to shut down, so
578                          * exit
579                          */
580                         pthreadpool_server_exit(pool);
581                         return NULL;
582                 }
583         }
584 }
585
586 static int pthreadpool_create_thread(struct pthreadpool *pool)
587 {
588         pthread_attr_t thread_attr;
589         pthread_t thread_id;
590         int res;
591         sigset_t mask, omask;
592
593         /*
594          * Create a new worker thread. It should not receive any signals.
595          */
596
597         sigfillset(&mask);
598
599         res = pthread_attr_init(&thread_attr);
600         if (res != 0) {
601                 return res;
602         }
603
604         res = pthread_attr_setdetachstate(
605                 &thread_attr, PTHREAD_CREATE_DETACHED);
606         if (res != 0) {
607                 pthread_attr_destroy(&thread_attr);
608                 return res;
609         }
610
611         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
612         if (res != 0) {
613                 pthread_attr_destroy(&thread_attr);
614                 return res;
615         }
616
617         res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
618                              (void *)pool);
619
620         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
621
622         pthread_attr_destroy(&thread_attr);
623
624         if (res == 0) {
625                 pool->num_threads += 1;
626         }
627
628         return res;
629 }
630
631 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
632                         void (*fn)(void *private_data), void *private_data)
633 {
634         int res;
635
636         res = pthread_mutex_lock(&pool->mutex);
637         if (res != 0) {
638                 return res;
639         }
640
641         if (pool->shutdown) {
642                 /*
643                  * Protect against the pool being shut down while
644                  * trying to add a job
645                  */
646                 res = pthread_mutex_unlock(&pool->mutex);
647                 assert(res == 0);
648                 return EINVAL;
649         }
650
651         /*
652          * Add job to the end of the queue
653          */
654         if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
655                 res = pthread_mutex_unlock(&pool->mutex);
656                 assert(res == 0);
657                 return ENOMEM;
658         }
659
660         if (pool->num_idle > 0) {
661                 int unlock_res;
662                 /*
663                  * We have idle threads, wake one.
664                  */
665                 res = pthread_cond_signal(&pool->condvar);
666                 if (res != 0) {
667                         pthreadpool_undo_put_job(pool);
668                 }
669                 unlock_res = pthread_mutex_unlock(&pool->mutex);
670                 assert(unlock_res == 0);
671                 return res;
672         }
673
674         if ((pool->max_threads != 0) &&
675             (pool->num_threads >= pool->max_threads)) {
676                 /*
677                  * No more new threads, we just queue the request
678                  */
679                 res = pthread_mutex_unlock(&pool->mutex);
680                 assert(res == 0);
681                 return 0;
682         }
683
684         res = pthreadpool_create_thread(pool);
685         if (res == 0) {
686                 res = pthread_mutex_unlock(&pool->mutex);
687                 assert(res == 0);
688                 return 0;
689         }
690
691         if (pool->num_threads != 0) {
692                 /*
693                  * At least one thread is still available, let
694                  * that one run the queued job.
695                  */
696                 res = pthread_mutex_unlock(&pool->mutex);
697                 assert(res == 0);
698                 return 0;
699         }
700
701         /*
702          * No thread could be created to run job, fallback to sync
703          * call.
704          */
705         pthreadpool_undo_put_job(pool);
706
707         res = pthread_mutex_unlock(&pool->mutex);
708         assert(res == 0);
709
710         fn(private_data);
711         res = pool->signal_fn(job_id, fn, private_data,
712                               pool->signal_fn_private_data);
713         return res;
714 }