/*
 * Unix SMB/CIFS implementation.
 * thread pool implementation
 * Copyright (C) Volker Lendecke 2009
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "replace.h"
#include "system/time.h"
#include "system/wait.h"
#include "system/threads.h"
#include "pthreadpool.h"
#include "lib/util/dlinklist.h"
struct pthreadpool_job {
	int id;
	void (*fn)(void *private_data);
	void *private_data;
};
struct pthreadpool {
	/*
	 * List pthreadpools for fork safety
	 */
	struct pthreadpool *prev, *next;

	/*
	 * Control access to this struct
	 */
	pthread_mutex_t mutex;

	/*
	 * Threads waiting for work do so here
	 */
	pthread_cond_t condvar;

	/*
	 * List of work jobs: a FIFO held in a circular array
	 */
	size_t jobs_array_len;
	struct pthreadpool_job *jobs;

	size_t head;
	size_t num_jobs;

	/*
	 * Indicate job completion
	 */
	int (*signal_fn)(int jobid,
			 void (*job_fn)(void *private_data),
			 void *job_fn_private_data,
			 void *signal_fn_private_data);
	void *signal_fn_private_data;

	/*
	 * indicator to worker threads to stop processing further jobs
	 */
	bool stopped;

	/*
	 * indicator to the last worker thread to free the pool
	 */
	bool destroyed;

	/*
	 * maximum number of threads
	 * 0 means no real thread, only strict sync processing.
	 */
	unsigned max_threads;

	/*
	 * Number of threads
	 */
	unsigned num_threads;

	/*
	 * Number of idle threads
	 */
	unsigned num_idle;

	/*
	 * Condition variable indicating that helper threads should
	 * quickly go away making way for fork() without anybody
	 * waiting on pool->condvar.
	 */
	pthread_cond_t *prefork_cond;

	/*
	 * Waiting position for helper threads while fork is
	 * running. The forking thread will have locked it, and all
	 * idle helper threads will sit here until after the fork,
	 * where the forking thread will unlock it again.
	 */
	pthread_mutex_t fork_mutex;
};
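/*
 * A signal_fn is invoked from the finishing worker thread, so it
 * must be thread-safe from the caller's point of view. A minimal
 * sketch (hypothetical caller-side code, not part of this file) that
 * hands completed job ids to a main loop through a pipe:
 *
 *	static int my_signal_fn(int jobid,
 *				void (*job_fn)(void *private_data),
 *				void *job_fn_private_data,
 *				void *signal_fn_private_data)
 *	{
 *		int *write_fd = (int *)signal_fn_private_data;
 *		ssize_t written;
 *
 *		written = write(*write_fd, &jobid, sizeof(jobid));
 *		if (written != sizeof(jobid)) {
 *			return EIO;
 *		}
 *		return 0;
 *	}
 *
 * A nonzero return from signal_fn makes the reporting worker thread
 * exit, see pthreadpool_server() below.
 */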
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct pthreadpool *pthreadpools = NULL;
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);
/*
 * Initialize a thread pool
 */
int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
		     int (*signal_fn)(int jobid,
				      void (*job_fn)(void *private_data),
				      void *job_fn_private_data,
				      void *signal_fn_private_data),
		     void *signal_fn_private_data)
{
	struct pthreadpool *pool;
	int ret;

	pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
	if (pool == NULL) {
		return ENOMEM;
	}
	pool->signal_fn = signal_fn;
	pool->signal_fn_private_data = signal_fn_private_data;

	pool->jobs_array_len = 4;
	pool->jobs = calloc(
		pool->jobs_array_len, sizeof(struct pthreadpool_job));
	if (pool->jobs == NULL) {
		free(pool);
		return ENOMEM;
	}

	pool->head = pool->num_jobs = 0;

	ret = pthread_mutex_init(&pool->mutex, NULL);
	if (ret != 0) {
		free(pool->jobs);
		free(pool);
		return ret;
	}

	ret = pthread_cond_init(&pool->condvar, NULL);
	if (ret != 0) {
		pthread_mutex_destroy(&pool->mutex);
		free(pool->jobs);
		free(pool);
		return ret;
	}

	ret = pthread_mutex_init(&pool->fork_mutex, NULL);
	if (ret != 0) {
		pthread_cond_destroy(&pool->condvar);
		pthread_mutex_destroy(&pool->mutex);
		free(pool->jobs);
		free(pool);
		return ret;
	}

	pool->stopped = false;
	pool->destroyed = false;
	pool->num_threads = 0;
	pool->max_threads = max_threads;
	pool->num_idle = 0;
	pool->prefork_cond = NULL;

	ret = pthread_mutex_lock(&pthreadpools_mutex);
	if (ret != 0) {
		pthread_mutex_destroy(&pool->fork_mutex);
		pthread_cond_destroy(&pool->condvar);
		pthread_mutex_destroy(&pool->mutex);
		free(pool->jobs);
		free(pool);
		return ret;
	}
	DLIST_ADD(pthreadpools, pool);
	ret = pthread_mutex_unlock(&pthreadpools_mutex);
	assert(ret == 0);

	pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);

	*presult = pool;
	return 0;
}
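/*
 * Typical lifecycle (a minimal sketch; my_signal_fn, my_job and the
 * *_state pointers are hypothetical caller-side names). All entry
 * points return 0 or an errno-style value, following the pthread_*
 * convention:
 *
 *	struct pthreadpool *pool = NULL;
 *	int ret;
 *
 *	ret = pthreadpool_init(4, &pool, my_signal_fn, signal_state);
 *	if (ret != 0) {
 *		return ret;
 *	}
 *	ret = pthreadpool_add_job(pool, 42, my_job, job_state);
 *	...
 *	ret = pthreadpool_destroy(pool);
 *
 * pthreadpool_destroy() wakes all idle workers; the last worker to
 * exit frees the pool, so the caller must not touch it afterwards.
 */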
size_t pthreadpool_max_threads(struct pthreadpool *pool)
{
	if (pool->stopped) {
		return 0;
	}

	return pool->max_threads;
}

size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
{
	int res;
	int unlock_res;
	size_t ret;

	if (pool->stopped) {
		return 0;
	}

	res = pthread_mutex_lock(&pool->mutex);
	assert(res == 0);

	if (pool->stopped) {
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return 0;
	}

	ret = pool->num_jobs;

	unlock_res = pthread_mutex_unlock(&pool->mutex);
	assert(unlock_res == 0);

	return ret;
}
static void pthreadpool_prepare_pool(struct pthreadpool *pool)
{
	int ret;

	ret = pthread_mutex_lock(&pool->fork_mutex);
	assert(ret == 0);

	ret = pthread_mutex_lock(&pool->mutex);
	assert(ret == 0);

	while (pool->num_idle != 0) {
		unsigned num_idle = pool->num_idle;
		pthread_cond_t prefork_cond;

		ret = pthread_cond_init(&prefork_cond, NULL);
		assert(ret == 0);

		/*
		 * Push all idle threads off pool->condvar. In the
		 * child we can destroy the pool, which would result
		 * in undefined behaviour in the
		 * pthread_cond_destroy(pool->condvar). glibc just
		 * blocks here.
		 */
		pool->prefork_cond = &prefork_cond;

		ret = pthread_cond_signal(&pool->condvar);
		assert(ret == 0);

		while (pool->num_idle == num_idle) {
			ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
			assert(ret == 0);
		}

		pool->prefork_cond = NULL;

		ret = pthread_cond_destroy(&prefork_cond);
		assert(ret == 0);
	}

	/*
	 * Probably it's well-defined somewhere: What happens to
	 * condvars after a fork? The rationale of pthread_atfork only
	 * writes about mutexes. So better be safe than sorry and
	 * destroy/reinit pool->condvar across a fork.
	 */
	ret = pthread_cond_destroy(&pool->condvar);
	assert(ret == 0);
}
static void pthreadpool_prepare(void)
{
	int ret;
	struct pthreadpool *pool;

	ret = pthread_mutex_lock(&pthreadpools_mutex);
	assert(ret == 0);

	pool = pthreadpools;

	while (pool != NULL) {
		pthreadpool_prepare_pool(pool);
		pool = pool->next;
	}
}
static void pthreadpool_parent(void)
{
	int ret;
	struct pthreadpool *pool;

	for (pool = DLIST_TAIL(pthreadpools);
	     pool != NULL;
	     pool = DLIST_PREV(pool)) {
		ret = pthread_cond_init(&pool->condvar, NULL);
		assert(ret == 0);
		ret = pthread_mutex_unlock(&pool->mutex);
		assert(ret == 0);
		ret = pthread_mutex_unlock(&pool->fork_mutex);
		assert(ret == 0);
	}

	ret = pthread_mutex_unlock(&pthreadpools_mutex);
	assert(ret == 0);
}
static void pthreadpool_child(void)
{
	int ret;
	struct pthreadpool *pool;

	for (pool = DLIST_TAIL(pthreadpools);
	     pool != NULL;
	     pool = DLIST_PREV(pool)) {

		pool->num_threads = 0;
		pool->num_idle = 0;
		pool->head = 0;
		pool->num_jobs = 0;
		pool->stopped = true;

		ret = pthread_cond_init(&pool->condvar, NULL);
		assert(ret == 0);

		ret = pthread_mutex_unlock(&pool->mutex);
		assert(ret == 0);

		ret = pthread_mutex_unlock(&pool->fork_mutex);
		assert(ret == 0);
	}

	ret = pthread_mutex_unlock(&pthreadpools_mutex);
	assert(ret == 0);
}
static void pthreadpool_prep_atfork(void)
{
	pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
		       pthreadpool_child);
}
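/*
 * With the three handlers above registered once via pthread_atfork(),
 * fork() is safe while pools exist: the prepare handler pushes every
 * idle worker off pool->condvar and parks it on fork_mutex, the
 * parent handler re-creates the condvar and unlocks the mutexes, and
 * the child handler resets each pool to an empty, stopped state. A
 * hedged sketch of what a caller may rely on (hypothetical caller
 * code):
 *
 *	pid_t pid = fork();
 *	if (pid == 0) {
 *		// child: worker threads and queued jobs are gone and
 *		// the pool is stopped; destroying it is all that's left
 *		pthreadpool_destroy(pool);
 *	}
 */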
static int pthreadpool_free(struct pthreadpool *pool)
{
	int ret, ret1, ret2;

	ret = pthread_mutex_lock(&pthreadpools_mutex);
	if (ret != 0) {
		return ret;
	}
	DLIST_REMOVE(pthreadpools, pool);
	ret = pthread_mutex_unlock(&pthreadpools_mutex);
	assert(ret == 0);

	ret = pthread_mutex_lock(&pool->mutex);
	assert(ret == 0);
	ret = pthread_mutex_unlock(&pool->mutex);
	assert(ret == 0);

	ret = pthread_mutex_destroy(&pool->mutex);
	ret1 = pthread_cond_destroy(&pool->condvar);
	ret2 = pthread_mutex_destroy(&pool->fork_mutex);

	if (ret != 0) {
		return ret;
	}
	if (ret1 != 0) {
		return ret1;
	}
	if (ret2 != 0) {
		return ret2;
	}

	free(pool->jobs);
	free(pool);

	return 0;
}
/*
 * Stop a thread pool. Wake up all idle threads for exit.
 */
static int pthreadpool_stop_locked(struct pthreadpool *pool)
{
	int ret;

	pool->stopped = true;

	if (pool->num_threads == 0) {
		return 0;
	}

	/*
	 * We have active threads, tell them to finish.
	 */
	ret = pthread_cond_broadcast(&pool->condvar);

	return ret;
}
/*
 * Stop a thread pool. Wake up all idle threads for exit.
 */
int pthreadpool_stop(struct pthreadpool *pool)
{
	int ret, ret1;

	ret = pthread_mutex_lock(&pool->mutex);
	if (ret != 0) {
		return ret;
	}

	if (!pool->stopped) {
		ret = pthreadpool_stop_locked(pool);
	}

	ret1 = pthread_mutex_unlock(&pool->mutex);
	assert(ret1 == 0);

	return ret;
}
/*
 * Destroy a thread pool. Wake up all idle threads for exit. The last
 * one will free the pool.
 */
int pthreadpool_destroy(struct pthreadpool *pool)
{
	int ret, ret1;
	bool free_it;

	assert(!pool->destroyed);

	ret = pthread_mutex_lock(&pool->mutex);
	if (ret != 0) {
		return ret;
	}

	pool->destroyed = true;

	if (!pool->stopped) {
		ret = pthreadpool_stop_locked(pool);
	}

	free_it = (pool->num_threads == 0);

	ret1 = pthread_mutex_unlock(&pool->mutex);
	assert(ret1 == 0);

	if (free_it) {
		pthreadpool_free(pool);
	}

	return ret;
}
/*
 * Prepare for pthread_exit(), pool->mutex must be locked and will be
 * unlocked here. This is a bit of a layering violation, but here we
 * also take care of removing the pool if we're the last thread.
 */
static void pthreadpool_server_exit(struct pthreadpool *pool)
{
	int ret;
	bool free_it;

	pool->num_threads -= 1;

	free_it = (pool->destroyed && (pool->num_threads == 0));

	ret = pthread_mutex_unlock(&pool->mutex);
	assert(ret == 0);

	if (free_it) {
		pthreadpool_free(pool);
	}
}
static bool pthreadpool_get_job(struct pthreadpool *p,
				struct pthreadpool_job *job)
{
	if (p->stopped) {
		return false;
	}

	if (p->num_jobs == 0) {
		return false;
	}
	*job = p->jobs[p->head];
	p->head = (p->head+1) % p->jobs_array_len;
	p->num_jobs -= 1;
	return true;
}
static bool pthreadpool_put_job(struct pthreadpool *p,
				int id,
				void (*fn)(void *private_data),
				void *private_data)
{
	struct pthreadpool_job *job;

	if (p->num_jobs == p->jobs_array_len) {
		struct pthreadpool_job *tmp;
		size_t new_len = p->jobs_array_len * 2;

		tmp = realloc(
			p->jobs, sizeof(struct pthreadpool_job) * new_len);
		if (tmp == NULL) {
			return false;
		}
		p->jobs = tmp;

		/*
		 * We just doubled the jobs array. The array implements a FIFO
		 * queue with a modulo-based wraparound, so we have to memcpy
		 * the jobs that are logically at the queue end but physically
		 * before the queue head into the reallocated area. The new
		 * space starts at the current jobs_array_len, and we have to
		 * copy everything before the current head job into the new
		 * area.
		 */
		memcpy(&p->jobs[p->jobs_array_len], p->jobs,
		       sizeof(struct pthreadpool_job) * p->head);

		p->jobs_array_len = new_len;
	}

	job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
	job->id = id;
	job->fn = fn;
	job->private_data = private_data;

	p->num_jobs += 1;

	return true;
}
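/*
 * Worked example of the doubling above (illustration only): with
 * jobs_array_len = 4, head = 2 and num_jobs = 4, the physical layout
 * is [C D A B], where A is the logical queue head. After realloc to
 * new_len = 8, the memcpy moves the p->head = 2 entries before the
 * head, C and D, behind the old array end: [C D A B C D _ _].
 * Reading still starts at head = 2 and yields A B C D in order, and
 * the next write goes to (2 + 4) % 8 = 6, the first free slot.
 */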
static void pthreadpool_undo_put_job(struct pthreadpool *p)
{
	p->num_jobs -= 1;
}
static void *pthreadpool_server(void *arg)
{
	struct pthreadpool *pool = (struct pthreadpool *)arg;
	int res;

	res = pthread_mutex_lock(&pool->mutex);
	if (res != 0) {
		return NULL;
	}

	while (1) {
		struct timespec ts;
		struct pthreadpool_job job;

		/*
		 * idle-wait at most 1 second. If nothing happens in that
		 * time, exit this thread.
		 */
		clock_gettime(CLOCK_REALTIME, &ts);
		ts.tv_sec += 1;

		while ((pool->num_jobs == 0) && !pool->stopped) {

			pool->num_idle += 1;
			res = pthread_cond_timedwait(
				&pool->condvar, &pool->mutex, &ts);
			pool->num_idle -= 1;

			if (pool->prefork_cond != NULL) {
				/*
				 * We must allow fork() to continue
				 * without anybody waiting on
				 * &pool->condvar. Tell
				 * pthreadpool_prepare_pool that we
				 * got that message.
				 */
				res = pthread_cond_signal(pool->prefork_cond);
				assert(res == 0);

				res = pthread_mutex_unlock(&pool->mutex);
				assert(res == 0);

				/*
				 * pthreadpool_prepare_pool has
				 * already locked this mutex across
				 * the fork. This makes us wait
				 * without sitting in a condvar.
				 */
				res = pthread_mutex_lock(&pool->fork_mutex);
				assert(res == 0);
				res = pthread_mutex_unlock(&pool->fork_mutex);
				assert(res == 0);

				res = pthread_mutex_lock(&pool->mutex);
				assert(res == 0);
			}

			if (res == ETIMEDOUT) {

				if (pool->num_jobs == 0) {
					/*
					 * we timed out and still no work for
					 * us. Exit.
					 */
					pthreadpool_server_exit(pool);
					return NULL;
				}

				break;
			}
			assert(res == 0);
		}

		if (pthreadpool_get_job(pool, &job)) {
			int ret;

			/*
			 * Do the work with the mutex unlocked
			 */
			res = pthread_mutex_unlock(&pool->mutex);
			assert(res == 0);

			job.fn(job.private_data);

			ret = pool->signal_fn(job.id,
					      job.fn, job.private_data,
					      pool->signal_fn_private_data);

			res = pthread_mutex_lock(&pool->mutex);
			assert(res == 0);

			if (ret != 0) {
				pthreadpool_server_exit(pool);
				return NULL;
			}
		}

		if (pool->stopped) {
			/*
			 * we're asked to stop processing jobs, so exit
			 */
			pthreadpool_server_exit(pool);
			return NULL;
		}
	}
}
static int pthreadpool_create_thread(struct pthreadpool *pool)
{
	pthread_attr_t thread_attr;
	pthread_t thread_id;
	int res;
	sigset_t mask, omask;

	/*
	 * Create a new worker thread. It should not receive any signals.
	 */
	sigfillset(&mask);

	res = pthread_attr_init(&thread_attr);
	if (res != 0) {
		return res;
	}

	res = pthread_attr_setdetachstate(
		&thread_attr, PTHREAD_CREATE_DETACHED);
	if (res != 0) {
		pthread_attr_destroy(&thread_attr);
		return res;
	}

	res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
	if (res != 0) {
		pthread_attr_destroy(&thread_attr);
		return res;
	}

	res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
			     (void *)pool);

	assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);

	pthread_attr_destroy(&thread_attr);

	if (res == 0) {
		pool->num_threads += 1;
	}

	return res;
}
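/*
 * The sigmask dance above is the standard idiom for spawning threads
 * that must not handle process signals: block everything in the
 * creating thread, call pthread_create() (the new thread inherits
 * the fully blocked mask), then restore the original mask. Signal
 * delivery thus stays with the threads that had signals unblocked
 * before the worker existed.
 */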
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
			void (*fn)(void *private_data), void *private_data)
{
	int res;
	int unlock_res;

	assert(!pool->destroyed);

	res = pthread_mutex_lock(&pool->mutex);
	if (res != 0) {
		return res;
	}

	if (pool->stopped) {
		/*
		 * Protect against the pool being shut down while
		 * trying to add a job
		 */
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return EINVAL;
	}

	if (pool->max_threads == 0) {
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);

		/*
		 * If no threads are allowed we do strict sync processing.
		 */
		fn(private_data);
		res = pool->signal_fn(job_id, fn, private_data,
				      pool->signal_fn_private_data);
		return res;
	}

	/*
	 * Add job to the end of the queue
	 */
	if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return ENOMEM;
	}

	if (pool->num_idle > 0) {
		/*
		 * We have idle threads, wake one.
		 */
		res = pthread_cond_signal(&pool->condvar);
		if (res != 0) {
			pthreadpool_undo_put_job(pool);
		}
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return res;
	}

	if (pool->num_threads >= pool->max_threads) {
		/*
		 * No more new threads, we just queue the request
		 */
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return 0;
	}

	res = pthreadpool_create_thread(pool);
	if (res == 0) {
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return 0;
	}

	if (pool->num_threads != 0) {
		/*
		 * At least one thread is still available, let
		 * that one run the queued job.
		 */
		unlock_res = pthread_mutex_unlock(&pool->mutex);
		assert(unlock_res == 0);
		return 0;
	}

	/*
	 * No thread could be created to run job, fallback to sync call.
	 */
	pthreadpool_undo_put_job(pool);

	unlock_res = pthread_mutex_unlock(&pool->mutex);
	assert(unlock_res == 0);

	fn(private_data);
	res = pool->signal_fn(job_id, fn, private_data,
			      pool->signal_fn_private_data);

	return res;
}
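/*
 * Note on pthreadpool_add_job() semantics (a summary of the code
 * above, not new behaviour): a return of 0 only means the job was
 * accepted. With max_threads == 0, or when no worker thread could be
 * created for an otherwise thread-less pool, fn() and signal_fn()
 * have already run synchronously on the calling thread by the time
 * the call returns. Callers should therefore be prepared for
 * signal_fn to fire before pthreadpool_add_job() returns.
 */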
size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
			      void (*fn)(void *private_data), void *private_data)
{
	int res;
	size_t i, j;
	size_t num = 0;

	assert(!pool->destroyed);

	res = pthread_mutex_lock(&pool->mutex);
	if (res != 0) {
		return 0;
	}

	for (i = 0, j = 0; i < pool->num_jobs; i++) {
		size_t idx = (pool->head + i) % pool->jobs_array_len;
		size_t new_idx = (pool->head + j) % pool->jobs_array_len;
		struct pthreadpool_job *job = &pool->jobs[idx];

		if ((job->private_data == private_data) &&
		    (job->id == job_id) &&
		    (job->fn == fn)) {
			/*
			 * Just skip the entry.
			 */
			num += 1;
			continue;
		}

		/*
		 * If we already removed one or more jobs (so j will be smaller
		 * than i), we need to fill possible gaps in the logical list.
		 */
		if (j < i) {
			pool->jobs[new_idx] = *job;
		}
		j += 1;
	}

	pool->num_jobs -= num;

	res = pthread_mutex_unlock(&pool->mutex);
	assert(res == 0);

	return num;
}
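/*
 * Usage sketch (hypothetical caller-side code): drop all still-queued
 * instances of a job before freeing its state. Jobs that a worker has
 * already dequeued are not affected, so completions may still arrive
 * via signal_fn:
 *
 *	size_t cancelled;
 *
 *	cancelled = pthreadpool_cancel_job(pool, 42, my_job, job_state);
 *	if (cancelled == 0) {
 *		// job is running or already completed; wait for the
 *		// signal_fn notification before freeing job_state
 *	}
 */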