f97cdcc6c15861482ccbf6803bbadbaccd81b084
[samba.git] / lib / pthreadpool / pthreadpool.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * thread pool implementation
4  * Copyright (C) Volker Lendecke 2009
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "pthreadpool.h"
25 #include "lib/util/dlinklist.h"
26
27 #ifdef NDEBUG
28 #undef NDEBUG
29 #endif
30
31 #include <assert.h>
32
/*
 * One queued unit of work.
 */
struct pthreadpool_job {
        /* Caller-chosen identifier, handed to signal_fn once the job ran */
        int id;

        /* Work function executed by a worker thread */
        void (*fn)(void *private_data);

        /* Opaque argument passed to fn */
        void *private_data;
};
38
/*
 * State of one thread pool. Apart from the list linkage, which is
 * guarded by pthreadpools_mutex, the members are accessed with
 * "mutex" held.
 */
struct pthreadpool {
        /*
         * Linkage in the global list of pools, walked by the fork
         * handlers
         */
        struct pthreadpool *prev;
        struct pthreadpool *next;

        /*
         * Serializes access to this struct
         */
        pthread_mutex_t mutex;

        /*
         * Idle worker threads sleep on this until work arrives or
         * the pool is shut down
         */
        pthread_cond_t condvar;

        /*
         * Job queue: a ring buffer of jobs_array_len slots in "jobs"
         */
        size_t jobs_array_len;
        struct pthreadpool_job *jobs;

        /*
         * Index of the oldest queued job and the number of jobs
         * currently queued
         */
        size_t head;
        size_t num_jobs;

        /*
         * Completion callback, invoked by the worker after a job
         * function has returned, together with its private pointer
         */
        int (*signal_fn)(int jobid,
                         void (*job_fn)(void *private_data),
                         void *job_fn_private_data,
                         void *private_data);
        void *signal_fn_private_data;

        /*
         * Set once the pool is being destroyed; tells the worker
         * threads to exit
         */
        bool shutdown;

        /*
         * Upper limit for num_threads, 0 means unlimited
         */
        int max_threads;

        /*
         * Number of worker threads currently belonging to the pool
         */
        int num_threads;

        /*
         * Number of worker threads currently waiting on condvar
         */
        int num_idle;
};
93
/*
 * Global list of all existing pools, guarded by pthreadpools_mutex.
 * The fork handlers walk this list.
 */
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct pthreadpool *pthreadpools = NULL;

/*
 * Makes sure the fork handlers are registered only once
 */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

/*
 * Registers the fork handlers, run via pthread_once()
 */
static void pthreadpool_prep_atfork(void);
99
100 /*
101  * Initialize a thread pool
102  */
103
104 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
105                      int (*signal_fn)(int jobid,
106                                       void (*job_fn)(void *private_data),
107                                       void *job_fn_private_data,
108                                       void *private_data),
109                      void *signal_fn_private_data)
110 {
111         struct pthreadpool *pool;
112         int ret;
113
114         pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
115         if (pool == NULL) {
116                 return ENOMEM;
117         }
118         pool->signal_fn = signal_fn;
119         pool->signal_fn_private_data = signal_fn_private_data;
120
121         pool->jobs_array_len = 4;
122         pool->jobs = calloc(
123                 pool->jobs_array_len, sizeof(struct pthreadpool_job));
124
125         if (pool->jobs == NULL) {
126                 free(pool);
127                 return ENOMEM;
128         }
129
130         pool->head = pool->num_jobs = 0;
131
132         ret = pthread_mutex_init(&pool->mutex, NULL);
133         if (ret != 0) {
134                 free(pool->jobs);
135                 free(pool);
136                 return ret;
137         }
138
139         ret = pthread_cond_init(&pool->condvar, NULL);
140         if (ret != 0) {
141                 pthread_mutex_destroy(&pool->mutex);
142                 free(pool->jobs);
143                 free(pool);
144                 return ret;
145         }
146
147         pool->shutdown = false;
148         pool->num_threads = 0;
149         pool->max_threads = max_threads;
150         pool->num_idle = 0;
151
152         ret = pthread_mutex_lock(&pthreadpools_mutex);
153         if (ret != 0) {
154                 pthread_cond_destroy(&pool->condvar);
155                 pthread_mutex_destroy(&pool->mutex);
156                 free(pool->jobs);
157                 free(pool);
158                 return ret;
159         }
160         DLIST_ADD(pthreadpools, pool);
161
162         ret = pthread_mutex_unlock(&pthreadpools_mutex);
163         assert(ret == 0);
164
165         pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
166
167         *presult = pool;
168
169         return 0;
170 }
171
172 static void pthreadpool_prepare(void)
173 {
174         int ret;
175         struct pthreadpool *pool;
176
177         ret = pthread_mutex_lock(&pthreadpools_mutex);
178         assert(ret == 0);
179
180         pool = pthreadpools;
181
182         while (pool != NULL) {
183                 ret = pthread_mutex_lock(&pool->mutex);
184                 assert(ret == 0);
185                 pool = pool->next;
186         }
187 }
188
189 static void pthreadpool_parent(void)
190 {
191         int ret;
192         struct pthreadpool *pool;
193
194         for (pool = DLIST_TAIL(pthreadpools);
195              pool != NULL;
196              pool = DLIST_PREV(pool)) {
197                 ret = pthread_mutex_unlock(&pool->mutex);
198                 assert(ret == 0);
199         }
200
201         ret = pthread_mutex_unlock(&pthreadpools_mutex);
202         assert(ret == 0);
203 }
204
205 static void pthreadpool_child(void)
206 {
207         int ret;
208         struct pthreadpool *pool;
209
210         for (pool = DLIST_TAIL(pthreadpools);
211              pool != NULL;
212              pool = DLIST_PREV(pool)) {
213
214                 pool->num_threads = 0;
215                 pool->num_idle = 0;
216                 pool->head = 0;
217                 pool->num_jobs = 0;
218
219                 ret = pthread_mutex_unlock(&pool->mutex);
220                 assert(ret == 0);
221         }
222
223         ret = pthread_mutex_unlock(&pthreadpools_mutex);
224         assert(ret == 0);
225 }
226
227 static void pthreadpool_prep_atfork(void)
228 {
229         pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
230                        pthreadpool_child);
231 }
232
233 static int pthreadpool_free(struct pthreadpool *pool)
234 {
235         int ret, ret1;
236
237         ret = pthread_mutex_lock(&pthreadpools_mutex);
238         if (ret != 0) {
239                 return ret;
240         }
241         DLIST_REMOVE(pthreadpools, pool);
242         ret = pthread_mutex_unlock(&pthreadpools_mutex);
243         assert(ret == 0);
244
245         ret = pthread_mutex_destroy(&pool->mutex);
246         ret1 = pthread_cond_destroy(&pool->condvar);
247
248         if (ret != 0) {
249                 return ret;
250         }
251         if (ret1 != 0) {
252                 return ret1;
253         }
254
255         free(pool->jobs);
256         free(pool);
257
258         return 0;
259 }
260
261 /*
262  * Destroy a thread pool. Wake up all idle threads for exit. The last
263  * one will free the pool.
264  */
265
266 int pthreadpool_destroy(struct pthreadpool *pool)
267 {
268         int ret, ret1;
269
270         ret = pthread_mutex_lock(&pool->mutex);
271         if (ret != 0) {
272                 return ret;
273         }
274
275         if (pool->shutdown) {
276                 ret = pthread_mutex_unlock(&pool->mutex);
277                 assert(ret == 0);
278                 return EBUSY;
279         }
280
281         pool->shutdown = true;
282
283         if (pool->num_threads == 0) {
284                 ret = pthread_mutex_unlock(&pool->mutex);
285                 assert(ret == 0);
286
287                 ret = pthreadpool_free(pool);
288                 return ret;
289         }
290
291         /*
292          * We have active threads, tell them to finish.
293          */
294
295         ret = pthread_cond_broadcast(&pool->condvar);
296
297         ret1 = pthread_mutex_unlock(&pool->mutex);
298         assert(ret1 == 0);
299
300         return ret;
301 }
302
303 /*
304  * Prepare for pthread_exit(), pool->mutex must be locked and will be
305  * unlocked here. This is a bit of a layering violation, but here we
306  * also take care of removing the pool if we're the last thread.
307  */
308 static void pthreadpool_server_exit(struct pthreadpool *pool)
309 {
310         int ret;
311         bool free_it;
312
313         pool->num_threads -= 1;
314
315         free_it = (pool->shutdown && (pool->num_threads == 0));
316
317         ret = pthread_mutex_unlock(&pool->mutex);
318         assert(ret == 0);
319
320         if (free_it) {
321                 pthreadpool_free(pool);
322         }
323 }
324
325 static bool pthreadpool_get_job(struct pthreadpool *p,
326                                 struct pthreadpool_job *job)
327 {
328         if (p->num_jobs == 0) {
329                 return false;
330         }
331         *job = p->jobs[p->head];
332         p->head = (p->head+1) % p->jobs_array_len;
333         p->num_jobs -= 1;
334         return true;
335 }
336
337 static bool pthreadpool_put_job(struct pthreadpool *p,
338                                 int id,
339                                 void (*fn)(void *private_data),
340                                 void *private_data)
341 {
342         struct pthreadpool_job *job;
343
344         if (p->num_jobs == p->jobs_array_len) {
345                 struct pthreadpool_job *tmp;
346                 size_t new_len = p->jobs_array_len * 2;
347
348                 tmp = realloc(
349                         p->jobs, sizeof(struct pthreadpool_job) * new_len);
350                 if (tmp == NULL) {
351                         return false;
352                 }
353                 p->jobs = tmp;
354
355                 /*
356                  * We just doubled the jobs array. The array implements a FIFO
357                  * queue with a modulo-based wraparound, so we have to memcpy
358                  * the jobs that are logically at the queue end but physically
359                  * before the queue head into the reallocated area. The new
360                  * space starts at the current jobs_array_len, and we have to
361                  * copy everything before the current head job into the new
362                  * area.
363                  */
364                 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
365                        sizeof(struct pthreadpool_job) * p->head);
366
367                 p->jobs_array_len = new_len;
368         }
369
370         job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
371         job->id = id;
372         job->fn = fn;
373         job->private_data = private_data;
374
375         p->num_jobs += 1;
376
377         return true;
378 }
379
380 static void *pthreadpool_server(void *arg)
381 {
382         struct pthreadpool *pool = (struct pthreadpool *)arg;
383         int res;
384
385         res = pthread_mutex_lock(&pool->mutex);
386         if (res != 0) {
387                 return NULL;
388         }
389
390         while (1) {
391                 struct timespec ts;
392                 struct pthreadpool_job job;
393
394                 /*
395                  * idle-wait at most 1 second. If nothing happens in that
396                  * time, exit this thread.
397                  */
398
399                 clock_gettime(CLOCK_REALTIME, &ts);
400                 ts.tv_sec += 1;
401
402                 while ((pool->num_jobs == 0) && !pool->shutdown) {
403
404                         pool->num_idle += 1;
405                         res = pthread_cond_timedwait(
406                                 &pool->condvar, &pool->mutex, &ts);
407                         pool->num_idle -= 1;
408
409                         if (res == ETIMEDOUT) {
410
411                                 if (pool->num_jobs == 0) {
412                                         /*
413                                          * we timed out and still no work for
414                                          * us. Exit.
415                                          */
416                                         pthreadpool_server_exit(pool);
417                                         return NULL;
418                                 }
419
420                                 break;
421                         }
422                         assert(res == 0);
423                 }
424
425                 if (pthreadpool_get_job(pool, &job)) {
426                         int ret;
427
428                         /*
429                          * Do the work with the mutex unlocked
430                          */
431
432                         res = pthread_mutex_unlock(&pool->mutex);
433                         assert(res == 0);
434
435                         job.fn(job.private_data);
436
437                         ret = pool->signal_fn(job.id,
438                                               job.fn, job.private_data,
439                                               pool->signal_fn_private_data);
440
441                         res = pthread_mutex_lock(&pool->mutex);
442                         assert(res == 0);
443
444                         if (ret != 0) {
445                                 pthreadpool_server_exit(pool);
446                                 return NULL;
447                         }
448                 }
449
450                 if ((pool->num_jobs == 0) && pool->shutdown) {
451                         /*
452                          * No more work to do and we're asked to shut down, so
453                          * exit
454                          */
455                         pthreadpool_server_exit(pool);
456                         return NULL;
457                 }
458         }
459 }
460
461 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
462                         void (*fn)(void *private_data), void *private_data)
463 {
464         pthread_attr_t thread_attr;
465         pthread_t thread_id;
466         int res;
467         sigset_t mask, omask;
468
469         res = pthread_mutex_lock(&pool->mutex);
470         if (res != 0) {
471                 return res;
472         }
473
474         if (pool->shutdown) {
475                 /*
476                  * Protect against the pool being shut down while
477                  * trying to add a job
478                  */
479                 res = pthread_mutex_unlock(&pool->mutex);
480                 assert(res == 0);
481                 return EINVAL;
482         }
483
484         /*
485          * Add job to the end of the queue
486          */
487         if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
488                 pthread_mutex_unlock(&pool->mutex);
489                 return ENOMEM;
490         }
491
492         if (pool->num_idle > 0) {
493                 /*
494                  * We have idle threads, wake one.
495                  */
496                 res = pthread_cond_signal(&pool->condvar);
497                 pthread_mutex_unlock(&pool->mutex);
498                 return res;
499         }
500
501         if ((pool->max_threads != 0) &&
502             (pool->num_threads >= pool->max_threads)) {
503                 /*
504                  * No more new threads, we just queue the request
505                  */
506                 pthread_mutex_unlock(&pool->mutex);
507                 return 0;
508         }
509
510         /*
511          * Create a new worker thread. It should not receive any signals.
512          */
513
514         sigfillset(&mask);
515
516         res = pthread_attr_init(&thread_attr);
517         if (res != 0) {
518                 pthread_mutex_unlock(&pool->mutex);
519                 return res;
520         }
521
522         res = pthread_attr_setdetachstate(
523                 &thread_attr, PTHREAD_CREATE_DETACHED);
524         if (res != 0) {
525                 pthread_attr_destroy(&thread_attr);
526                 pthread_mutex_unlock(&pool->mutex);
527                 return res;
528         }
529
530         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
531         if (res != 0) {
532                 pthread_attr_destroy(&thread_attr);
533                 pthread_mutex_unlock(&pool->mutex);
534                 return res;
535         }
536
537         res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
538                              (void *)pool);
539         if (res == 0) {
540                 pool->num_threads += 1;
541         }
542
543         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
544
545         pthread_attr_destroy(&thread_attr);
546
547         pthread_mutex_unlock(&pool->mutex);
548         return res;
549 }