/*
 * Unix SMB/CIFS implementation.
 * thread pool implementation
 * Copyright (C) Volker Lendecke 2009
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "replace.h"
#include "system/time.h"
#include "system/wait.h"
#include "system/threads.h"
#include "pthreadpool.h"
#include "lib/util/dlinklist.h"

#ifdef NDEBUG
#undef NDEBUG
#endif

#include <assert.h>

struct pthreadpool_job {
        int id;
        void (*fn)(void *private_data);
        void *private_data;
};

struct pthreadpool {
        /*
         * List pthreadpools for fork safety
         */
        struct pthreadpool *prev, *next;

        /*
         * Control access to this struct
         */
        pthread_mutex_t mutex;

        /*
         * Threads waiting for work do so here
         */
        pthread_cond_t condvar;

        /*
         * Array of jobs
         */
        size_t jobs_array_len;
        struct pthreadpool_job *jobs;

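        /*
         * The jobs array is used as a ring buffer: "head" is the
         * index of the first queued job, "num_jobs" the number of
         * jobs queued behind it, both interpreted modulo
         * jobs_array_len.
         */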
        size_t head;
        size_t num_jobs;

        /*
         * Indicate job completion
         */
        int (*signal_fn)(int jobid,
                         void (*job_fn)(void *private_data),
                         void *job_fn_private_data,
                         void *private_data);
        void *signal_fn_private_data;

        /*
         * indicator to worker threads that they should shut down
         */
        bool shutdown;

        /*
         * maximum number of threads
         * 0 means no real thread, only strict sync processing.
         */
        unsigned max_threads;

        /*
         * Number of threads
         */
        unsigned num_threads;

        /*
         * Number of idle threads
         */
        unsigned num_idle;

        /*
         * Condition variable indicating that helper threads should
         * quickly go away making way for fork() without anybody
         * waiting on pool->condvar.
         */
        pthread_cond_t *prefork_cond;

        /*
         * Waiting position for helper threads while fork is
         * running. The forking thread will have locked it, and all
         * idle helper threads will sit here until after the fork,
         * where the forking thread will unlock it again.
         */
        pthread_mutex_t fork_mutex;
};

static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct pthreadpool *pthreadpools = NULL;
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);

/*
 * Initialize a thread pool
 */

int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
                     int (*signal_fn)(int jobid,
                                      void (*job_fn)(void *private_data),
                                      void *job_fn_private_data,
                                      void *private_data),
                     void *signal_fn_private_data)
{
        struct pthreadpool *pool;
        int ret;

        pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
        if (pool == NULL) {
                return ENOMEM;
        }
        pool->signal_fn = signal_fn;
        pool->signal_fn_private_data = signal_fn_private_data;

        pool->jobs_array_len = 4;
        pool->jobs = calloc(
                pool->jobs_array_len, sizeof(struct pthreadpool_job));

        if (pool->jobs == NULL) {
                free(pool);
                return ENOMEM;
        }

        pool->head = pool->num_jobs = 0;

        ret = pthread_mutex_init(&pool->mutex, NULL);
        if (ret != 0) {
                free(pool->jobs);
                free(pool);
                return ret;
        }

        ret = pthread_cond_init(&pool->condvar, NULL);
        if (ret != 0) {
                pthread_mutex_destroy(&pool->mutex);
                free(pool->jobs);
                free(pool);
                return ret;
        }

        ret = pthread_mutex_init(&pool->fork_mutex, NULL);
        if (ret != 0) {
                pthread_cond_destroy(&pool->condvar);
                pthread_mutex_destroy(&pool->mutex);
                free(pool->jobs);
                free(pool);
                return ret;
        }

        pool->shutdown = false;
        pool->num_threads = 0;
        pool->max_threads = max_threads;
        pool->num_idle = 0;
        pool->prefork_cond = NULL;

        ret = pthread_mutex_lock(&pthreadpools_mutex);
        if (ret != 0) {
                pthread_mutex_destroy(&pool->fork_mutex);
                pthread_cond_destroy(&pool->condvar);
                pthread_mutex_destroy(&pool->mutex);
                free(pool->jobs);
                free(pool);
                return ret;
        }
        DLIST_ADD(pthreadpools, pool);

        ret = pthread_mutex_unlock(&pthreadpools_mutex);
        assert(ret == 0);

        pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);

        *presult = pool;

        return 0;
}
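
/*
 * Illustrative usage sketch, not part of the original file: one way to
 * use pthreadpool_init() is to pass a signal_fn that writes the
 * completed job id to a pipe, so an event loop in the creating thread
 * can pick up completions. The helper names and the pipe wiring below
 * are hypothetical and only meant to show the signal_fn contract:
 * signal_fn runs in the worker thread after the job function, and a
 * non-zero return makes that worker exit.
 */
#if 0
static int example_signal_fn(int jobid,
                             void (*job_fn)(void *private_data),
                             void *job_fn_private_data,
                             void *private_data)
{
        int *write_fd = private_data;
        ssize_t written;

        /* Called from the worker thread after job_fn has run. */
        written = write(*write_fd, &jobid, sizeof(jobid));
        if (written != sizeof(jobid)) {
                return errno;
        }
        return 0;
}

static void example_job(void *private_data)
{
        /* The actual work, executed without pool->mutex held. */
}

static int example_usage(void)
{
        int fds[2];
        struct pthreadpool *pool = NULL;
        int ret;

        if (pipe(fds) != 0) {
                return errno;
        }

        ret = pthreadpool_init(4, &pool, example_signal_fn, &fds[1]);
        if (ret != 0) {
                return ret;
        }

        ret = pthreadpool_add_job(pool, 1, example_job, NULL);
        if (ret != 0) {
                return ret;
        }

        /* ... read completed job ids from fds[0] in the event loop ... */

        return pthreadpool_destroy(pool);
}
#endif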

size_t pthreadpool_max_threads(struct pthreadpool *pool)
{
        return pool->max_threads;
}

size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
{
        int res;
        int unlock_res;
        size_t ret;

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return 0;
        }

        ret = pool->num_jobs;

        unlock_res = pthread_mutex_unlock(&pool->mutex);
        assert(unlock_res == 0);
        return ret;
}

static void pthreadpool_prepare_pool(struct pthreadpool *pool)
{
        int ret;

        ret = pthread_mutex_lock(&pool->fork_mutex);
        assert(ret == 0);

        ret = pthread_mutex_lock(&pool->mutex);
        assert(ret == 0);

        while (pool->num_idle != 0) {
                unsigned num_idle = pool->num_idle;
                pthread_cond_t prefork_cond;

                ret = pthread_cond_init(&prefork_cond, NULL);
                assert(ret == 0);

                /*
                 * Push all idle threads off pool->condvar. In the
                 * child we can destroy the pool, which would result
                 * in undefined behaviour in the
                 * pthread_cond_destroy(pool->condvar). glibc just
                 * blocks here.
                 */
                pool->prefork_cond = &prefork_cond;

                ret = pthread_cond_signal(&pool->condvar);
                assert(ret == 0);

                while (pool->num_idle == num_idle) {
                        ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
                        assert(ret == 0);
                }

                pool->prefork_cond = NULL;

                ret = pthread_cond_destroy(&prefork_cond);
                assert(ret == 0);
        }

        /*
         * Probably it's well-defined somewhere: What happens to
         * condvars after a fork? The rationale of pthread_atfork only
         * writes about mutexes. So better be safe than sorry and
         * destroy/reinit pool->condvar across a fork.
         */

        ret = pthread_cond_destroy(&pool->condvar);
        assert(ret == 0);
}

static void pthreadpool_prepare(void)
{
        int ret;
        struct pthreadpool *pool;

        ret = pthread_mutex_lock(&pthreadpools_mutex);
        assert(ret == 0);

        pool = pthreadpools;

        while (pool != NULL) {
                pthreadpool_prepare_pool(pool);
                pool = pool->next;
        }
}

static void pthreadpool_parent(void)
{
        int ret;
        struct pthreadpool *pool;

        for (pool = DLIST_TAIL(pthreadpools);
             pool != NULL;
             pool = DLIST_PREV(pool)) {
                ret = pthread_cond_init(&pool->condvar, NULL);
                assert(ret == 0);
                ret = pthread_mutex_unlock(&pool->mutex);
                assert(ret == 0);
                ret = pthread_mutex_unlock(&pool->fork_mutex);
                assert(ret == 0);
        }

        ret = pthread_mutex_unlock(&pthreadpools_mutex);
        assert(ret == 0);
}

static void pthreadpool_child(void)
{
        int ret;
        struct pthreadpool *pool;

        for (pool = DLIST_TAIL(pthreadpools);
             pool != NULL;
             pool = DLIST_PREV(pool)) {

                pool->num_threads = 0;
                pool->num_idle = 0;
                pool->head = 0;
                pool->num_jobs = 0;

                ret = pthread_cond_init(&pool->condvar, NULL);
                assert(ret == 0);

                ret = pthread_mutex_unlock(&pool->mutex);
                assert(ret == 0);

                ret = pthread_mutex_unlock(&pool->fork_mutex);
                assert(ret == 0);
        }

        ret = pthread_mutex_unlock(&pthreadpools_mutex);
        assert(ret == 0);
}

static void pthreadpool_prep_atfork(void)
{
        pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
                       pthreadpool_child);
}

static int pthreadpool_free(struct pthreadpool *pool)
{
        int ret, ret1, ret2;

        ret = pthread_mutex_lock(&pthreadpools_mutex);
        if (ret != 0) {
                return ret;
        }
        DLIST_REMOVE(pthreadpools, pool);
        ret = pthread_mutex_unlock(&pthreadpools_mutex);
        assert(ret == 0);

        ret = pthread_mutex_lock(&pool->mutex);
        assert(ret == 0);
        ret = pthread_mutex_unlock(&pool->mutex);
        assert(ret == 0);

        ret = pthread_mutex_destroy(&pool->mutex);
        ret1 = pthread_cond_destroy(&pool->condvar);
        ret2 = pthread_mutex_destroy(&pool->fork_mutex);

        if (ret != 0) {
                return ret;
        }
        if (ret1 != 0) {
                return ret1;
        }
        if (ret2 != 0) {
                return ret2;
        }

        free(pool->jobs);
        free(pool);

        return 0;
}

/*
 * Destroy a thread pool. Wake up all idle threads for exit. The last
 * one will free the pool.
 */

int pthreadpool_destroy(struct pthreadpool *pool)
{
        int ret, ret1;

        ret = pthread_mutex_lock(&pool->mutex);
        if (ret != 0) {
                return ret;
        }

        if (pool->shutdown) {
                ret = pthread_mutex_unlock(&pool->mutex);
                assert(ret == 0);
                return EBUSY;
        }

        pool->shutdown = true;

        if (pool->num_threads == 0) {
                ret = pthread_mutex_unlock(&pool->mutex);
                assert(ret == 0);

                ret = pthreadpool_free(pool);
                return ret;
        }

        /*
         * We have active threads, tell them to finish.
         */

        ret = pthread_cond_broadcast(&pool->condvar);

        ret1 = pthread_mutex_unlock(&pool->mutex);
        assert(ret1 == 0);

        return ret;
}

/*
 * Prepare for pthread_exit(), pool->mutex must be locked and will be
 * unlocked here. This is a bit of a layering violation, but here we
 * also take care of removing the pool if we're the last thread.
 */
static void pthreadpool_server_exit(struct pthreadpool *pool)
{
        int ret;
        bool free_it;

        pool->num_threads -= 1;

        free_it = (pool->shutdown && (pool->num_threads == 0));

        ret = pthread_mutex_unlock(&pool->mutex);
        assert(ret == 0);

        if (free_it) {
                pthreadpool_free(pool);
        }
}

static bool pthreadpool_get_job(struct pthreadpool *p,
                                struct pthreadpool_job *job)
{
        if (p->num_jobs == 0) {
                return false;
        }
        *job = p->jobs[p->head];
        p->head = (p->head+1) % p->jobs_array_len;
        p->num_jobs -= 1;
        return true;
}

static bool pthreadpool_put_job(struct pthreadpool *p,
                                int id,
                                void (*fn)(void *private_data),
                                void *private_data)
{
        struct pthreadpool_job *job;

        if (p->num_jobs == p->jobs_array_len) {
                struct pthreadpool_job *tmp;
                size_t new_len = p->jobs_array_len * 2;

                tmp = realloc(
                        p->jobs, sizeof(struct pthreadpool_job) * new_len);
                if (tmp == NULL) {
                        return false;
                }
                p->jobs = tmp;

                /*
                 * We just doubled the jobs array. The array implements a FIFO
                 * queue with a modulo-based wraparound, so we have to memcpy
                 * the jobs that are logically at the queue end but physically
                 * before the queue head into the reallocated area. The new
                 * space starts at the current jobs_array_len, and we have to
                 * copy everything before the current head job into the new
                 * area.
                 */
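                /*
                 * Worked example: with the old jobs_array_len == 4,
                 * head == 2 and num_jobs == 4, the logical queue
                 * order is slots 2, 3, 0, 1. After doubling to 8
                 * slots, the memcpy below copies slots 0 and 1 (the
                 * head == 2 entries at the start of the array) to
                 * slots 4 and 5, so the queue becomes the contiguous
                 * run 2, 3, 4, 5 and the next job is stored at
                 * (head + num_jobs) % jobs_array_len == 6.
                 */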
                memcpy(&p->jobs[p->jobs_array_len], p->jobs,
                       sizeof(struct pthreadpool_job) * p->head);

                p->jobs_array_len = new_len;
        }

        job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
        job->id = id;
        job->fn = fn;
        job->private_data = private_data;

        p->num_jobs += 1;

        return true;
}

static void pthreadpool_undo_put_job(struct pthreadpool *p)
{
        p->num_jobs -= 1;
}

static void *pthreadpool_server(void *arg)
{
        struct pthreadpool *pool = (struct pthreadpool *)arg;
        int res;

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return NULL;
        }

        while (1) {
                struct timespec ts;
                struct pthreadpool_job job;

                /*
                 * idle-wait at most 1 second. If nothing happens in that
                 * time, exit this thread.
                 */

                clock_gettime(CLOCK_REALTIME, &ts);
                ts.tv_sec += 1;

                while ((pool->num_jobs == 0) && !pool->shutdown) {

                        pool->num_idle += 1;
                        res = pthread_cond_timedwait(
                                &pool->condvar, &pool->mutex, &ts);
                        pool->num_idle -= 1;

                        if (pool->prefork_cond != NULL) {
                                /*
                                 * We must allow fork() to continue
                                 * without anybody waiting on
                                 * &pool->condvar. Tell
                                 * pthreadpool_prepare_pool that we
                                 * got that message.
                                 */

                                res = pthread_cond_signal(pool->prefork_cond);
                                assert(res == 0);

                                res = pthread_mutex_unlock(&pool->mutex);
                                assert(res == 0);

                                /*
                                 * pthreadpool_prepare_pool has
                                 * already locked this mutex across
                                 * the fork. This makes us wait
                                 * without sitting in a condvar.
                                 */
                                res = pthread_mutex_lock(&pool->fork_mutex);
                                assert(res == 0);
                                res = pthread_mutex_unlock(&pool->fork_mutex);
                                assert(res == 0);

                                res = pthread_mutex_lock(&pool->mutex);
                                assert(res == 0);
                        }

                        if (res == ETIMEDOUT) {

                                if (pool->num_jobs == 0) {
                                        /*
                                         * we timed out and still no work for
                                         * us. Exit.
                                         */
                                        pthreadpool_server_exit(pool);
                                        return NULL;
                                }

                                break;
                        }
                        assert(res == 0);
                }

                if (pthreadpool_get_job(pool, &job)) {
                        int ret;

                        /*
                         * Do the work with the mutex unlocked
                         */

                        res = pthread_mutex_unlock(&pool->mutex);
                        assert(res == 0);

                        job.fn(job.private_data);

                        ret = pool->signal_fn(job.id,
                                              job.fn, job.private_data,
                                              pool->signal_fn_private_data);

                        res = pthread_mutex_lock(&pool->mutex);
                        assert(res == 0);

                        if (ret != 0) {
                                pthreadpool_server_exit(pool);
                                return NULL;
                        }
                }

                if ((pool->num_jobs == 0) && pool->shutdown) {
                        /*
                         * No more work to do and we're asked to shut down, so
                         * exit
                         */
                        pthreadpool_server_exit(pool);
                        return NULL;
                }
        }
}

static int pthreadpool_create_thread(struct pthreadpool *pool)
{
        pthread_attr_t thread_attr;
        pthread_t thread_id;
        int res;
        sigset_t mask, omask;

        /*
         * Create a new worker thread. It should not receive any signals.
         */

        sigfillset(&mask);

        res = pthread_attr_init(&thread_attr);
        if (res != 0) {
                return res;
        }

        res = pthread_attr_setdetachstate(
                &thread_attr, PTHREAD_CREATE_DETACHED);
        if (res != 0) {
                pthread_attr_destroy(&thread_attr);
                return res;
        }

        res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
        if (res != 0) {
                pthread_attr_destroy(&thread_attr);
                return res;
        }

        res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
                             (void *)pool);

        assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);

        pthread_attr_destroy(&thread_attr);

        if (res == 0) {
                pool->num_threads += 1;
        }

        return res;
}

int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                        void (*fn)(void *private_data), void *private_data)
{
        int res;
        int unlock_res;

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return res;
        }

        if (pool->shutdown) {
                /*
                 * Protect against the pool being shut down while
                 * trying to add a job
                 */
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return EINVAL;
        }

        if (pool->max_threads == 0) {
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);

                /*
                 * If no threads are allowed we do strict sync processing.
                 */
                fn(private_data);
                res = pool->signal_fn(job_id, fn, private_data,
                                      pool->signal_fn_private_data);
                return res;
        }

        /*
         * Add job to the end of the queue
         */
        if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return ENOMEM;
        }

        if (pool->num_idle > 0) {
                /*
                 * We have idle threads, wake one.
                 */
                res = pthread_cond_signal(&pool->condvar);
                if (res != 0) {
                        pthreadpool_undo_put_job(pool);
                }
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return res;
        }

        if (pool->num_threads >= pool->max_threads) {
                /*
                 * No more new threads, we just queue the request
                 */
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        res = pthreadpool_create_thread(pool);
        if (res == 0) {
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        if (pool->num_threads != 0) {
                /*
                 * At least one thread is still available, let
                 * that one run the queued job.
                 */
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        /*
         * No thread could be created to run the job, fall back to a
         * sync call.
         */
        pthreadpool_undo_put_job(pool);

        unlock_res = pthread_mutex_unlock(&pool->mutex);
        assert(unlock_res == 0);

        return res;
}

size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
                              void (*fn)(void *private_data), void *private_data)
{
        int res;
        size_t i, j;
        size_t num = 0;

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return res;
        }

        for (i = 0, j = 0; i < pool->num_jobs; i++) {
                size_t idx = (pool->head + i) % pool->jobs_array_len;
                size_t new_idx = (pool->head + j) % pool->jobs_array_len;
                struct pthreadpool_job *job = &pool->jobs[idx];

                if ((job->private_data == private_data) &&
                    (job->id == job_id) &&
                    (job->fn == fn))
                {
                        /*
                         * Just skip the entry.
                         */
                        num++;
                        continue;
                }

                /*
                 * If we already removed one or more jobs (so j will be smaller
                 * than i), we need to fill possible gaps in the logical list.
                 */
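                /*
                 * Example: with the logical queue A, B, C and only B
                 * matching the cancel request, i visits all three
                 * entries while j only advances for A and C; C is
                 * copied one slot towards the head, leaving the
                 * contiguous run A, C once num_jobs is reduced after
                 * the loop.
                 */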
                if (j < i) {
                        pool->jobs[new_idx] = *job;
                }
                j++;
        }

        pool->num_jobs -= num;

        res = pthread_mutex_unlock(&pool->mutex);
        assert(res == 0);

        return num;
}
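
/*
 * Illustrative sketch, not part of the original file: a caller that
 * wants to tear down a job's private_data early can first try to pull
 * the job back off the queue. A return value of 1 means the matching
 * job was still queued and will never run; 0 means it is either
 * running or already completed, so the caller must wait for the
 * signal_fn notification instead. The names below are hypothetical.
 */
#if 0
static void example_cancel(struct pthreadpool *pool, int job_id,
                           void (*fn)(void *private_data), void *state)
{
        size_t removed;

        removed = pthreadpool_cancel_job(pool, job_id, fn, state);
        if (removed == 1) {
                /* The job never ran, so "state" can be freed right away. */
                free(state);
        }
}
#endif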