2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "pthreadpool.h"
25 #include "lib/util/dlinklist.h"
/*
 * One queued job: a callback plus the data handed to it.
 * NOTE(review): the embedded numbering jumps 33 -> 35 -> 41, so further
 * members (worker code reads job.id and job.private_data) and the
 * closing brace are presumably on lines missing from this listing.
 */
33 struct pthreadpool_job {
/* Function a worker thread runs for this job. */
35 void (*fn)(void *private_data);
/*
 * Members of the pool state that the rest of the file accesses through
 * struct pthreadpool pointers.
 * NOTE(review): the opening line of the struct and several members the
 * code uses (head, num_jobs, shutdown, max_threads, num_threads,
 * num_idle) are not visible in this listing; the embedded numbering
 * skips over them. Only remnants of their comments remain.
 */
41 * List pthreadpools for fork safety
/* Links used by DLIST_ADD / DLIST_REMOVE on the global pthreadpools list. */
43 struct pthreadpool *prev, *next;
46 * Control access to this struct
48 pthread_mutex_t mutex;
51 * Threads waiting for work do so here
53 pthread_cond_t condvar;
/*
 * Job queue storage: jobs points to jobs_array_len slots which the
 * get/put helpers index modulo jobs_array_len (a circular FIFO).
 */
58 size_t jobs_array_len;
59 struct pthreadpool_job *jobs;
65 * Indicate job completion
/*
 * Called by a worker after a job function has returned. The worker
 * passes the job id, the job function, its private data and
 * signal_fn_private_data.
 */
67 int (*signal_fn)(int jobid,
68 void (*job_fn)(void *private_data),
69 void *job_fn_private_data,
71 void *signal_fn_private_data;
74 * indicator to worker threads that they should shut down
79 * maximum number of threads
89 * Number of idle threads
94 * Condition variable indicating that we should quickly go
95 * away making way for fork() without anybody waiting on
/*
 * NULL in normal operation. pthreadpool_prepare_pool points it at a
 * condvar on its own stack while it waits for an idle worker to exit.
 */
98 pthread_cond_t *prefork_cond;
/*
 * Process-wide registry of pools, used by the fork handlers.
 * pthreadpools_mutex is held around every DLIST_ADD / DLIST_REMOVE on
 * the list and is also taken by the atfork prepare handler.
 */
101 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Head of the list of all pools created by pthreadpool_init. */
102 static struct pthreadpool *pthreadpools = NULL;
/* Ensures the atfork handlers are registered only once. */
103 static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
/* Forward declaration: defined further down, used by pthreadpool_init. */
105 static void pthreadpool_prep_atfork(void);
108 * Initialize a thread pool
/*
 * Allocate and initialise a thread pool and register it in the global
 * pool list.
 *
 * max_threads            stored as the thread limit; pthreadpool_add_job
 *                        treats 0 as no limit.
 * presult                presumably receives the new pool on success; the
 *                        assignment is not visible here -- TODO confirm.
 * signal_fn              stored; workers call it after each finished job.
 * signal_fn_private_data stored; passed as last argument to signal_fn.
 *
 * NOTE(review): the embedded line numbers skip (118 -> 121 -> 125 ...),
 * so the NULL check after malloc, the return statements and parts of
 * the error cleanup are presumably on lines missing from this listing.
 */
111 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
112 int (*signal_fn)(int jobid,
113 void (*job_fn)(void *private_data),
114 void *job_fn_private_data,
116 void *signal_fn_private_data)
118 struct pthreadpool *pool;
121 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
125 pool->signal_fn = signal_fn;
126 pool->signal_fn_private_data = signal_fn_private_data;
/* Start with a 4-slot job queue; pthreadpool_put_job doubles it when full. */
128 pool->jobs_array_len = 4;
/*
 * Arguments of the allocation call for pool->jobs; the first line of
 * that call is not visible in this listing.
 */
130 pool->jobs_array_len, sizeof(struct pthreadpool_job));
132 if (pool->jobs == NULL) {
/* Queue starts empty. */
137 pool->head = pool->num_jobs = 0;
139 ret = pthread_mutex_init(&pool->mutex, NULL);
146 ret = pthread_cond_init(&pool->condvar, NULL);
/* Presumably the failure path of pthread_cond_init: undo the mutex. */
148 pthread_mutex_destroy(&pool->mutex);
154 pool->shutdown = false;
155 pool->num_threads = 0;
156 pool->max_threads = max_threads;
158 pool->prefork_cond = NULL;
/* Publish the pool on the global list so the fork handlers can find it. */
160 ret = pthread_mutex_lock(&pthreadpools_mutex);
/* Presumably the failure path of the lock above: undo condvar and mutex. */
162 pthread_cond_destroy(&pool->condvar);
163 pthread_mutex_destroy(&pool->mutex);
168 DLIST_ADD(pthreadpools, pool);
170 ret = pthread_mutex_unlock(&pthreadpools_mutex);
/* Register the atfork handlers the first time any pool is created. */
173 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
/*
 * Get one pool ready for fork(); called from pthreadpool_prepare.
 *
 * Locks pool->mutex, makes every idle worker exit (one per loop
 * iteration) and then destroys pool->condvar. The visible code does not
 * unlock pool->mutex here; pthreadpool_parent and pthreadpool_child
 * unlock it and re-create the condvar after the fork.
 * After the loop the local prefork_cond is destroyed first, then the
 * condvar of the pool.
 *
 * NOTE(review): the embedded line numbers skip, so checks on the ret
 * values are presumably on lines missing from this listing.
 */
180 static void pthreadpool_prepare_pool(struct pthreadpool *pool)
/* Lives on this stack frame; workers signal it just before exiting. */
182 pthread_cond_t prefork_cond = PTHREAD_COND_INITIALIZER;
185 ret = pthread_mutex_lock(&pool->mutex);
188 while (pool->num_idle != 0) {
190 * Exit all idle threads, which are all blocked in
191 * pool->condvar. In the child we can destroy the
192 * pool, which would result in undefined behaviour in
193 * the pthread_cond_destroy(pool->condvar). glibc just
/*
 * Handshake with one idle worker: publish prefork_cond, wake a single
 * waiter on pool->condvar, then block until that worker signals back.
 * pthread_cond_wait releases pool->mutex while blocked so the worker
 * can run.
 */
196 pool->prefork_cond = &prefork_cond;
198 ret = pthread_cond_signal(&pool->condvar);
201 ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
204 pool->prefork_cond = NULL;
207 ret = pthread_cond_destroy(&prefork_cond);
211 * Probably it's well-defined somewhere: What happens to
212 * condvars after a fork? The rationale of pthread_atfork only
213 * writes about mutexes. So better be safe than sorry and
214 * destroy/reinit pool->condvar across a fork.
217 ret = pthread_cond_destroy(&pool->condvar);
/*
 * pthread_atfork prepare handler (first argument of the pthread_atfork
 * call in pthreadpool_prep_atfork).
 *
 * Takes pthreadpools_mutex and runs pthreadpool_prepare_pool on pools
 * from the global list. The list mutex is not released in the visible
 * code; pthreadpool_parent and pthreadpool_child unlock it after fork.
 *
 * NOTE(review): the loop initialisation and the step to the next pool
 * are not visible in this listing (embedded numbering skips them).
 */
221 static void pthreadpool_prepare(void)
224 struct pthreadpool *pool;
226 ret = pthread_mutex_lock(&pthreadpools_mutex);
231 while (pool != NULL) {
232 pthreadpool_prepare_pool(pool);
/*
 * pthread_atfork parent handler (second argument of the pthread_atfork
 * call in pthreadpool_prep_atfork).
 *
 * Walks the pool list from the tail. For each pool it re-creates the
 * condvar that pthreadpool_prepare_pool destroyed and unlocks the pool
 * mutex that was locked there. Finally it unlocks pthreadpools_mutex,
 * which pthreadpool_prepare locked.
 *
 * NOTE(review): the loop condition line is not visible in this listing.
 */
237 static void pthreadpool_parent(void)
240 struct pthreadpool *pool;
242 for (pool = DLIST_TAIL(pthreadpools);
244 pool = DLIST_PREV(pool)) {
245 ret = pthread_cond_init(&pool->condvar, NULL);
247 ret = pthread_mutex_unlock(&pool->mutex);
251 ret = pthread_mutex_unlock(&pthreadpools_mutex);
/*
 * Fork handler that resets every pool and releases the locks taken in
 * the prepare step. Presumably registered as the pthread_atfork child
 * handler; the third argument of that call is not visible -- TODO
 * confirm.
 *
 * Walks the pool list from the tail, zeroes the thread count,
 * re-creates the condvar destroyed by pthreadpool_prepare_pool and
 * unlocks the pool mutex; finally unlocks pthreadpools_mutex.
 *
 * NOTE(review): the numbering jumps 264 -> 269, so further resets of
 * pool fields are presumably on lines missing from this listing.
 */
255 static void pthreadpool_child(void)
258 struct pthreadpool *pool;
260 for (pool = DLIST_TAIL(pthreadpools);
262 pool = DLIST_PREV(pool)) {
/* Worker threads of the parent do not exist in the forked child. */
264 pool->num_threads = 0;
269 ret = pthread_cond_init(&pool->condvar, NULL);
271 ret = pthread_mutex_unlock(&pool->mutex);
275 ret = pthread_mutex_unlock(&pthreadpools_mutex);
/*
 * Register the fork handlers. Invoked through pthread_once from
 * pthreadpool_init, so it runs at most once per process.
 * NOTE(review): the line with the remaining pthread_atfork argument(s)
 * is not visible in this listing.
 */
279 static void pthreadpool_prep_atfork(void)
281 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
/*
 * Tear down a pool: unlink it from the global list under
 * pthreadpools_mutex, then destroy its mutex and condvar.
 *
 * NOTE(review): the numbering jumps 298 -> 314, so how ret and ret1
 * are turned into the return value, and the release of the pool
 * memory, are presumably on lines missing from this listing.
 */
285 static int pthreadpool_free(struct pthreadpool *pool)
289 ret = pthread_mutex_lock(&pthreadpools_mutex);
293 DLIST_REMOVE(pthreadpools, pool);
294 ret = pthread_mutex_unlock(&pthreadpools_mutex);
/* Both destroy calls are made; their results are kept separately. */
297 ret = pthread_mutex_destroy(&pool->mutex);
298 ret1 = pthread_cond_destroy(&pool->condvar);
314 * Destroy a thread pool. Wake up all idle threads for exit. The last
315 * one will free the pool.
/*
 * Request shutdown of a pool.
 *
 * Under pool->mutex:
 *  - if shutdown was already requested, just unlock (the value returned
 *    in that case is not visible in this listing);
 *  - otherwise set the shutdown flag;
 *  - with no threads left, unlock and free the pool right away;
 *  - with threads still present, wake all waiters on pool->condvar so
 *    they see the flag, then unlock. The last worker to leave frees the
 *    pool in pthreadpool_server_exit.
 *
 * NOTE(review): return statements and checks of ret / ret1 are
 * presumably on lines missing from this listing.
 */
318 int pthreadpool_destroy(struct pthreadpool *pool)
322 ret = pthread_mutex_lock(&pool->mutex);
327 if (pool->shutdown) {
328 ret = pthread_mutex_unlock(&pool->mutex);
333 pool->shutdown = true;
335 if (pool->num_threads == 0) {
336 ret = pthread_mutex_unlock(&pool->mutex);
339 ret = pthreadpool_free(pool);
344 * We have active threads, tell them to finish.
347 ret = pthread_cond_broadcast(&pool->condvar);
349 ret1 = pthread_mutex_unlock(&pool->mutex);
356 * Prepare for pthread_exit(), pool->mutex must be locked and will be
357 * unlocked here. This is a bit of a layering violation, but here we
358 * also take care of removing the pool if we're the last thread.
/*
 * Bookkeeping for a worker thread that is about to exit.
 *
 * Reads and writes pool fields before unlocking pool->mutex, so the
 * caller must hold that mutex. Drops the thread count, decides while
 * still locked whether this was the last thread of a pool that is
 * shutting down, unlocks, and then frees the pool.
 *
 * NOTE(review): the guard around pthreadpool_free (presumably a test
 * of free_it) is not visible in this listing -- TODO confirm.
 */
360 static void pthreadpool_server_exit(struct pthreadpool *pool)
365 pool->num_threads -= 1;
367 free_it = (pool->shutdown && (pool->num_threads == 0));
369 ret = pthread_mutex_unlock(&pool->mutex);
373 pthreadpool_free(pool);
/*
 * Take the oldest job out of the circular queue.
 *
 * p    pool whose queue is read.
 * job  receives a copy of the job at the queue head.
 *
 * pthreadpool_server uses the bool result as an indication that a job
 * was obtained. It calls this with pool->mutex held.
 *
 * NOTE(review): the return statements and the adjustment of
 * p->num_jobs are presumably on lines missing from this listing.
 */
377 static bool pthreadpool_get_job(struct pthreadpool *p,
378 struct pthreadpool_job *job)
/* Empty queue: nothing to hand out. */
380 if (p->num_jobs == 0) {
383 *job = p->jobs[p->head];
/* Advance the head with wraparound at the end of the array. */
384 p->head = (p->head+1) % p->jobs_array_len;
/*
 * Append a job at the tail of the circular queue, doubling the array
 * first when it is full.
 *
 * pthreadpool_add_job calls this with pool->mutex held and treats a
 * false result as failure.
 *
 * NOTE(review): parts of the parameter list, the first line of the
 * reallocation call, its failure handling, the stores of the job id
 * and fn, the num_jobs update and the return statements are not
 * visible in this listing (embedded numbering skips them).
 * NOTE(review): jobs_array_len * 2 and the byte-size product are not
 * checked for size_t overflow in the visible code.
 */
389 static bool pthreadpool_put_job(struct pthreadpool *p,
391 void (*fn)(void *private_data),
394 struct pthreadpool_job *job;
/* Queue full: grow before inserting. */
396 if (p->num_jobs == p->jobs_array_len) {
397 struct pthreadpool_job *tmp;
398 size_t new_len = p->jobs_array_len * 2;
401 p->jobs, sizeof(struct pthreadpool_job) * new_len);
408 * We just doubled the jobs array. The array implements a FIFO
409 * queue with a modulo-based wraparound, so we have to memcpy
410 * the jobs that are logically at the queue end but physically
411 * before the queue head into the reallocated area. The new
412 * space starts at the current jobs_array_len, and we have to
413 * copy everything before the current head job into the new
/*
 * Slots [0, head) move to [old_len, old_len + head). Source and
 * destination cannot overlap because head is below old_len.
 */
416 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
417 sizeof(struct pthreadpool_job) * p->head);
419 p->jobs_array_len = new_len;
/* Tail slot: head plus number of queued jobs, with wraparound. */
422 job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
425 job->private_data = private_data;
/*
 * Worker thread entry point; arg is the struct pthreadpool pointer
 * passed to pthread_create by pthreadpool_add_job.
 *
 * Holds pool->mutex except while a job and its completion callback
 * run. Waits on pool->condvar for work with a timeout, and exits
 * through pthreadpool_server_exit when
 *  - a fork is being prepared (pool->prefork_cond is set),
 *  - the wait timed out with no job queued,
 *  - the path after the signal_fn call is taken (see note below), or
 *  - the queue is empty and shutdown was requested (the last check in
 *    the body).
 *
 * NOTE(review): the embedded numbering skips, so the outer loop, the
 * declarations of res, ret and ts, the change to ts after
 * clock_gettime, the num_idle accounting, the return statements and
 * closing braces are presumably on lines missing from this listing.
 * NOTE(review): in the fork-related comment remnant below, the word
 * Me should read We.
 */
432 static void *pthreadpool_server(void *arg)
434 struct pthreadpool *pool = (struct pthreadpool *)arg;
437 res = pthread_mutex_lock(&pool->mutex);
444 struct pthreadpool_job job;
447 * idle-wait at most 1 second. If nothing happens in that
448 * time, exit this thread.
/* Base for the absolute deadline handed to pthread_cond_timedwait. */
451 clock_gettime(CLOCK_REALTIME, &ts);
454 while ((pool->num_jobs == 0) && !pool->shutdown) {
457 res = pthread_cond_timedwait(
458 &pool->condvar, &pool->mutex, &ts);
/* Set by pthreadpool_prepare_pool while it waits for this thread. */
461 if (pool->prefork_cond != NULL) {
463 * Me must allow fork() to continue
464 * without anybody waiting on
467 pthread_cond_signal(pool->prefork_cond);
468 pthreadpool_server_exit(pool);
472 if (res == ETIMEDOUT) {
474 if (pool->num_jobs == 0) {
476 * we timed out and still no work for
479 pthreadpool_server_exit(pool);
488 if (pthreadpool_get_job(pool, &job)) {
492 * Do the work with the mutex unlocked
495 res = pthread_mutex_unlock(&pool->mutex);
498 job.fn(job.private_data);
/* Report completion; still running without the pool mutex. */
500 ret = pool->signal_fn(job.id,
501 job.fn, job.private_data,
502 pool->signal_fn_private_data);
504 res = pthread_mutex_lock(&pool->mutex);
/*
 * Presumably taken only when signal_fn reported failure; the condition
 * is not visible in this listing -- TODO confirm.
 */
508 pthreadpool_server_exit(pool);
513 if ((pool->num_jobs == 0) && pool->shutdown) {
515 * No more work to do and we're asked to shut down, so
518 pthreadpool_server_exit(pool);
/*
 * Queue a job and make sure some worker will pick it up.
 *
 * pool          pool to submit to.
 * job_id        id stored with the job; workers pass it to signal_fn.
 * fn            function a worker calls for the job.
 * private_data  argument passed to fn.
 *
 * Under pool->mutex:
 *  1. refuse the job if the pool is shutting down;
 *  2. append it with pthreadpool_put_job;
 *  3. if a worker is idle, wake one and finish;
 *  4. if max_threads is non-zero and already reached, leave the job
 *     queued for an existing worker;
 *  5. otherwise create a new detached worker thread, with the signal
 *     mask changed via SIG_BLOCK around pthread_create and restored
 *     afterwards.
 *
 * NOTE(review): the embedded numbering skips, so the return statements,
 * the checks of res, the initialisation of mask and the declaration of
 * thread_id are presumably on lines missing from this listing.
 * NOTE(review): the signal mask is restored inside assert(). When
 * NDEBUG is defined the assert argument is not evaluated, so the
 * calling thread would keep the mask installed by SIG_BLOCK. Consider
 * calling pthread_sigmask outside the assert and asserting on its
 * result.
 */
524 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
525 void (*fn)(void *private_data), void *private_data)
527 pthread_attr_t thread_attr;
530 sigset_t mask, omask;
532 res = pthread_mutex_lock(&pool->mutex);
537 if (pool->shutdown) {
539 * Protect against the pool being shut down while
540 * trying to add a job
542 res = pthread_mutex_unlock(&pool->mutex);
548 * Add job to the end of the queue
550 if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
551 pthread_mutex_unlock(&pool->mutex);
555 if (pool->num_idle > 0) {
557 * We have idle threads, wake one.
559 res = pthread_cond_signal(&pool->condvar);
560 pthread_mutex_unlock(&pool->mutex);
/* max_threads of 0 disables the limit check. */
564 if ((pool->max_threads != 0) &&
565 (pool->num_threads >= pool->max_threads)) {
567 * No more new threads, we just queue the request
569 pthread_mutex_unlock(&pool->mutex);
574 * Create a new worker thread. It should not receive any signals.
579 res = pthread_attr_init(&thread_attr);
581 pthread_mutex_unlock(&pool->mutex);
/* Workers are detached: nobody joins them. */
585 res = pthread_attr_setdetachstate(
586 &thread_attr, PTHREAD_CREATE_DETACHED);
588 pthread_attr_destroy(&thread_attr);
589 pthread_mutex_unlock(&pool->mutex);
/* The new thread inherits the mask of its creator; omask keeps the old one. */
593 res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
595 pthread_attr_destroy(&thread_attr);
596 pthread_mutex_unlock(&pool->mutex);
600 res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
/* Presumably counted only when pthread_create succeeded -- TODO confirm. */
603 pool->num_threads += 1;
606 assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
608 pthread_attr_destroy(&thread_attr);
610 pthread_mutex_unlock(&pool->mutex);