pthreadpool: split out a pthreadpool_stop() from pthreadpool_destroy()
[garming/samba-autobuild/.git] / lib / pthreadpool / pthreadpool.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * thread pool implementation
4  * Copyright (C) Volker Lendecke 2009
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "pthreadpool.h"
25 #include "lib/util/dlinklist.h"
26
27 #ifdef NDEBUG
28 #undef NDEBUG
29 #endif
30
31 #include <assert.h>
32
/*
 * A queued unit of work: a caller-chosen id plus the function to run
 * and its argument.
 */
struct pthreadpool_job {
	int id;				/* caller-chosen job identifier */
	void (*fn)(void *private_data);	/* work function to invoke */
	void *private_data;		/* argument passed to fn */
};
38
struct pthreadpool {
	/*
	 * List pthreadpools for fork safety
	 */
	struct pthreadpool *prev, *next;

	/*
	 * Control access to this struct
	 */
	pthread_mutex_t mutex;

	/*
	 * Threads waiting for work do so here
	 */
	pthread_cond_t condvar;

	/*
	 * Array of jobs, used as a circular FIFO queue
	 */
	size_t jobs_array_len;
	struct pthreadpool_job *jobs;

	/* Index of the oldest pending job; wraps modulo jobs_array_len */
	size_t head;
	/* Number of jobs currently queued */
	size_t num_jobs;

	/*
	 * Indicate job completion
	 */
	int (*signal_fn)(int jobid,
			 void (*job_fn)(void *private_data),
			 void *job_fn_private_data,
			 void *private_data);
	void *signal_fn_private_data;

	/*
	 * indicator to worker threads to stop processing further jobs
	 * and exit.
	 */
	bool stopped;

	/*
	 * indicator to the last worker thread to free the pool
	 * resources.
	 */
	bool destroyed;

	/*
	 * maximum number of threads
	 * 0 means no real thread, only strict sync processing.
	 */
	unsigned max_threads;

	/*
	 * Number of threads
	 */
	unsigned num_threads;

	/*
	 * Number of idle threads
	 */
	unsigned num_idle;

	/*
	 * Condition variable indicating that helper threads should
	 * quickly go away making way for fork() without anybody
	 * waiting on pool->condvar.
	 */
	pthread_cond_t *prefork_cond;

	/*
	 * Waiting position for helper threads while fork is
	 * running. The forking thread will have locked it, and all
	 * idle helper threads will sit here until after the fork,
	 * where the forking thread will unlock it again.
	 */
	pthread_mutex_t fork_mutex;
};
116
117 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
118 static struct pthreadpool *pthreadpools = NULL;
119 static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
120
121 static void pthreadpool_prep_atfork(void);
122
123 /*
124  * Initialize a thread pool
125  */
126
127 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
128                      int (*signal_fn)(int jobid,
129                                       void (*job_fn)(void *private_data),
130                                       void *job_fn_private_data,
131                                       void *private_data),
132                      void *signal_fn_private_data)
133 {
134         struct pthreadpool *pool;
135         int ret;
136
137         pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
138         if (pool == NULL) {
139                 return ENOMEM;
140         }
141         pool->signal_fn = signal_fn;
142         pool->signal_fn_private_data = signal_fn_private_data;
143
144         pool->jobs_array_len = 4;
145         pool->jobs = calloc(
146                 pool->jobs_array_len, sizeof(struct pthreadpool_job));
147
148         if (pool->jobs == NULL) {
149                 free(pool);
150                 return ENOMEM;
151         }
152
153         pool->head = pool->num_jobs = 0;
154
155         ret = pthread_mutex_init(&pool->mutex, NULL);
156         if (ret != 0) {
157                 free(pool->jobs);
158                 free(pool);
159                 return ret;
160         }
161
162         ret = pthread_cond_init(&pool->condvar, NULL);
163         if (ret != 0) {
164                 pthread_mutex_destroy(&pool->mutex);
165                 free(pool->jobs);
166                 free(pool);
167                 return ret;
168         }
169
170         ret = pthread_mutex_init(&pool->fork_mutex, NULL);
171         if (ret != 0) {
172                 pthread_cond_destroy(&pool->condvar);
173                 pthread_mutex_destroy(&pool->mutex);
174                 free(pool->jobs);
175                 free(pool);
176                 return ret;
177         }
178
179         pool->stopped = false;
180         pool->destroyed = false;
181         pool->num_threads = 0;
182         pool->max_threads = max_threads;
183         pool->num_idle = 0;
184         pool->prefork_cond = NULL;
185
186         ret = pthread_mutex_lock(&pthreadpools_mutex);
187         if (ret != 0) {
188                 pthread_mutex_destroy(&pool->fork_mutex);
189                 pthread_cond_destroy(&pool->condvar);
190                 pthread_mutex_destroy(&pool->mutex);
191                 free(pool->jobs);
192                 free(pool);
193                 return ret;
194         }
195         DLIST_ADD(pthreadpools, pool);
196
197         ret = pthread_mutex_unlock(&pthreadpools_mutex);
198         assert(ret == 0);
199
200         pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
201
202         *presult = pool;
203
204         return 0;
205 }
206
207 size_t pthreadpool_max_threads(struct pthreadpool *pool)
208 {
209         if (pool->stopped) {
210                 return 0;
211         }
212
213         return pool->max_threads;
214 }
215
216 size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
217 {
218         int res;
219         int unlock_res;
220         size_t ret;
221
222         if (pool->stopped) {
223                 return 0;
224         }
225
226         res = pthread_mutex_lock(&pool->mutex);
227         if (res != 0) {
228                 return res;
229         }
230
231         if (pool->stopped) {
232                 unlock_res = pthread_mutex_unlock(&pool->mutex);
233                 assert(unlock_res == 0);
234                 return 0;
235         }
236
237         ret = pool->num_jobs;
238
239         unlock_res = pthread_mutex_unlock(&pool->mutex);
240         assert(unlock_res == 0);
241         return ret;
242 }
243
/*
 * Per-pool part of the pthread_atfork prepare handler. Pushes all
 * idle worker threads off pool->condvar and parks them on
 * pool->fork_mutex (which we lock here and keep locked across the
 * fork; pthreadpool_parent/pthreadpool_child unlock it afterwards).
 * pool->mutex is also left locked on return, again across the fork.
 */
static void pthreadpool_prepare_pool(struct pthreadpool *pool)
{
	int ret;

	ret = pthread_mutex_lock(&pool->fork_mutex);
	assert(ret == 0);

	ret = pthread_mutex_lock(&pool->mutex);
	assert(ret == 0);

	/* Repeat until every idle thread has acknowledged and moved off */
	while (pool->num_idle != 0) {
		unsigned num_idle = pool->num_idle;
		pthread_cond_t prefork_cond;

		ret = pthread_cond_init(&prefork_cond, NULL);
		assert(ret == 0);

		/*
		 * Push all idle threads off pool->condvar. In the
		 * child we can destroy the pool, which would result
		 * in undefined behaviour in the
		 * pthread_cond_destroy(pool->condvar). glibc just
		 * blocks here.
		 */
		pool->prefork_cond = &prefork_cond;

		ret = pthread_cond_signal(&pool->condvar);
		assert(ret == 0);

		/* Wait until one idle thread has signalled prefork_cond */
		while (pool->num_idle == num_idle) {
			ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
			assert(ret == 0);
		}

		pool->prefork_cond = NULL;

		ret = pthread_cond_destroy(&prefork_cond);
		assert(ret == 0);
	}

	/*
	 * Probably it's well-defined somewhere: What happens to
	 * condvars after a fork? The rationale of pthread_atfork only
	 * writes about mutexes. So better be safe than sorry and
	 * destroy/reinit pool->condvar across a fork.
	 */

	ret = pthread_cond_destroy(&pool->condvar);
	assert(ret == 0);
}
294
295 static void pthreadpool_prepare(void)
296 {
297         int ret;
298         struct pthreadpool *pool;
299
300         ret = pthread_mutex_lock(&pthreadpools_mutex);
301         assert(ret == 0);
302
303         pool = pthreadpools;
304
305         while (pool != NULL) {
306                 pthreadpool_prepare_pool(pool);
307                 pool = pool->next;
308         }
309 }
310
311 static void pthreadpool_parent(void)
312 {
313         int ret;
314         struct pthreadpool *pool;
315
316         for (pool = DLIST_TAIL(pthreadpools);
317              pool != NULL;
318              pool = DLIST_PREV(pool)) {
319                 ret = pthread_cond_init(&pool->condvar, NULL);
320                 assert(ret == 0);
321                 ret = pthread_mutex_unlock(&pool->mutex);
322                 assert(ret == 0);
323                 ret = pthread_mutex_unlock(&pool->fork_mutex);
324                 assert(ret == 0);
325         }
326
327         ret = pthread_mutex_unlock(&pthreadpools_mutex);
328         assert(ret == 0);
329 }
330
331 static void pthreadpool_child(void)
332 {
333         int ret;
334         struct pthreadpool *pool;
335
336         for (pool = DLIST_TAIL(pthreadpools);
337              pool != NULL;
338              pool = DLIST_PREV(pool)) {
339
340                 pool->num_threads = 0;
341                 pool->num_idle = 0;
342                 pool->head = 0;
343                 pool->num_jobs = 0;
344
345                 ret = pthread_cond_init(&pool->condvar, NULL);
346                 assert(ret == 0);
347
348                 ret = pthread_mutex_unlock(&pool->mutex);
349                 assert(ret == 0);
350
351                 ret = pthread_mutex_unlock(&pool->fork_mutex);
352                 assert(ret == 0);
353         }
354
355         ret = pthread_mutex_unlock(&pthreadpools_mutex);
356         assert(ret == 0);
357 }
358
/*
 * Register the fork handlers. Run exactly once, via pthread_once from
 * pthreadpool_init().
 */
static void pthreadpool_prep_atfork(void)
{
	pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
		       pthreadpool_child);
}
364
/*
 * Release the pool's resources: unlink it from the global pool list,
 * destroy the synchronization objects and free the memory. Called
 * either from pthreadpool_destroy() (when no worker threads exist) or
 * from the last exiting worker thread.
 */
static int pthreadpool_free(struct pthreadpool *pool)
{
	int ret, ret1, ret2;

	ret = pthread_mutex_lock(&pthreadpools_mutex);
	if (ret != 0) {
		return ret;
	}
	DLIST_REMOVE(pthreadpools, pool);
	ret = pthread_mutex_unlock(&pthreadpools_mutex);
	assert(ret == 0);

	/*
	 * Lock/unlock cycle with no work in between: presumably a
	 * barrier so that a thread still between its final unlock of
	 * pool->mutex and returning (e.g. in pthreadpool_server_exit)
	 * has fully left before we destroy the mutex — TODO confirm.
	 */
	ret = pthread_mutex_lock(&pool->mutex);
	assert(ret == 0);
	ret = pthread_mutex_unlock(&pool->mutex);
	assert(ret == 0);

	/* Destroy all three objects, then report the first failure */
	ret = pthread_mutex_destroy(&pool->mutex);
	ret1 = pthread_cond_destroy(&pool->condvar);
	ret2 = pthread_mutex_destroy(&pool->fork_mutex);

	if (ret != 0) {
		return ret;
	}
	if (ret1 != 0) {
		return ret1;
	}
	if (ret2 != 0) {
		return ret2;
	}

	free(pool->jobs);
	free(pool);

	return 0;
}
401
402 /*
403  * Stop a thread pool. Wake up all idle threads for exit.
404  */
405
406 static int pthreadpool_stop_locked(struct pthreadpool *pool)
407 {
408         int ret;
409
410         pool->stopped = true;
411
412         if (pool->num_threads == 0) {
413                 return 0;
414         }
415
416         /*
417          * We have active threads, tell them to finish.
418          */
419
420         ret = pthread_cond_broadcast(&pool->condvar);
421
422         return ret;
423 }
424
425 /*
426  * Stop a thread pool. Wake up all idle threads for exit.
427  */
428
429 int pthreadpool_stop(struct pthreadpool *pool)
430 {
431         int ret, ret1;
432
433         ret = pthread_mutex_lock(&pool->mutex);
434         if (ret != 0) {
435                 return ret;
436         }
437
438         if (!pool->stopped) {
439                 ret = pthreadpool_stop_locked(pool);
440         }
441
442         ret1 = pthread_mutex_unlock(&pool->mutex);
443         assert(ret1 == 0);
444
445         return ret;
446 }
447
/*
 * Destroy a thread pool. Wake up all idle threads for exit. If no
 * worker threads exist, the pool is freed here; otherwise the last
 * worker thread to exit frees it (see pthreadpool_server_exit).
 * Must not be called twice on the same pool.
 */
int pthreadpool_destroy(struct pthreadpool *pool)
{
	int ret, ret1;
	bool free_it;

	/* Double-destroy is a caller bug */
	assert(!pool->destroyed);

	ret = pthread_mutex_lock(&pool->mutex);
	if (ret != 0) {
		return ret;
	}

	pool->destroyed = true;

	if (!pool->stopped) {
		ret = pthreadpool_stop_locked(pool);
	}

	/*
	 * Decide under the mutex whether we free; if threads remain,
	 * the last one out frees instead.
	 */
	free_it = (pool->num_threads == 0);

	ret1 = pthread_mutex_unlock(&pool->mutex);
	assert(ret1 == 0);

	if (free_it) {
		pthreadpool_free(pool);
	}

	return ret;
}
482 /*
483  * Prepare for pthread_exit(), pool->mutex must be locked and will be
484  * unlocked here. This is a bit of a layering violation, but here we
485  * also take care of removing the pool if we're the last thread.
486  */
487 static void pthreadpool_server_exit(struct pthreadpool *pool)
488 {
489         int ret;
490         bool free_it;
491
492         pool->num_threads -= 1;
493
494         free_it = (pool->destroyed && (pool->num_threads == 0));
495
496         ret = pthread_mutex_unlock(&pool->mutex);
497         assert(ret == 0);
498
499         if (free_it) {
500                 pthreadpool_free(pool);
501         }
502 }
503
504 static bool pthreadpool_get_job(struct pthreadpool *p,
505                                 struct pthreadpool_job *job)
506 {
507         if (p->stopped) {
508                 return false;
509         }
510
511         if (p->num_jobs == 0) {
512                 return false;
513         }
514         *job = p->jobs[p->head];
515         p->head = (p->head+1) % p->jobs_array_len;
516         p->num_jobs -= 1;
517         return true;
518 }
519
520 static bool pthreadpool_put_job(struct pthreadpool *p,
521                                 int id,
522                                 void (*fn)(void *private_data),
523                                 void *private_data)
524 {
525         struct pthreadpool_job *job;
526
527         if (p->num_jobs == p->jobs_array_len) {
528                 struct pthreadpool_job *tmp;
529                 size_t new_len = p->jobs_array_len * 2;
530
531                 tmp = realloc(
532                         p->jobs, sizeof(struct pthreadpool_job) * new_len);
533                 if (tmp == NULL) {
534                         return false;
535                 }
536                 p->jobs = tmp;
537
538                 /*
539                  * We just doubled the jobs array. The array implements a FIFO
540                  * queue with a modulo-based wraparound, so we have to memcpy
541                  * the jobs that are logically at the queue end but physically
542                  * before the queue head into the reallocated area. The new
543                  * space starts at the current jobs_array_len, and we have to
544                  * copy everything before the current head job into the new
545                  * area.
546                  */
547                 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
548                        sizeof(struct pthreadpool_job) * p->head);
549
550                 p->jobs_array_len = new_len;
551         }
552
553         job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
554         job->id = id;
555         job->fn = fn;
556         job->private_data = private_data;
557
558         p->num_jobs += 1;
559
560         return true;
561 }
562
563 static void pthreadpool_undo_put_job(struct pthreadpool *p)
564 {
565         p->num_jobs -= 1;
566 }
567
/*
 * Worker thread main loop. Waits (with a 1-second idle timeout) for
 * jobs on pool->condvar, runs them with the mutex dropped, and calls
 * pool->signal_fn after each one. Exits on idle timeout, on
 * signal_fn failure, or when the pool is stopped; the exit path
 * (pthreadpool_server_exit) may free the pool if this is the last
 * thread of a destroyed pool.
 */
static void *pthreadpool_server(void *arg)
{
	struct pthreadpool *pool = (struct pthreadpool *)arg;
	int res;

	res = pthread_mutex_lock(&pool->mutex);
	if (res != 0) {
		return NULL;
	}

	while (1) {
		struct timespec ts;
		struct pthreadpool_job job;

		/*
		 * idle-wait at most 1 second. If nothing happens in that
		 * time, exit this thread.
		 */

		clock_gettime(CLOCK_REALTIME, &ts);
		ts.tv_sec += 1;

		while ((pool->num_jobs == 0) && !pool->stopped) {

			pool->num_idle += 1;
			res = pthread_cond_timedwait(
				&pool->condvar, &pool->mutex, &ts);
			pool->num_idle -= 1;

			if (pool->prefork_cond != NULL) {
				/*
				 * We must allow fork() to continue
				 * without anybody waiting on
				 * &pool->condvar. Tell
				 * pthreadpool_prepare_pool that we
				 * got that message.
				 *
				 * NOTE(review): this path overwrites
				 * res, so a simultaneous ETIMEDOUT
				 * from the timedwait above is
				 * swallowed and the loop re-checks —
				 * presumably intentional.
				 */

				res = pthread_cond_signal(pool->prefork_cond);
				assert(res == 0);

				res = pthread_mutex_unlock(&pool->mutex);
				assert(res == 0);

				/*
				 * pthreadpool_prepare_pool has
				 * already locked this mutex across
				 * the fork. This makes us wait
				 * without sitting in a condvar.
				 */
				res = pthread_mutex_lock(&pool->fork_mutex);
				assert(res == 0);
				res = pthread_mutex_unlock(&pool->fork_mutex);
				assert(res == 0);

				res = pthread_mutex_lock(&pool->mutex);
				assert(res == 0);
			}

			if (res == ETIMEDOUT) {

				if (pool->num_jobs == 0) {
					/*
					 * we timed out and still no work for
					 * us. Exit.
					 */
					pthreadpool_server_exit(pool);
					return NULL;
				}

				break;
			}
			assert(res == 0);
		}

		if (pthreadpool_get_job(pool, &job)) {
			int ret;

			/*
			 * Do the work with the mutex unlocked
			 */

			res = pthread_mutex_unlock(&pool->mutex);
			assert(res == 0);

			job.fn(job.private_data);

			ret = pool->signal_fn(job.id,
					      job.fn, job.private_data,
					      pool->signal_fn_private_data);

			res = pthread_mutex_lock(&pool->mutex);
			assert(res == 0);

			if (ret != 0) {
				/* signal_fn failed: this worker gives up */
				pthreadpool_server_exit(pool);
				return NULL;
			}
		}

		if (pool->stopped) {
			/*
			 * we're asked to stop processing jobs, so exit
			 */
			pthreadpool_server_exit(pool);
			return NULL;
		}
	}
}
677
678 static int pthreadpool_create_thread(struct pthreadpool *pool)
679 {
680         pthread_attr_t thread_attr;
681         pthread_t thread_id;
682         int res;
683         sigset_t mask, omask;
684
685         /*
686          * Create a new worker thread. It should not receive any signals.
687          */
688
689         sigfillset(&mask);
690
691         res = pthread_attr_init(&thread_attr);
692         if (res != 0) {
693                 return res;
694         }
695
696         res = pthread_attr_setdetachstate(
697                 &thread_attr, PTHREAD_CREATE_DETACHED);
698         if (res != 0) {
699                 pthread_attr_destroy(&thread_attr);
700                 return res;
701         }
702
703         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
704         if (res != 0) {
705                 pthread_attr_destroy(&thread_attr);
706                 return res;
707         }
708
709         res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
710                              (void *)pool);
711
712         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
713
714         pthread_attr_destroy(&thread_attr);
715
716         if (res == 0) {
717                 pool->num_threads += 1;
718         }
719
720         return res;
721 }
722
/*
 * Queue a job for execution. With max_threads == 0 the job (and
 * signal_fn) run synchronously in this call. Otherwise the job is
 * queued and an idle worker is woken, or a new worker is created up
 * to max_threads; if thread creation fails entirely, the job is
 * retracted and the error returned. Returns 0, EINVAL on a stopped
 * pool, ENOMEM, or another errno-style error code.
 */
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
			void (*fn)(void *private_data), void *private_data)
{
	int res;
	int unlock_res;

	/* Adding to a destroyed pool is a caller bug */
	assert(!pool->destroyed);

	res = pthread_mutex_lock(&pool->mutex);
	if (res != 0) {
		return res;
	}

	if (pool->stopped) {
		/*
		 * Protect against the pool being shut down while
		 * trying to add a job
		 */
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return EINVAL;
	}

	if (pool->max_threads == 0) {
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);

		/*
		 * If no thread are allowed we do strict sync processing.
		 */
		fn(private_data);
		res = pool->signal_fn(job_id, fn, private_data,
				      pool->signal_fn_private_data);
		return res;
	}

	/*
	 * Add job to the end of the queue
	 */
	if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return ENOMEM;
	}

	if (pool->num_idle > 0) {
		/*
		 * We have idle threads, wake one.
		 */
		res = pthread_cond_signal(&pool->condvar);
		if (res != 0) {
			/* Could not wake anybody: take the job back */
			pthreadpool_undo_put_job(pool);
		}
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return res;
	}

	if (pool->num_threads >= pool->max_threads) {
		/*
		 * No more new threads, we just queue the request
		 */
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return 0;
	}

	res = pthreadpool_create_thread(pool);
	if (res == 0) {
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return 0;
	}

	if (pool->num_threads != 0) {
		/*
		 * At least one thread is still available, let
		 * that one run the queued job.
		 */
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return 0;
	}

	/*
	 * No thread could be created to run job, fallback to sync
	 * call.
	 */
	pthreadpool_undo_put_job(pool);

	unlock_res = pthread_mutex_unlock(&pool->mutex);
	assert(unlock_res == 0);

	return res;
}
818
/*
 * Remove all still-queued jobs matching (job_id, fn, private_data)
 * and return how many were removed. Jobs already handed to a worker
 * are not affected. NOTE(review): on a mutex-lock failure the
 * (positive) error code is returned through the size_t result, as in
 * pthreadpool_queued_jobs — callers cannot tell it from a count.
 */
size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
			      void (*fn)(void *private_data), void *private_data)
{
	int res;
	size_t i, j;
	size_t num = 0;

	assert(!pool->destroyed);

	res = pthread_mutex_lock(&pool->mutex);
	if (res != 0) {
		return res;
	}

	/*
	 * Single pass over the circular queue: i walks the original
	 * entries, j tracks the write position of the surviving ones.
	 */
	for (i = 0, j = 0; i < pool->num_jobs; i++) {
		size_t idx = (pool->head + i) % pool->jobs_array_len;
		size_t new_idx = (pool->head + j) % pool->jobs_array_len;
		struct pthreadpool_job *job = &pool->jobs[idx];

		if ((job->private_data == private_data) &&
		    (job->id == job_id) &&
		    (job->fn == fn))
		{
			/*
			 * Just skip the entry.
			 */
			num++;
			continue;
		}

		/*
		 * If we already removed one or more jobs (so j will be smaller
		 * then i), we need to fill possible gaps in the logical list.
		 */
		if (j < i) {
			pool->jobs[new_idx] = *job;
		}
		j++;
	}

	pool->num_jobs -= num;

	res = pthread_mutex_unlock(&pool->mutex);
	assert(res == 0);

	return num;
}