2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "pthreadpool.h"
25 #include "lib/util/dlinklist.h"
/*
 * One queued unit of work: the callback to invoke on a worker thread.
 * NOTE(review): this listing is elided — the struct's remaining
 * members (presumably a job id and the callback's private_data
 * pointer, judging by job.id/job.private_data uses below) and the
 * closing brace are not visible here.
 */
33 struct pthreadpool_job {
35 	void (*fn)(void *private_data);
/*
 * Interior of struct pthreadpool. NOTE(review): the opening
 * "struct pthreadpool {" line and several members referenced by the
 * functions below (head, num_jobs, shutdown, num_threads,
 * max_threads, num_idle) fall in the elided ranges of this listing.
 */
41 * List pthreadpools for fork safety
43 struct pthreadpool *prev, *next;
46 * Control access to this struct
48 pthread_mutex_t mutex;
51 * Threads waiting for work do so here
53 pthread_cond_t condvar;
/* FIFO job queue: a modulo-wrapped circular array (see
 * pthreadpool_get_job/pthreadpool_put_job). */
58 size_t jobs_array_len;
59 struct pthreadpool_job *jobs;
65 * Indicate job completion
67 int (*signal_fn)(int jobid,
68 void (*job_fn)(void *private_data),
69 void *job_fn_private_data,
71 void *signal_fn_private_data;
74 * indicator to worker threads that they should shut down
79 * maximum number of threads
89 * Number of idle threads
94 * Condition variable indicating that helper threads should
95 * quickly go away making way for fork() without anybody
96 * waiting on pool->condvar.
/* Non-NULL only while a fork is being prepared; workers signal it
 * after stepping off pool->condvar. */
98 pthread_cond_t *prefork_cond;
101 * Waiting position for helper threads while fork is
102 * running. The forking thread will have locked it, and all
103 * idle helper threads will sit here until after the fork,
104 * where the forking thread will unlock it again.
106 pthread_mutex_t fork_mutex;
/*
 * Process-global registry of all live pools, protected by
 * pthreadpools_mutex; walked by the pthread_atfork handlers so that
 * every pool can be quiesced across a fork(). The atfork handlers are
 * installed exactly once via pthreadpool_atfork_initialized.
 */
109 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
110 static struct pthreadpool *pthreadpools = NULL;
111 static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
113 static void pthreadpool_prep_atfork(void);
116 * Initialize a thread pool
/*
 * Allocates a pool, initializes its queue (initial capacity 4),
 * mutexes and condvar, links it into the global pthreadpools list and
 * installs the fork handlers. max_threads == 0 appears to mean
 * "unlimited" (see the check in pthreadpool_add_job). On success
 * *presult receives the pool. NOTE(review): this listing is elided —
 * malloc/calloc failure handling, the error-path returns, and the
 * final success return are not visible here.
 */
119 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
120 int (*signal_fn)(int jobid,
121 void (*job_fn)(void *private_data),
122 void *job_fn_private_data,
124 void *signal_fn_private_data)
126 struct pthreadpool *pool;
129 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
133 pool->signal_fn = signal_fn;
134 pool->signal_fn_private_data = signal_fn_private_data;
/* Start with a small queue; pthreadpool_put_job doubles it on
 * demand. The calloc() call's first line is elided here. */
136 pool->jobs_array_len = 4;
138 pool->jobs_array_len, sizeof(struct pthreadpool_job));
140 if (pool->jobs == NULL) {
145 pool->head = pool->num_jobs = 0;
147 ret = pthread_mutex_init(&pool->mutex, NULL);
154 ret = pthread_cond_init(&pool->condvar, NULL);
/* Error path: tear down what was already initialized, in reverse
 * order of construction. */
156 pthread_mutex_destroy(&pool->mutex);
162 ret = pthread_mutex_init(&pool->fork_mutex, NULL);
164 pthread_cond_destroy(&pool->condvar);
165 pthread_mutex_destroy(&pool->mutex);
171 pool->shutdown = false;
172 pool->num_threads = 0;
173 pool->max_threads = max_threads;
175 pool->prefork_cond = NULL;
/* Register the pool globally so the atfork handlers can find it. */
177 ret = pthread_mutex_lock(&pthreadpools_mutex);
179 pthread_mutex_destroy(&pool->fork_mutex);
180 pthread_cond_destroy(&pool->condvar);
181 pthread_mutex_destroy(&pool->mutex);
186 DLIST_ADD(pthreadpools, pool);
188 ret = pthread_mutex_unlock(&pthreadpools_mutex);
/* Install prepare/parent/child fork handlers exactly once per
 * process. */
191 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
/*
 * Quiesce one pool ahead of fork(): lock fork_mutex (workers will
 * block on it instead of sitting in a condvar wait), then push every
 * idle worker off pool->condvar via a temporary prefork_cond
 * handshake, and finally destroy pool->condvar so it can be
 * reinitialized cleanly in parent and child. Leaves pool->mutex and
 * pool->fork_mutex locked for the fork handlers to release.
 * NOTE(review): listing is elided — assert/error checks and the
 * function's closing brace are not visible here.
 */
198 static void pthreadpool_prepare_pool(struct pthreadpool *pool)
202 ret = pthread_mutex_lock(&pool->fork_mutex);
205 ret = pthread_mutex_lock(&pool->mutex);
/* Repeat until no worker remains blocked in
 * pthread_cond_timedwait(&pool->condvar, ...). */
208 while (pool->num_idle != 0) {
209 unsigned num_idle = pool->num_idle;
210 pthread_cond_t prefork_cond;
212 ret = pthread_cond_init(&prefork_cond, NULL);
216 * Push all idle threads off pool->condvar. In the
217 * child we can destroy the pool, which would result
218 * in undefined behaviour in the
219 * pthread_cond_destroy(pool->condvar). glibc just
/* Publish the handshake condvar; a woken worker signals it once it
 * has left pool->condvar (see pthreadpool_server). */
222 pool->prefork_cond = &prefork_cond;
224 ret = pthread_cond_signal(&pool->condvar);
/* Wait until at least one worker has actually moved off the
 * condvar (num_idle changes under pool->mutex). */
227 while (pool->num_idle == num_idle) {
228 ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
232 pool->prefork_cond = NULL;
234 ret = pthread_cond_destroy(&prefork_cond);
239 * Probably it's well-defined somewhere: What happens to
240 * condvars after a fork? The rationale of pthread_atfork only
241 * writes about mutexes. So better be safe than sorry and
242 * destroy/reinit pool->condvar across a fork.
245 ret = pthread_cond_destroy(&pool->condvar);
/*
 * pthread_atfork "prepare" handler: take the global list lock and
 * quiesce every registered pool. The list walk's advance step
 * (presumably pool = pool->next / DLIST_NEXT) falls in an elided
 * range of this listing — TODO confirm against the full source.
 */
249 static void pthreadpool_prepare(void)
252 struct pthreadpool *pool;
254 ret = pthread_mutex_lock(&pthreadpools_mutex);
259 while (pool != NULL) {
260 pthreadpool_prepare_pool(pool);
/*
 * pthread_atfork "parent" handler: undo pthreadpool_prepare in the
 * forking parent. Walks the pools in reverse registration order,
 * reinitializes each condvar destroyed by the prepare handler, and
 * releases pool->mutex, pool->fork_mutex and finally the global list
 * lock — strictly the reverse of the locking order in prepare.
 * NOTE(review): assert lines and the closing brace are elided.
 */
265 static void pthreadpool_parent(void)
268 struct pthreadpool *pool;
270 for (pool = DLIST_TAIL(pthreadpools);
272 pool = DLIST_PREV(pool)) {
273 ret = pthread_cond_init(&pool->condvar, NULL);
275 ret = pthread_mutex_unlock(&pool->mutex);
277 ret = pthread_mutex_unlock(&pool->fork_mutex);
281 ret = pthread_mutex_unlock(&pthreadpools_mutex);
/*
 * pthread_atfork "child" handler: like pthreadpool_parent, but the
 * child additionally resets num_threads to 0 — worker threads do not
 * survive fork(), only the calling thread exists in the child.
 * NOTE(review): the elided lines likely also reset other counters
 * (e.g. num_idle, the job queue) — confirm against the full source.
 */
285 static void pthreadpool_child(void)
288 struct pthreadpool *pool;
290 for (pool = DLIST_TAIL(pthreadpools);
292 pool = DLIST_PREV(pool)) {
/* No worker threads exist in the child process. */
294 pool->num_threads = 0;
299 ret = pthread_cond_init(&pool->condvar, NULL);
302 ret = pthread_mutex_unlock(&pool->mutex);
305 ret = pthread_mutex_unlock(&pool->fork_mutex);
309 ret = pthread_mutex_unlock(&pthreadpools_mutex);
/*
 * Register the fork handlers; invoked exactly once via pthread_once
 * from pthreadpool_init. The third argument (pthreadpool_child) is in
 * an elided line of this listing.
 */
313 static void pthreadpool_prep_atfork(void)
315 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
/*
 * Final teardown: unlink the pool from the global list, destroy its
 * synchronization objects and (in elided lines) free the jobs array
 * and the pool itself. Called from pthreadpool_destroy when no
 * threads remain, or by the last exiting worker via
 * pthreadpool_server_exit. The ret/ret1/ret2 results are presumably
 * combined into the return value in an elided line — TODO confirm.
 */
319 static int pthreadpool_free(struct pthreadpool *pool)
323 ret = pthread_mutex_lock(&pthreadpools_mutex);
327 DLIST_REMOVE(pthreadpools, pool);
328 ret = pthread_mutex_unlock(&pthreadpools_mutex);
331 ret = pthread_mutex_destroy(&pool->mutex);
332 ret1 = pthread_cond_destroy(&pool->condvar);
333 ret2 = pthread_mutex_destroy(&pool->fork_mutex);
352 * Destroy a thread pool. Wake up all idle threads for exit. The last
353 * one will free the pool.
/*
 * Marks the pool as shutting down. If no worker threads exist the
 * pool is freed immediately; otherwise the workers are broadcast
 * awake and the last one to exit frees the pool (see
 * pthreadpool_server_exit). Calling destroy twice is rejected (the
 * pool->shutdown early-return below); the elided line after it
 * presumably returns EBUSY or similar — confirm against full source.
 */
356 int pthreadpool_destroy(struct pthreadpool *pool)
360 ret = pthread_mutex_lock(&pool->mutex);
365 if (pool->shutdown) {
366 ret = pthread_mutex_unlock(&pool->mutex);
371 pool->shutdown = true;
373 if (pool->num_threads == 0) {
374 ret = pthread_mutex_unlock(&pool->mutex);
377 ret = pthreadpool_free(pool);
382 * We have active threads, tell them to finish.
385 ret = pthread_cond_broadcast(&pool->condvar);
387 ret1 = pthread_mutex_unlock(&pool->mutex);
394 * Prepare for pthread_exit(), pool->mutex must be locked and will be
395 * unlocked here. This is a bit of a layering violation, but here we
396 * also take care of removing the pool if we're the last thread.
/* Decrements the worker count; if this was the last worker of a pool
 * already marked shutdown, frees the whole pool. */
398 static void pthreadpool_server_exit(struct pthreadpool *pool)
403 pool->num_threads -= 1;
/* Only the final exiting worker of a shut-down pool may free it —
 * decided while pool->mutex is still held. */
405 free_it = (pool->shutdown && (pool->num_threads == 0));
407 ret = pthread_mutex_unlock(&pool->mutex);
411 pthreadpool_free(pool);
/*
 * Pop the job at the head of the circular FIFO into *job. Caller must
 * hold p->mutex. Returns false when the queue is empty (the return
 * statements and the num_jobs decrement fall in elided lines of this
 * listing).
 */
415 static bool pthreadpool_get_job(struct pthreadpool *p,
416 struct pthreadpool_job *job)
418 if (p->num_jobs == 0) {
421 *job = p->jobs[p->head];
/* Advance head with wraparound over the circular array. */
422 p->head = (p->head+1) % p->jobs_array_len;
/*
 * Append a job to the tail of the circular FIFO, doubling the backing
 * array when full. Caller must hold p->mutex. Presumably returns
 * false on allocation failure (the realloc call's first line, the
 * NULL check and the returns are elided from this listing).
 */
427 static bool pthreadpool_put_job(struct pthreadpool *p,
429 void (*fn)(void *private_data),
432 struct pthreadpool_job *job;
434 if (p->num_jobs == p->jobs_array_len) {
435 struct pthreadpool_job *tmp;
436 size_t new_len = p->jobs_array_len * 2;
439 p->jobs, sizeof(struct pthreadpool_job) * new_len);
446 * We just doubled the jobs array. The array implements a FIFO
447 * queue with a modulo-based wraparound, so we have to memcpy
448 * the jobs that are logically at the queue end but physically
449 * before the queue head into the reallocated area. The new
450 * space starts at the current jobs_array_len, and we have to
451 * copy everything before the current head job into the new
454 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
455 sizeof(struct pthreadpool_job) * p->head);
457 p->jobs_array_len = new_len;
/* Tail slot of the FIFO, modulo the (possibly grown) array. */
460 job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
463 job->private_data = private_data;
470 static void pthreadpool_undo_put_job(struct pthreadpool *p)
/*
 * Worker thread main loop. Holding pool->mutex except while running a
 * job: waits (with a 1-second idle timeout) for work on
 * pool->condvar, cooperates with fork() via the prefork_cond /
 * fork_mutex handshake, runs jobs with the mutex dropped, and reports
 * completion through pool->signal_fn. Exits via
 * pthreadpool_server_exit on idle timeout, signal_fn failure, or
 * shutdown. NOTE(review): asserts, the enclosing while(1), the
 * ts timeout setup and several braces are elided from this listing.
 */
475 static void *pthreadpool_server(void *arg)
477 struct pthreadpool *pool = (struct pthreadpool *)arg;
480 res = pthread_mutex_lock(&pool->mutex);
487 struct pthreadpool_job job;
490 * idle-wait at most 1 second. If nothing happens in that
491 * time, exit this thread.
494 clock_gettime(CLOCK_REALTIME, &ts);
497 while ((pool->num_jobs == 0) && !pool->shutdown) {
500 res = pthread_cond_timedwait(
501 &pool->condvar, &pool->mutex, &ts);
/* A fork is being prepared: step off pool->condvar and park on
 * fork_mutex until the forking thread releases it post-fork. */
504 if (pool->prefork_cond != NULL) {
506 * Me must allow fork() to continue
507 * without anybody waiting on
508 * &pool->condvar. Tell
509 * pthreadpool_prepare_pool that we
513 res = pthread_cond_signal(pool->prefork_cond);
516 res = pthread_mutex_unlock(&pool->mutex);
520 * pthreadpool_prepare_pool has
521 * already locked this mutex across
522 * the fork. This makes us wait
523 * without sitting in a condvar.
525 res = pthread_mutex_lock(&pool->fork_mutex);
527 res = pthread_mutex_unlock(&pool->fork_mutex);
530 res = pthread_mutex_lock(&pool->mutex);
534 if (res == ETIMEDOUT) {
536 if (pool->num_jobs == 0) {
538 * we timed out and still no work for
/* Idle timeout with an empty queue: shrink the pool by
 * letting this thread die. */
541 pthreadpool_server_exit(pool);
550 if (pthreadpool_get_job(pool, &job)) {
554 * Do the work with the mutex unlocked
557 res = pthread_mutex_unlock(&pool->mutex);
560 job.fn(job.private_data);
/* Notify the pool owner that this job finished; a nonzero
 * result makes this worker exit (see below). */
562 ret = pool->signal_fn(job.id,
563 job.fn, job.private_data,
564 pool->signal_fn_private_data);
566 res = pthread_mutex_lock(&pool->mutex);
570 pthreadpool_server_exit(pool);
575 if ((pool->num_jobs == 0) && pool->shutdown) {
577 * No more work to do and we're asked to shut down, so
580 pthreadpool_server_exit(pool);
/*
 * Spawn one detached worker thread running pthreadpool_server, with
 * all signals blocked in the new thread (the caller's signal mask is
 * saved to omask and restored after pthread_create, so signal
 * delivery stays with the main threads). Caller must hold
 * pool->mutex; on success num_threads is bumped. NOTE(review):
 * sigfillset/sigemptyset, error returns and the closing brace are
 * elided from this listing.
 */
586 static int pthreadpool_create_thread(struct pthreadpool *pool)
588 pthread_attr_t thread_attr;
591 sigset_t mask, omask;
594 * Create a new worker thread. It should not receive any signals.
599 res = pthread_attr_init(&thread_attr);
/* Detached: nobody joins workers, they clean up via
 * pthreadpool_server_exit. */
604 res = pthread_attr_setdetachstate(
605 &thread_attr, PTHREAD_CREATE_DETACHED);
607 pthread_attr_destroy(&thread_attr);
611 res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
613 pthread_attr_destroy(&thread_attr);
/* The new thread inherits the blocked-everything mask. */
617 res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
/* Restore the caller's original signal mask regardless of the
 * pthread_create result. */
620 assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
622 pthread_attr_destroy(&thread_attr);
625 pool->num_threads += 1;
/*
 * Queue one job for asynchronous execution. Under pool->mutex:
 * rejects jobs on a shut-down pool, enqueues via pthreadpool_put_job,
 * then either wakes an idle worker, queues behind the max_threads cap
 * (max_threads == 0 meaning no cap), or creates a new worker. If no
 * worker can be created and none exist, falls back to running the job
 * synchronously in the caller and signalling completion directly.
 * NOTE(review): most return statements and the synchronous fn() call
 * are elided from this listing.
 */
631 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
632 void (*fn)(void *private_data), void *private_data)
637 res = pthread_mutex_lock(&pool->mutex);
642 if (pool->shutdown) {
644 * Protect against the pool being shut down while
645 * trying to add a job
647 unlock_res = pthread_mutex_unlock(&pool->mutex);
648 assert(unlock_res == 0);
653 * Add job to the end of the queue
655 if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
656 unlock_res = pthread_mutex_unlock(&pool->mutex);
657 assert(unlock_res == 0);
661 if (pool->num_idle > 0) {
663 * We have idle threads, wake one.
665 res = pthread_cond_signal(&pool->condvar);
/* Signalling failed: take the job back out so the queue stays
 * consistent. */
667 pthreadpool_undo_put_job(pool);
669 unlock_res = pthread_mutex_unlock(&pool->mutex);
670 assert(unlock_res == 0);
/* At the thread cap (0 == unlimited): leave the job queued for an
 * existing worker to pick up. */
674 if ((pool->max_threads != 0) &&
675 (pool->num_threads >= pool->max_threads)) {
677 * No more new threads, we just queue the request
679 unlock_res = pthread_mutex_unlock(&pool->mutex);
680 assert(unlock_res == 0);
684 res = pthreadpool_create_thread(pool);
686 unlock_res = pthread_mutex_unlock(&pool->mutex);
687 assert(unlock_res == 0);
691 if (pool->num_threads != 0) {
693 * At least one thread is still available, let
694 * that one run the queued job.
696 unlock_res = pthread_mutex_unlock(&pool->mutex);
697 assert(unlock_res == 0);
702 * No thread could be created to run job, fallback to sync
705 pthreadpool_undo_put_job(pool);
707 unlock_res = pthread_mutex_unlock(&pool->mutex);
708 assert(unlock_res == 0);
/* Synchronous fallback: the job ran in this thread (fn() call is in
 * an elided line); still report completion via signal_fn. */
711 res = pool->signal_fn(job_id, fn, private_data,
712 pool->signal_fn_private_data);