2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
29 #include "system/time.h"
30 #include "system/filesys.h"
33 #include "pthreadpool.h"
34 #include "lib/util/dlinklist.h"
36 struct pthreadpool_job {
38 void (*fn)(void *private_data);
44 * List of pthreadpools, kept for fork safety
46 struct pthreadpool *prev, *next;
49 * Control access to this struct
51 pthread_mutex_t mutex;
54 * Threads waiting for work do so here
56 pthread_cond_t condvar;
61 size_t jobs_array_len;
62 struct pthreadpool_job *jobs;
73 * indicator to worker threads that they should shut down
78 * maximum number of threads
88 * Number of idle threads
93 * An array of threads that require joining.
96 pthread_t *exited; /* We alloc more */
99 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
100 static struct pthreadpool *pthreadpools = NULL;
101 static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
103 static void pthreadpool_prep_atfork(void);
106 * Initialize a thread pool
109 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
111 struct pthreadpool *pool;
114 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
119 pool->jobs_array_len = 4;
121 pool->jobs_array_len, sizeof(struct pthreadpool_job));
123 if (pool->jobs == NULL) {
128 pool->head = pool->num_jobs = 0;
130 ret = pipe(pool->sig_pipe);
138 ret = pthread_mutex_init(&pool->mutex, NULL);
140 close(pool->sig_pipe[0]);
141 close(pool->sig_pipe[1]);
147 ret = pthread_cond_init(&pool->condvar, NULL);
149 pthread_mutex_destroy(&pool->mutex);
150 close(pool->sig_pipe[0]);
151 close(pool->sig_pipe[1]);
158 pool->num_threads = 0;
159 pool->num_exited = 0;
161 pool->max_threads = max_threads;
164 ret = pthread_mutex_lock(&pthreadpools_mutex);
166 pthread_cond_destroy(&pool->condvar);
167 pthread_mutex_destroy(&pool->mutex);
168 close(pool->sig_pipe[0]);
169 close(pool->sig_pipe[1]);
174 DLIST_ADD(pthreadpools, pool);
176 ret = pthread_mutex_unlock(&pthreadpools_mutex);
179 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
186 static void pthreadpool_prepare(void)
189 struct pthreadpool *pool;
191 ret = pthread_mutex_lock(&pthreadpools_mutex);
196 while (pool != NULL) {
197 ret = pthread_mutex_lock(&pool->mutex);
203 static void pthreadpool_parent(void)
206 struct pthreadpool *pool;
208 for (pool = DLIST_TAIL(pthreadpools);
210 pool = DLIST_PREV(pool)) {
211 ret = pthread_mutex_unlock(&pool->mutex);
215 ret = pthread_mutex_unlock(&pthreadpools_mutex);
219 static void pthreadpool_child(void)
222 struct pthreadpool *pool;
224 for (pool = DLIST_TAIL(pthreadpools);
226 pool = DLIST_PREV(pool)) {
228 close(pool->sig_pipe[0]);
229 close(pool->sig_pipe[1]);
231 ret = pipe(pool->sig_pipe);
234 pool->num_threads = 0;
236 pool->num_exited = 0;
244 ret = pthread_mutex_unlock(&pool->mutex);
248 ret = pthread_mutex_unlock(&pthreadpools_mutex);
252 static void pthreadpool_prep_atfork(void)
254 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
259 * Return the file descriptor which becomes readable when a job has
263 int pthreadpool_signal_fd(struct pthreadpool *pool)
265 return pool->sig_pipe[0];
269 * Do a pthread_join() on all children that have exited, pool->mutex must be
272 static void pthreadpool_join_children(struct pthreadpool *pool)
276 for (i=0; i<pool->num_exited; i++) {
277 pthread_join(pool->exited[i], NULL);
279 pool->num_exited = 0;
282 * Deliberately do not free pool->exited or set it to NULL. It will be
283 * re-used by realloc later.
288 * Fetch a finished job number from the signal pipe
291 int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
294 ssize_t to_read, nread;
299 to_read = sizeof(int) * num_jobids;
301 while ((nread == -1) && (errno == EINTR)) {
302 nread = read(pool->sig_pipe[0], jobids, to_read);
307 if ((nread % sizeof(int)) != 0) {
310 return nread / sizeof(int);
314 * Destroy a thread pool, finishing all threads working for it
317 int pthreadpool_destroy(struct pthreadpool *pool)
321 ret = pthread_mutex_lock(&pool->mutex);
326 if ((pool->num_jobs != 0) || pool->shutdown) {
327 ret = pthread_mutex_unlock(&pool->mutex);
332 if (pool->num_threads > 0) {
334 * We have active threads: tell them to finish and wait for that.
339 if (pool->num_idle > 0) {
341 * Wake the idle threads. They will find
342 * pool->shutdown to be set and exit themselves
344 ret = pthread_cond_broadcast(&pool->condvar);
346 pthread_mutex_unlock(&pool->mutex);
351 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
353 if (pool->num_exited > 0) {
354 pthreadpool_join_children(pool);
358 * A thread that shuts down will also signal
361 ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
363 pthread_mutex_unlock(&pool->mutex);
369 ret = pthread_mutex_unlock(&pool->mutex);
373 ret = pthread_mutex_destroy(&pool->mutex);
374 ret1 = pthread_cond_destroy(&pool->condvar);
383 ret = pthread_mutex_lock(&pthreadpools_mutex);
387 DLIST_REMOVE(pthreadpools, pool);
388 ret = pthread_mutex_unlock(&pthreadpools_mutex);
391 close(pool->sig_pipe[0]);
392 pool->sig_pipe[0] = -1;
394 close(pool->sig_pipe[1]);
395 pool->sig_pipe[1] = -1;
405 * Prepare for pthread_exit(), pool->mutex must be locked
407 static void pthreadpool_server_exit(struct pthreadpool *pool)
411 pool->num_threads -= 1;
413 exited = (pthread_t *)realloc(
414 pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
416 if (exited == NULL) {
417 /* lost a thread status */
420 pool->exited = exited;
422 pool->exited[pool->num_exited] = pthread_self();
423 pool->num_exited += 1;
426 static bool pthreadpool_get_job(struct pthreadpool *p,
427 struct pthreadpool_job *job)
429 if (p->num_jobs == 0) {
432 *job = p->jobs[p->head];
433 p->head = (p->head+1) % p->jobs_array_len;
438 static bool pthreadpool_put_job(struct pthreadpool *p,
440 void (*fn)(void *private_data),
443 struct pthreadpool_job *job;
445 if (p->num_jobs == p->jobs_array_len) {
446 struct pthreadpool_job *tmp;
447 size_t new_len = p->jobs_array_len * 2;
450 p->jobs, sizeof(struct pthreadpool_job) * new_len);
457 * We just doubled the jobs array. The array implements a FIFO
458 * queue with a modulo-based wraparound, so we have to memcpy
459 * the jobs that are logically at the queue end but physically
460 * before the queue head into the reallocated area. The new
461 * space starts at the current jobs_array_len, and we have to
462 * copy everything before the current head job into the new
465 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
466 sizeof(struct pthreadpool_job) * p->head);
468 p->jobs_array_len = new_len;
471 job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
474 job->private_data = private_data;
481 static void *pthreadpool_server(void *arg)
483 struct pthreadpool *pool = (struct pthreadpool *)arg;
486 res = pthread_mutex_lock(&pool->mutex);
493 struct pthreadpool_job job;
496 * idle-wait at most 1 second. If nothing happens in that
497 * time, exit this thread.
500 clock_gettime(CLOCK_REALTIME, &ts);
503 while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
506 res = pthread_cond_timedwait(
507 &pool->condvar, &pool->mutex, &ts);
510 if (res == ETIMEDOUT) {
512 if (pool->num_jobs == 0) {
514 * we timed out and still no work for
517 pthreadpool_server_exit(pool);
518 pthread_mutex_unlock(&pool->mutex);
527 if (pthreadpool_get_job(pool, &job)) {
529 int sig_pipe = pool->sig_pipe[1];
532 * Do the work with the mutex unlocked
535 res = pthread_mutex_unlock(&pool->mutex);
538 job.fn(job.private_data);
540 res = pthread_mutex_lock(&pool->mutex);
543 written = write(sig_pipe, &job.id, sizeof(job.id));
544 if (written != sizeof(int)) {
545 pthreadpool_server_exit(pool);
546 pthread_mutex_unlock(&pool->mutex);
551 if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
553 * No more work to do and we're asked to shut down, so
556 pthreadpool_server_exit(pool);
558 if (pool->num_threads == 0) {
560 * Ping the main thread waiting for all of us
561 * workers to have quit.
563 pthread_cond_broadcast(&pool->condvar);
566 pthread_mutex_unlock(&pool->mutex);
572 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
573 void (*fn)(void *private_data), void *private_data)
577 sigset_t mask, omask;
579 res = pthread_mutex_lock(&pool->mutex);
584 if (pool->shutdown) {
586 * Protect against the pool being shut down while
587 * trying to add a job
589 res = pthread_mutex_unlock(&pool->mutex);
595 * Just some cleanup under the mutex
597 pthreadpool_join_children(pool);
600 * Add job to the end of the queue
602 if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
603 pthread_mutex_unlock(&pool->mutex);
607 if (pool->num_idle > 0) {
609 * We have idle threads, wake one.
611 res = pthread_cond_signal(&pool->condvar);
612 pthread_mutex_unlock(&pool->mutex);
616 if ((pool->max_threads != 0) &&
617 (pool->num_threads >= pool->max_threads)) {
619 * No more new threads, we just queue the request
621 pthread_mutex_unlock(&pool->mutex);
626 * Create a new worker thread. It should not receive any signals.
631 res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
633 pthread_mutex_unlock(&pool->mutex);
637 res = pthread_create(&thread_id, NULL, pthreadpool_server,
640 pool->num_threads += 1;
643 assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
645 pthread_mutex_unlock(&pool->mutex);