/*
 * Unix SMB/CIFS implementation.
 * thread pool implementation
 * Copyright (C) Volker Lendecke 2009
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "pthreadpool.h"
25 #include "lib/util/dlinklist.h"
/*
 * A single queued work item: an opaque caller-chosen id, the function
 * to run on a worker thread, and its private data pointer.
 */
struct pthreadpool_job {
	int id;
	void (*fn)(void *private_data);
	void *private_data;
};
struct pthreadpool {
	/*
	 * List pthreadpools for fork safety
	 */
	struct pthreadpool *prev, *next;

	/*
	 * Control access to this struct
	 */
	pthread_mutex_t mutex;

	/*
	 * Threads waiting for work do so here
	 */
	pthread_cond_t condvar;

	/*
	 * Array of jobs, used as a FIFO ring buffer: "head" is the
	 * index of the next job to run, "num_jobs" the number of
	 * queued entries.
	 */
	size_t jobs_array_len;
	struct pthreadpool_job *jobs;

	size_t head;
	size_t num_jobs;

	/*
	 * Indicate job completion
	 */
	int (*signal_fn)(int jobid,
			 void (*job_fn)(void *private_data),
			 void *job_fn_private_data,
			 void *private_data);
	void *signal_fn_private_data;

	/*
	 * indicator to worker threads that they should shut down
	 */
	bool shutdown;

	/*
	 * maximum number of threads
	 * 0 means no real thread, only strict sync processing.
	 */
	unsigned max_threads;

	/*
	 * Number of threads
	 */
	unsigned num_threads;

	/*
	 * Number of idle threads
	 */
	unsigned num_idle;

	/*
	 * Condition variable indicating that helper threads should
	 * quickly go away making way for fork() without anybody
	 * waiting on pool->condvar.
	 */
	pthread_cond_t *prefork_cond;

	/*
	 * Waiting position for helper threads while fork is
	 * running. The forking thread will have locked it, and all
	 * idle helper threads will sit here until after the fork,
	 * where the forking thread will unlock it again.
	 */
	pthread_mutex_t fork_mutex;
};
/* Protects the global list of pools below (needed for fork safety). */
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;

/* All live pools, doubly linked so the atfork handlers can walk them. */
static struct pthreadpool *pthreadpools = NULL;

/* Ensures the pthread_atfork() handlers are installed exactly once. */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);
117 * Initialize a thread pool
120 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
121 int (*signal_fn)(int jobid,
122 void (*job_fn)(void *private_data),
123 void *job_fn_private_data,
125 void *signal_fn_private_data)
127 struct pthreadpool *pool;
130 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
134 pool->signal_fn = signal_fn;
135 pool->signal_fn_private_data = signal_fn_private_data;
137 pool->jobs_array_len = 4;
139 pool->jobs_array_len, sizeof(struct pthreadpool_job));
141 if (pool->jobs == NULL) {
146 pool->head = pool->num_jobs = 0;
148 ret = pthread_mutex_init(&pool->mutex, NULL);
155 ret = pthread_cond_init(&pool->condvar, NULL);
157 pthread_mutex_destroy(&pool->mutex);
163 ret = pthread_mutex_init(&pool->fork_mutex, NULL);
165 pthread_cond_destroy(&pool->condvar);
166 pthread_mutex_destroy(&pool->mutex);
172 pool->shutdown = false;
173 pool->num_threads = 0;
174 pool->max_threads = max_threads;
176 pool->prefork_cond = NULL;
178 ret = pthread_mutex_lock(&pthreadpools_mutex);
180 pthread_mutex_destroy(&pool->fork_mutex);
181 pthread_cond_destroy(&pool->condvar);
182 pthread_mutex_destroy(&pool->mutex);
187 DLIST_ADD(pthreadpools, pool);
189 ret = pthread_mutex_unlock(&pthreadpools_mutex);
192 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
199 size_t pthreadpool_max_threads(struct pthreadpool *pool)
201 return pool->max_threads;
204 size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
210 res = pthread_mutex_lock(&pool->mutex);
215 ret = pool->num_jobs;
217 unlock_res = pthread_mutex_unlock(&pool->mutex);
218 assert(unlock_res == 0);
222 static void pthreadpool_prepare_pool(struct pthreadpool *pool)
226 ret = pthread_mutex_lock(&pool->fork_mutex);
229 ret = pthread_mutex_lock(&pool->mutex);
232 while (pool->num_idle != 0) {
233 unsigned num_idle = pool->num_idle;
234 pthread_cond_t prefork_cond;
236 ret = pthread_cond_init(&prefork_cond, NULL);
240 * Push all idle threads off pool->condvar. In the
241 * child we can destroy the pool, which would result
242 * in undefined behaviour in the
243 * pthread_cond_destroy(pool->condvar). glibc just
246 pool->prefork_cond = &prefork_cond;
248 ret = pthread_cond_signal(&pool->condvar);
251 while (pool->num_idle == num_idle) {
252 ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
256 pool->prefork_cond = NULL;
258 ret = pthread_cond_destroy(&prefork_cond);
263 * Probably it's well-defined somewhere: What happens to
264 * condvars after a fork? The rationale of pthread_atfork only
265 * writes about mutexes. So better be safe than sorry and
266 * destroy/reinit pool->condvar across a fork.
269 ret = pthread_cond_destroy(&pool->condvar);
273 static void pthreadpool_prepare(void)
276 struct pthreadpool *pool;
278 ret = pthread_mutex_lock(&pthreadpools_mutex);
283 while (pool != NULL) {
284 pthreadpool_prepare_pool(pool);
289 static void pthreadpool_parent(void)
292 struct pthreadpool *pool;
294 for (pool = DLIST_TAIL(pthreadpools);
296 pool = DLIST_PREV(pool)) {
297 ret = pthread_cond_init(&pool->condvar, NULL);
299 ret = pthread_mutex_unlock(&pool->mutex);
301 ret = pthread_mutex_unlock(&pool->fork_mutex);
305 ret = pthread_mutex_unlock(&pthreadpools_mutex);
309 static void pthreadpool_child(void)
312 struct pthreadpool *pool;
314 for (pool = DLIST_TAIL(pthreadpools);
316 pool = DLIST_PREV(pool)) {
318 pool->num_threads = 0;
323 ret = pthread_cond_init(&pool->condvar, NULL);
326 ret = pthread_mutex_unlock(&pool->mutex);
329 ret = pthread_mutex_unlock(&pool->fork_mutex);
333 ret = pthread_mutex_unlock(&pthreadpools_mutex);
337 static void pthreadpool_prep_atfork(void)
339 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
343 static int pthreadpool_free(struct pthreadpool *pool)
347 ret = pthread_mutex_lock(&pthreadpools_mutex);
351 DLIST_REMOVE(pthreadpools, pool);
352 ret = pthread_mutex_unlock(&pthreadpools_mutex);
355 ret = pthread_mutex_lock(&pool->mutex);
357 ret = pthread_mutex_unlock(&pool->mutex);
360 ret = pthread_mutex_destroy(&pool->mutex);
361 ret1 = pthread_cond_destroy(&pool->condvar);
362 ret2 = pthread_mutex_destroy(&pool->fork_mutex);
381 * Destroy a thread pool. Wake up all idle threads for exit. The last
382 * one will free the pool.
385 int pthreadpool_destroy(struct pthreadpool *pool)
389 ret = pthread_mutex_lock(&pool->mutex);
394 if (pool->shutdown) {
395 ret = pthread_mutex_unlock(&pool->mutex);
400 pool->shutdown = true;
402 if (pool->num_threads == 0) {
403 ret = pthread_mutex_unlock(&pool->mutex);
406 ret = pthreadpool_free(pool);
411 * We have active threads, tell them to finish.
414 ret = pthread_cond_broadcast(&pool->condvar);
416 ret1 = pthread_mutex_unlock(&pool->mutex);
423 * Prepare for pthread_exit(), pool->mutex must be locked and will be
424 * unlocked here. This is a bit of a layering violation, but here we
425 * also take care of removing the pool if we're the last thread.
427 static void pthreadpool_server_exit(struct pthreadpool *pool)
432 pool->num_threads -= 1;
434 free_it = (pool->shutdown && (pool->num_threads == 0));
436 ret = pthread_mutex_unlock(&pool->mutex);
440 pthreadpool_free(pool);
444 static bool pthreadpool_get_job(struct pthreadpool *p,
445 struct pthreadpool_job *job)
447 if (p->num_jobs == 0) {
450 *job = p->jobs[p->head];
451 p->head = (p->head+1) % p->jobs_array_len;
456 static bool pthreadpool_put_job(struct pthreadpool *p,
458 void (*fn)(void *private_data),
461 struct pthreadpool_job *job;
463 if (p->num_jobs == p->jobs_array_len) {
464 struct pthreadpool_job *tmp;
465 size_t new_len = p->jobs_array_len * 2;
468 p->jobs, sizeof(struct pthreadpool_job) * new_len);
475 * We just doubled the jobs array. The array implements a FIFO
476 * queue with a modulo-based wraparound, so we have to memcpy
477 * the jobs that are logically at the queue end but physically
478 * before the queue head into the reallocated area. The new
479 * space starts at the current jobs_array_len, and we have to
480 * copy everything before the current head job into the new
483 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
484 sizeof(struct pthreadpool_job) * p->head);
486 p->jobs_array_len = new_len;
489 job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
492 job->private_data = private_data;
499 static void pthreadpool_undo_put_job(struct pthreadpool *p)
504 static void *pthreadpool_server(void *arg)
506 struct pthreadpool *pool = (struct pthreadpool *)arg;
509 res = pthread_mutex_lock(&pool->mutex);
516 struct pthreadpool_job job;
519 * idle-wait at most 1 second. If nothing happens in that
520 * time, exit this thread.
523 clock_gettime(CLOCK_REALTIME, &ts);
526 while ((pool->num_jobs == 0) && !pool->shutdown) {
529 res = pthread_cond_timedwait(
530 &pool->condvar, &pool->mutex, &ts);
533 if (pool->prefork_cond != NULL) {
535 * Me must allow fork() to continue
536 * without anybody waiting on
537 * &pool->condvar. Tell
538 * pthreadpool_prepare_pool that we
542 res = pthread_cond_signal(pool->prefork_cond);
545 res = pthread_mutex_unlock(&pool->mutex);
549 * pthreadpool_prepare_pool has
550 * already locked this mutex across
551 * the fork. This makes us wait
552 * without sitting in a condvar.
554 res = pthread_mutex_lock(&pool->fork_mutex);
556 res = pthread_mutex_unlock(&pool->fork_mutex);
559 res = pthread_mutex_lock(&pool->mutex);
563 if (res == ETIMEDOUT) {
565 if (pool->num_jobs == 0) {
567 * we timed out and still no work for
570 pthreadpool_server_exit(pool);
579 if (pthreadpool_get_job(pool, &job)) {
583 * Do the work with the mutex unlocked
586 res = pthread_mutex_unlock(&pool->mutex);
589 job.fn(job.private_data);
591 ret = pool->signal_fn(job.id,
592 job.fn, job.private_data,
593 pool->signal_fn_private_data);
595 res = pthread_mutex_lock(&pool->mutex);
599 pthreadpool_server_exit(pool);
604 if ((pool->num_jobs == 0) && pool->shutdown) {
606 * No more work to do and we're asked to shut down, so
609 pthreadpool_server_exit(pool);
615 static int pthreadpool_create_thread(struct pthreadpool *pool)
617 pthread_attr_t thread_attr;
620 sigset_t mask, omask;
623 * Create a new worker thread. It should not receive any signals.
628 res = pthread_attr_init(&thread_attr);
633 res = pthread_attr_setdetachstate(
634 &thread_attr, PTHREAD_CREATE_DETACHED);
636 pthread_attr_destroy(&thread_attr);
640 res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
642 pthread_attr_destroy(&thread_attr);
646 res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
649 assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
651 pthread_attr_destroy(&thread_attr);
654 pool->num_threads += 1;
660 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
661 void (*fn)(void *private_data), void *private_data)
666 res = pthread_mutex_lock(&pool->mutex);
671 if (pool->shutdown) {
673 * Protect against the pool being shut down while
674 * trying to add a job
676 unlock_res = pthread_mutex_unlock(&pool->mutex);
677 assert(unlock_res == 0);
681 if (pool->max_threads == 0) {
682 unlock_res = pthread_mutex_unlock(&pool->mutex);
683 assert(unlock_res == 0);
686 * If no thread are allowed we do strict sync processing.
689 res = pool->signal_fn(job_id, fn, private_data,
690 pool->signal_fn_private_data);
695 * Add job to the end of the queue
697 if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
698 unlock_res = pthread_mutex_unlock(&pool->mutex);
699 assert(unlock_res == 0);
703 if (pool->num_idle > 0) {
705 * We have idle threads, wake one.
707 res = pthread_cond_signal(&pool->condvar);
709 pthreadpool_undo_put_job(pool);
711 unlock_res = pthread_mutex_unlock(&pool->mutex);
712 assert(unlock_res == 0);
716 if (pool->num_threads >= pool->max_threads) {
718 * No more new threads, we just queue the request
720 unlock_res = pthread_mutex_unlock(&pool->mutex);
721 assert(unlock_res == 0);
725 res = pthreadpool_create_thread(pool);
727 unlock_res = pthread_mutex_unlock(&pool->mutex);
728 assert(unlock_res == 0);
732 if (pool->num_threads != 0) {
734 * At least one thread is still available, let
735 * that one run the queued job.
737 unlock_res = pthread_mutex_unlock(&pool->mutex);
738 assert(unlock_res == 0);
743 * No thread could be created to run job, fallback to sync
746 pthreadpool_undo_put_job(pool);
748 unlock_res = pthread_mutex_unlock(&pool->mutex);
749 assert(unlock_res == 0);
754 size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
755 void (*fn)(void *private_data), void *private_data)
761 res = pthread_mutex_lock(&pool->mutex);
766 for (i = 0, j = 0; i < pool->num_jobs; i++) {
767 size_t idx = (pool->head + i) % pool->jobs_array_len;
768 size_t new_idx = (pool->head + j) % pool->jobs_array_len;
769 struct pthreadpool_job *job = &pool->jobs[idx];
771 if ((job->private_data == private_data) &&
772 (job->id == job_id) &&
776 * Just skip the entry.
783 * If we already removed one or more jobs (so j will be smaller
784 * then i), we need to fill possible gaps in the logical list.
787 pool->jobs[new_idx] = *job;
792 pool->num_jobs -= num;
794 res = pthread_mutex_unlock(&pool->mutex);