pthreadpool: replace assert_return_code(ret, 0); with assert_int_equal(ret, 0);
[samba.git] / lib/pthreadpool/pthreadpool.c
/*
 * Unix SMB/CIFS implementation.
 * thread pool implementation
 * Copyright (C) Volker Lendecke 2009
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "replace.h"
#include "system/time.h"
#include "system/wait.h"
#include "system/threads.h"
#include "pthreadpool.h"
#include "lib/util/dlinklist.h"

#ifdef NDEBUG
#undef NDEBUG
#endif

#include <assert.h>

struct pthreadpool_job {
        int id;
        void (*fn)(void *private_data);
        void *private_data;
};

struct pthreadpool {
        /*
         * List pthreadpools for fork safety
         */
        struct pthreadpool *prev, *next;

        /*
         * Control access to this struct
         */
        pthread_mutex_t mutex;

        /*
         * Threads waiting for work do so here
         */
        pthread_cond_t condvar;

        /*
         * Array of jobs
         */
        size_t jobs_array_len;
        struct pthreadpool_job *jobs;

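        /*
         * The jobs array is used as a circular FIFO: the next job to
         * run sits at jobs[head], and the queued jobs occupy num_jobs
         * slots from there, wrapping around modulo jobs_array_len.
         */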
        size_t head;
        size_t num_jobs;

        /*
         * Indicate job completion
         */
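        /*
         * For queued jobs, signal_fn runs in the worker thread right
         * after the job function; a non-zero return makes that worker
         * exit (see pthreadpool_server()). With max_threads == 0 it
         * runs directly in the caller (see pthreadpool_add_job()).
         */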
        int (*signal_fn)(int jobid,
                         void (*job_fn)(void *private_data),
                         void *job_fn_private_data,
                         void *private_data);
        void *signal_fn_private_data;

        /*
         * indicator to worker threads to stop processing further jobs
         * and exit.
         */
        bool stopped;

        /*
         * indicator to the last worker thread to free the pool
         * resources.
         */
        bool destroyed;

        /*
         * maximum number of threads
         * 0 means no real thread, only strict sync processing.
         */
        unsigned max_threads;

        /*
         * Number of threads
         */
        unsigned num_threads;

        /*
         * Number of idle threads
         */
        unsigned num_idle;

        /*
         * Condition variable indicating that helper threads should
         * quickly go away, making way for fork() without anybody
         * waiting on pool->condvar.
         */
        pthread_cond_t *prefork_cond;

        /*
         * Waiting position for helper threads while fork is
         * running. The forking thread will have locked it, and all
         * idle helper threads will sit here until after the fork,
         * when the forking thread will unlock it again.
         */
        pthread_mutex_t fork_mutex;
};

static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct pthreadpool *pthreadpools = NULL;
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);

/*
 * Initialize a thread pool
 */

int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
                     int (*signal_fn)(int jobid,
                                      void (*job_fn)(void *private_data),
                                      void *job_fn_private_data,
                                      void *private_data),
                     void *signal_fn_private_data)
{
        struct pthreadpool *pool;
        int ret;

        pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
        if (pool == NULL) {
                return ENOMEM;
        }
        pool->signal_fn = signal_fn;
        pool->signal_fn_private_data = signal_fn_private_data;

        pool->jobs_array_len = 4;
        pool->jobs = calloc(
                pool->jobs_array_len, sizeof(struct pthreadpool_job));

        if (pool->jobs == NULL) {
                free(pool);
                return ENOMEM;
        }

        pool->head = pool->num_jobs = 0;

        ret = pthread_mutex_init(&pool->mutex, NULL);
        if (ret != 0) {
                free(pool->jobs);
                free(pool);
                return ret;
        }

        ret = pthread_cond_init(&pool->condvar, NULL);
        if (ret != 0) {
                pthread_mutex_destroy(&pool->mutex);
                free(pool->jobs);
                free(pool);
                return ret;
        }

        ret = pthread_mutex_init(&pool->fork_mutex, NULL);
        if (ret != 0) {
                pthread_cond_destroy(&pool->condvar);
                pthread_mutex_destroy(&pool->mutex);
                free(pool->jobs);
                free(pool);
                return ret;
        }

        pool->stopped = false;
        pool->destroyed = false;
        pool->num_threads = 0;
        pool->max_threads = max_threads;
        pool->num_idle = 0;
        pool->prefork_cond = NULL;

        ret = pthread_mutex_lock(&pthreadpools_mutex);
        if (ret != 0) {
                pthread_mutex_destroy(&pool->fork_mutex);
                pthread_cond_destroy(&pool->condvar);
                pthread_mutex_destroy(&pool->mutex);
                free(pool->jobs);
                free(pool);
                return ret;
        }
        DLIST_ADD(pthreadpools, pool);

        ret = pthread_mutex_unlock(&pthreadpools_mutex);
        assert(ret == 0);

        pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);

        *presult = pool;

        return 0;
}
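
/*
 * Rough usage sketch (illustration only, not part of this file): a
 * caller pairs pthreadpool_init() with a signal_fn of its own that
 * hands finished jobs back to its event loop. The names my_signal_fn
 * and my_job below are made up for this example.
 *
 *   static int my_signal_fn(int jobid,
 *                           void (*job_fn)(void *private_data),
 *                           void *job_fn_private_data,
 *                           void *private_data)
 *   {
 *           // Runs in the worker thread, e.g. write jobid to a pipe
 *           // watched by the main loop. Non-zero makes the worker exit.
 *           return 0;
 *   }
 *
 *   static void my_job(void *private_data)
 *   {
 *           // The actual work, runs in a worker thread.
 *   }
 *
 *   struct pthreadpool *pool = NULL;
 *   int ret = pthreadpool_init(4, &pool, my_signal_fn, NULL);
 *   if (ret == 0) {
 *           ret = pthreadpool_add_job(pool, 1, my_job, NULL);
 *   }
 *   ...
 *   pthreadpool_destroy(pool);
 */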

size_t pthreadpool_max_threads(struct pthreadpool *pool)
{
        if (pool->stopped) {
                return 0;
        }

        return pool->max_threads;
}

size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
{
        int res;
        int unlock_res;
        size_t ret;

        if (pool->stopped) {
                return 0;
        }

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return res;
        }

        if (pool->stopped) {
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        ret = pool->num_jobs;

        unlock_res = pthread_mutex_unlock(&pool->mutex);
        assert(unlock_res == 0);
        return ret;
}

static void pthreadpool_prepare_pool(struct pthreadpool *pool)
{
        int ret;

        ret = pthread_mutex_lock(&pool->fork_mutex);
        assert(ret == 0);

        ret = pthread_mutex_lock(&pool->mutex);
        assert(ret == 0);

        while (pool->num_idle != 0) {
                unsigned num_idle = pool->num_idle;
                pthread_cond_t prefork_cond;

                ret = pthread_cond_init(&prefork_cond, NULL);
                assert(ret == 0);

                /*
                 * Push all idle threads off pool->condvar. In the
                 * child we can destroy the pool, which would result
                 * in undefined behaviour in the
                 * pthread_cond_destroy(pool->condvar). glibc just
                 * blocks here.
                 */
                pool->prefork_cond = &prefork_cond;

                ret = pthread_cond_signal(&pool->condvar);
                assert(ret == 0);

                while (pool->num_idle == num_idle) {
                        ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
                        assert(ret == 0);
                }

                pool->prefork_cond = NULL;

                ret = pthread_cond_destroy(&prefork_cond);
                assert(ret == 0);
        }

        /*
         * It is probably well-defined somewhere what happens to
         * condvars after a fork, but the rationale of pthread_atfork
         * only talks about mutexes. So better be safe than sorry and
         * destroy/reinit pool->condvar across a fork.
         */

        ret = pthread_cond_destroy(&pool->condvar);
        assert(ret == 0);
}

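/*
 * fork() protection: pthreadpool_prepare() runs before fork() and,
 * for every pool, takes fork_mutex and the pool mutex and pushes all
 * idle workers off pool->condvar (see pthreadpool_prepare_pool()
 * above). After the fork, pthreadpool_parent() reinitializes the
 * condvar and drops the locks again, while pthreadpool_child() also
 * resets the thread and job counters and marks the pools stopped,
 * since the worker threads do not exist in the child.
 */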
static void pthreadpool_prepare(void)
{
        int ret;
        struct pthreadpool *pool;

        ret = pthread_mutex_lock(&pthreadpools_mutex);
        assert(ret == 0);

        pool = pthreadpools;

        while (pool != NULL) {
                pthreadpool_prepare_pool(pool);
                pool = pool->next;
        }
}

static void pthreadpool_parent(void)
{
        int ret;
        struct pthreadpool *pool;

        for (pool = DLIST_TAIL(pthreadpools);
             pool != NULL;
             pool = DLIST_PREV(pool)) {
                ret = pthread_cond_init(&pool->condvar, NULL);
                assert(ret == 0);
                ret = pthread_mutex_unlock(&pool->mutex);
                assert(ret == 0);
                ret = pthread_mutex_unlock(&pool->fork_mutex);
                assert(ret == 0);
        }

        ret = pthread_mutex_unlock(&pthreadpools_mutex);
        assert(ret == 0);
}

static void pthreadpool_child(void)
{
        int ret;
        struct pthreadpool *pool;

        for (pool = DLIST_TAIL(pthreadpools);
             pool != NULL;
             pool = DLIST_PREV(pool)) {

                pool->num_threads = 0;
                pool->num_idle = 0;
                pool->head = 0;
                pool->num_jobs = 0;
                pool->stopped = true;

                ret = pthread_cond_init(&pool->condvar, NULL);
                assert(ret == 0);

                ret = pthread_mutex_unlock(&pool->mutex);
                assert(ret == 0);

                ret = pthread_mutex_unlock(&pool->fork_mutex);
                assert(ret == 0);
        }

        ret = pthread_mutex_unlock(&pthreadpools_mutex);
        assert(ret == 0);
}

static void pthreadpool_prep_atfork(void)
{
        pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
                       pthreadpool_child);
}

static int pthreadpool_free(struct pthreadpool *pool)
{
        int ret, ret1, ret2;

        ret = pthread_mutex_lock(&pthreadpools_mutex);
        if (ret != 0) {
                return ret;
        }
        DLIST_REMOVE(pthreadpools, pool);
        ret = pthread_mutex_unlock(&pthreadpools_mutex);
        assert(ret == 0);

        ret = pthread_mutex_lock(&pool->mutex);
        assert(ret == 0);
        ret = pthread_mutex_unlock(&pool->mutex);
        assert(ret == 0);

        ret = pthread_mutex_destroy(&pool->mutex);
        ret1 = pthread_cond_destroy(&pool->condvar);
        ret2 = pthread_mutex_destroy(&pool->fork_mutex);

        if (ret != 0) {
                return ret;
        }
        if (ret1 != 0) {
                return ret1;
        }
        if (ret2 != 0) {
                return ret2;
        }

        free(pool->jobs);
        free(pool);

        return 0;
}

/*
 * Stop a thread pool. Wake up all idle threads for exit.
 */

static int pthreadpool_stop_locked(struct pthreadpool *pool)
{
        int ret;

        pool->stopped = true;

        if (pool->num_threads == 0) {
                return 0;
        }

        /*
         * We have active threads, tell them to finish.
         */

        ret = pthread_cond_broadcast(&pool->condvar);

        return ret;
}

/*
 * Stop a thread pool. Wake up all idle threads for exit.
 */

int pthreadpool_stop(struct pthreadpool *pool)
{
        int ret, ret1;

        ret = pthread_mutex_lock(&pool->mutex);
        if (ret != 0) {
                return ret;
        }

        if (!pool->stopped) {
                ret = pthreadpool_stop_locked(pool);
        }

        ret1 = pthread_mutex_unlock(&pool->mutex);
        assert(ret1 == 0);

        return ret;
}

/*
 * Destroy a thread pool. Wake up all idle threads for exit. The last
 * one will free the pool.
 */

int pthreadpool_destroy(struct pthreadpool *pool)
{
        int ret, ret1;
        bool free_it;

        assert(!pool->destroyed);

        ret = pthread_mutex_lock(&pool->mutex);
        if (ret != 0) {
                return ret;
        }

        pool->destroyed = true;

        if (!pool->stopped) {
                ret = pthreadpool_stop_locked(pool);
        }

        free_it = (pool->num_threads == 0);

        ret1 = pthread_mutex_unlock(&pool->mutex);
        assert(ret1 == 0);

        if (free_it) {
                pthreadpool_free(pool);
        }

        return ret;
}

/*
 * Prepare for pthread_exit(), pool->mutex must be locked and will be
 * unlocked here. This is a bit of a layering violation, but here we
 * also take care of removing the pool if we're the last thread.
 */
static void pthreadpool_server_exit(struct pthreadpool *pool)
{
        int ret;
        bool free_it;

        pool->num_threads -= 1;

        free_it = (pool->destroyed && (pool->num_threads == 0));

        ret = pthread_mutex_unlock(&pool->mutex);
        assert(ret == 0);

        if (free_it) {
                pthreadpool_free(pool);
        }
}

static bool pthreadpool_get_job(struct pthreadpool *p,
                                struct pthreadpool_job *job)
{
        if (p->stopped) {
                return false;
        }

        if (p->num_jobs == 0) {
                return false;
        }
        *job = p->jobs[p->head];
        p->head = (p->head+1) % p->jobs_array_len;
        p->num_jobs -= 1;
        return true;
}

static bool pthreadpool_put_job(struct pthreadpool *p,
                                int id,
                                void (*fn)(void *private_data),
                                void *private_data)
{
        struct pthreadpool_job *job;

        if (p->num_jobs == p->jobs_array_len) {
                struct pthreadpool_job *tmp;
                size_t new_len = p->jobs_array_len * 2;

                tmp = realloc(
                        p->jobs, sizeof(struct pthreadpool_job) * new_len);
                if (tmp == NULL) {
                        return false;
                }
                p->jobs = tmp;

                /*
                 * We just doubled the jobs array. The array implements a FIFO
                 * queue with a modulo-based wraparound, so we have to memcpy
                 * the jobs that are logically at the queue end but physically
                 * before the queue head into the reallocated area. The new
                 * space starts at the current jobs_array_len, and we have to
                 * copy everything before the current head job into the new
                 * area.
                 */
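                /*
                 * Worked example: with jobs_array_len 4, head 2 and
                 * the queue full, the logical order is jobs[2],
                 * jobs[3], jobs[0], jobs[1]. After doubling to 8
                 * slots, jobs[0..1] are copied to jobs[4..5], so the
                 * same modulo walk from head now yields jobs[2],
                 * jobs[3], jobs[4], jobs[5].
                 */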
                memcpy(&p->jobs[p->jobs_array_len], p->jobs,
                       sizeof(struct pthreadpool_job) * p->head);

                p->jobs_array_len = new_len;
        }

        job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
        job->id = id;
        job->fn = fn;
        job->private_data = private_data;

        p->num_jobs += 1;

        return true;
}

static void pthreadpool_undo_put_job(struct pthreadpool *p)
{
        p->num_jobs -= 1;
}

static void *pthreadpool_server(void *arg)
{
        struct pthreadpool *pool = (struct pthreadpool *)arg;
        int res;

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return NULL;
        }

        while (1) {
                struct timespec ts;
                struct pthreadpool_job job;

                /*
                 * idle-wait at most 1 second. If nothing happens in that
                 * time, exit this thread.
                 */

                clock_gettime(CLOCK_REALTIME, &ts);
                ts.tv_sec += 1;

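                /*
                 * Wait until there is something to do. The wait can
                 * end because a job arrived, the pool was stopped, a
                 * fork is being prepared (prefork_cond is set), or
                 * the one-second timeout hit.
                 */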
                while ((pool->num_jobs == 0) && !pool->stopped) {

                        pool->num_idle += 1;
                        res = pthread_cond_timedwait(
                                &pool->condvar, &pool->mutex, &ts);
                        pool->num_idle -= 1;

                        if (pool->prefork_cond != NULL) {
                                /*
                                 * We must allow fork() to continue
                                 * without anybody waiting on
                                 * &pool->condvar. Tell
                                 * pthreadpool_prepare_pool that we
                                 * got that message.
                                 */

                                res = pthread_cond_signal(pool->prefork_cond);
                                assert(res == 0);

                                res = pthread_mutex_unlock(&pool->mutex);
                                assert(res == 0);

                                /*
                                 * pthreadpool_prepare_pool has
                                 * already locked this mutex across
                                 * the fork. This makes us wait
                                 * without sitting in a condvar.
                                 */
                                res = pthread_mutex_lock(&pool->fork_mutex);
                                assert(res == 0);
                                res = pthread_mutex_unlock(&pool->fork_mutex);
                                assert(res == 0);

                                res = pthread_mutex_lock(&pool->mutex);
                                assert(res == 0);
                        }

                        if (res == ETIMEDOUT) {

                                if (pool->num_jobs == 0) {
                                        /*
                                         * we timed out and still no work for
                                         * us. Exit.
                                         */
                                        pthreadpool_server_exit(pool);
                                        return NULL;
                                }

                                break;
                        }
                        assert(res == 0);
                }

                if (pthreadpool_get_job(pool, &job)) {
                        int ret;

                        /*
                         * Do the work with the mutex unlocked
                         */

                        res = pthread_mutex_unlock(&pool->mutex);
                        assert(res == 0);

                        job.fn(job.private_data);

                        ret = pool->signal_fn(job.id,
                                              job.fn, job.private_data,
                                              pool->signal_fn_private_data);

                        res = pthread_mutex_lock(&pool->mutex);
                        assert(res == 0);

                        if (ret != 0) {
                                pthreadpool_server_exit(pool);
                                return NULL;
                        }
                }

                if (pool->stopped) {
                        /*
                         * we're asked to stop processing jobs, so exit
                         */
                        pthreadpool_server_exit(pool);
                        return NULL;
                }
        }
}

static int pthreadpool_create_thread(struct pthreadpool *pool)
{
        pthread_attr_t thread_attr;
        pthread_t thread_id;
        int res;
        sigset_t mask, omask;

        /*
         * Create a new worker thread. It should not receive any signals.
         */

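        /*
         * Block all signals in this thread around pthread_create() so
         * that the new worker starts with a fully blocked signal
         * mask; our own mask is restored right after the create.
         */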
        sigfillset(&mask);

        res = pthread_attr_init(&thread_attr);
        if (res != 0) {
                return res;
        }

        res = pthread_attr_setdetachstate(
                &thread_attr, PTHREAD_CREATE_DETACHED);
        if (res != 0) {
                pthread_attr_destroy(&thread_attr);
                return res;
        }

        res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
        if (res != 0) {
                pthread_attr_destroy(&thread_attr);
                return res;
        }

        res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
                             (void *)pool);

        assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);

        pthread_attr_destroy(&thread_attr);

        if (res == 0) {
                pool->num_threads += 1;
        }

        return res;
}

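/*
 * Add a job to the pool. With max_threads == 0 the job runs
 * synchronously in the caller. Otherwise the job is queued; an idle
 * worker is woken if there is one, a new worker is created if we are
 * still below max_threads, and otherwise the job just waits in the
 * queue for a running worker.
 */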
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                        void (*fn)(void *private_data), void *private_data)
{
        int res;
        int unlock_res;

        assert(!pool->destroyed);

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return res;
        }

        if (pool->stopped) {
                /*
                 * Protect against the pool being shut down while
                 * trying to add a job
                 */
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return EINVAL;
        }

        if (pool->max_threads == 0) {
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);

                /*
                 * If no threads are allowed we do strict sync processing.
                 */
                fn(private_data);
                res = pool->signal_fn(job_id, fn, private_data,
                                      pool->signal_fn_private_data);
                return res;
        }

        /*
         * Add job to the end of the queue
         */
        if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return ENOMEM;
        }

        if (pool->num_idle > 0) {
                /*
                 * We have idle threads, wake one.
                 */
                res = pthread_cond_signal(&pool->condvar);
                if (res != 0) {
                        pthreadpool_undo_put_job(pool);
                }
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return res;
        }

        if (pool->num_threads >= pool->max_threads) {
                /*
                 * No more new threads, we just queue the request
                 */
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        res = pthreadpool_create_thread(pool);
        if (res == 0) {
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        if (pool->num_threads != 0) {
                /*
                 * At least one thread is still available, let
                 * that one run the queued job.
                 */
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        /*
         * No thread could be created to run the job. Undo the
         * queueing and return the error; the caller can fall back to
         * a sync call.
         */
        pthreadpool_undo_put_job(pool);

        unlock_res = pthread_mutex_unlock(&pool->mutex);
        assert(unlock_res == 0);

        return res;
}

size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
                              void (*fn)(void *private_data), void *private_data)
{
        int res;
        size_t i, j;
        size_t num = 0;

        assert(!pool->destroyed);

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return res;
        }

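        /*
         * Walk the queue once: i scans all queued jobs, j is the
         * write position. Matching jobs are skipped (and counted),
         * the remaining ones are compacted towards the head so the
         * circular FIFO stays gap-free.
         */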
        for (i = 0, j = 0; i < pool->num_jobs; i++) {
                size_t idx = (pool->head + i) % pool->jobs_array_len;
                size_t new_idx = (pool->head + j) % pool->jobs_array_len;
                struct pthreadpool_job *job = &pool->jobs[idx];

                if ((job->private_data == private_data) &&
                    (job->id == job_id) &&
                    (job->fn == fn))
                {
                        /*
                         * Just skip the entry.
                         */
                        num++;
                        continue;
                }

                /*
                 * If we already removed one or more jobs (so j will be smaller
                 * than i), we need to fill possible gaps in the logical list.
                 */
                if (j < i) {
                        pool->jobs[new_idx] = *job;
                }
                j++;
        }

        pool->num_jobs -= num;

        res = pthread_mutex_unlock(&pool->mutex);
        assert(res == 0);

        return num;
}