pthreadpool: Slightly serialize jobs
[samba.git] / source3 / lib / pthreadpool / pthreadpool.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * thread pool implementation
4  * Copyright (C) Volker Lendecke 2009
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "config.h"
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <assert.h>
28 #include <fcntl.h>
29 #include "system/time.h"
30 #include "system/filesys.h"
31 #include "replace.h"
32
33 #include "pthreadpool.h"
34 #include "lib/util/dlinklist.h"
35
/*
 * One queued unit of work: the function to call, its argument and the
 * id that a worker writes to the signal pipe after the function returned.
 */
struct pthreadpool_job {
        /* Caller-chosen identifier reported on completion */
        int id;

        /* Work function, run by a worker thread */
        void (*fn)(void *private_data);

        /* Opaque argument passed to fn */
        void *private_data;
};
41
/*
 * State of one thread pool. Everything except the list links is
 * protected by "mutex"; the list links are protected by the file-level
 * pthreadpools_mutex.
 */
struct pthreadpool {
        /*
         * Links in the list of all pools, needed for fork safety
         */
        struct pthreadpool *prev;
        struct pthreadpool *next;

        /*
         * Guards the members of this struct
         */
        pthread_mutex_t mutex;

        /*
         * Idle workers sleep on this condition variable until a job
         * arrives; pthreadpool_destroy() waits on it for the workers
         * to quit.
         */
        pthread_cond_t condvar;

        /*
         * Job queue: a ring buffer of jobs_array_len slots. "head" is
         * the index of the oldest queued job, "num_jobs" the number of
         * jobs currently queued.
         */
        size_t jobs_array_len;
        struct pthreadpool_job *jobs;

        size_t head;
        size_t num_jobs;

        /*
         * Completion pipe: workers write finished job ids to
         * sig_pipe[1], they are read back from sig_pipe[0].
         */
        int sig_pipe[2];

        /*
         * Set to non-zero to tell the workers to quit
         */
        int shutdown;

        /*
         * Upper bound for num_threads, 0 means unlimited
         */
        int max_threads;

        /*
         * Number of worker threads currently alive
         */
        int num_threads;

        /*
         * Number of workers currently waiting on condvar
         */
        int num_idle;

        /*
         * Threads that have quit and still need a pthread_join().
         * "exited" is grown with realloc as needed.
         */
        int                     num_exited;
        pthread_t               *exited;
};
98
/* Protects the list of all pools below */
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;

/* List of all existing pools, walked by the fork handlers */
static struct pthreadpool *pthreadpools = NULL;

/* Ensures the fork handlers are registered only once */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

/* Registers the fork handlers, run via pthread_once() */
static void pthreadpool_prep_atfork(void);
104
105 /*
106  * Initialize a thread pool
107  */
108
109 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
110 {
111         struct pthreadpool *pool;
112         int ret;
113
114         pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
115         if (pool == NULL) {
116                 return ENOMEM;
117         }
118
119         pool->jobs_array_len = 4;
120         pool->jobs = calloc(
121                 pool->jobs_array_len, sizeof(struct pthreadpool_job));
122
123         if (pool->jobs == NULL) {
124                 free(pool);
125                 return ENOMEM;
126         }
127
128         pool->head = pool->num_jobs = 0;
129
130         ret = pipe(pool->sig_pipe);
131         if (ret == -1) {
132                 int err = errno;
133                 free(pool->jobs);
134                 free(pool);
135                 return err;
136         }
137
138         ret = pthread_mutex_init(&pool->mutex, NULL);
139         if (ret != 0) {
140                 close(pool->sig_pipe[0]);
141                 close(pool->sig_pipe[1]);
142                 free(pool->jobs);
143                 free(pool);
144                 return ret;
145         }
146
147         ret = pthread_cond_init(&pool->condvar, NULL);
148         if (ret != 0) {
149                 pthread_mutex_destroy(&pool->mutex);
150                 close(pool->sig_pipe[0]);
151                 close(pool->sig_pipe[1]);
152                 free(pool->jobs);
153                 free(pool);
154                 return ret;
155         }
156
157         pool->shutdown = 0;
158         pool->num_threads = 0;
159         pool->num_exited = 0;
160         pool->exited = NULL;
161         pool->max_threads = max_threads;
162         pool->num_idle = 0;
163
164         ret = pthread_mutex_lock(&pthreadpools_mutex);
165         if (ret != 0) {
166                 pthread_cond_destroy(&pool->condvar);
167                 pthread_mutex_destroy(&pool->mutex);
168                 close(pool->sig_pipe[0]);
169                 close(pool->sig_pipe[1]);
170                 free(pool->jobs);
171                 free(pool);
172                 return ret;
173         }
174         DLIST_ADD(pthreadpools, pool);
175
176         ret = pthread_mutex_unlock(&pthreadpools_mutex);
177         assert(ret == 0);
178
179         pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
180
181         *presult = pool;
182
183         return 0;
184 }
185
186 static void pthreadpool_prepare(void)
187 {
188         int ret;
189         struct pthreadpool *pool;
190
191         ret = pthread_mutex_lock(&pthreadpools_mutex);
192         assert(ret == 0);
193
194         pool = pthreadpools;
195
196         while (pool != NULL) {
197                 ret = pthread_mutex_lock(&pool->mutex);
198                 assert(ret == 0);
199                 pool = pool->next;
200         }
201 }
202
203 static void pthreadpool_parent(void)
204 {
205         int ret;
206         struct pthreadpool *pool;
207
208         for (pool = DLIST_TAIL(pthreadpools);
209              pool != NULL;
210              pool = DLIST_PREV(pool)) {
211                 ret = pthread_mutex_unlock(&pool->mutex);
212                 assert(ret == 0);
213         }
214
215         ret = pthread_mutex_unlock(&pthreadpools_mutex);
216         assert(ret == 0);
217 }
218
219 static void pthreadpool_child(void)
220 {
221         int ret;
222         struct pthreadpool *pool;
223
224         for (pool = DLIST_TAIL(pthreadpools);
225              pool != NULL;
226              pool = DLIST_PREV(pool)) {
227
228                 close(pool->sig_pipe[0]);
229                 close(pool->sig_pipe[1]);
230
231                 ret = pipe(pool->sig_pipe);
232                 assert(ret == 0);
233
234                 pool->num_threads = 0;
235
236                 pool->num_exited = 0;
237                 free(pool->exited);
238                 pool->exited = NULL;
239
240                 pool->num_idle = 0;
241                 pool->head = 0;
242                 pool->num_jobs = 0;
243
244                 ret = pthread_mutex_unlock(&pool->mutex);
245                 assert(ret == 0);
246         }
247
248         ret = pthread_mutex_unlock(&pthreadpools_mutex);
249         assert(ret == 0);
250 }
251
252 static void pthreadpool_prep_atfork(void)
253 {
254         pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
255                        pthreadpool_child);
256 }
257
258 /*
259  * Return the file descriptor which becomes readable when a job has
260  * finished
261  */
262
263 int pthreadpool_signal_fd(struct pthreadpool *pool)
264 {
265         return pool->sig_pipe[0];
266 }
267
268 /*
269  * Do a pthread_join() on all children that have exited, pool->mutex must be
270  * locked
271  */
272 static void pthreadpool_join_children(struct pthreadpool *pool)
273 {
274         int i;
275
276         for (i=0; i<pool->num_exited; i++) {
277                 pthread_join(pool->exited[i], NULL);
278         }
279         pool->num_exited = 0;
280
281         /*
282          * Deliberately not free and NULL pool->exited. That will be
283          * re-used by realloc later.
284          */
285 }
286
287 /*
288  * Fetch a finished job number from the signal pipe
289  */
290
291 int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
292                               unsigned num_jobids)
293 {
294         ssize_t to_read, nread;
295
296         nread = -1;
297         errno = EINTR;
298
299         to_read = sizeof(int) * num_jobids;
300
301         while ((nread == -1) && (errno == EINTR)) {
302                 nread = read(pool->sig_pipe[0], jobids, to_read);
303         }
304         if (nread == -1) {
305                 return -errno;
306         }
307         if ((nread % sizeof(int)) != 0) {
308                 return -EINVAL;
309         }
310         return nread / sizeof(int);
311 }
312
313 /*
314  * Destroy a thread pool, finishing all threads working for it
315  */
316
317 int pthreadpool_destroy(struct pthreadpool *pool)
318 {
319         int ret, ret1;
320
321         ret = pthread_mutex_lock(&pool->mutex);
322         if (ret != 0) {
323                 return ret;
324         }
325
326         if ((pool->num_jobs != 0) || pool->shutdown) {
327                 ret = pthread_mutex_unlock(&pool->mutex);
328                 assert(ret == 0);
329                 return EBUSY;
330         }
331
332         if (pool->num_threads > 0) {
333                 /*
334                  * We have active threads, tell them to finish, wait for that.
335                  */
336
337                 pool->shutdown = 1;
338
339                 if (pool->num_idle > 0) {
340                         /*
341                          * Wake the idle threads. They will find
342                          * pool->shutdown to be set and exit themselves
343                          */
344                         ret = pthread_cond_broadcast(&pool->condvar);
345                         if (ret != 0) {
346                                 pthread_mutex_unlock(&pool->mutex);
347                                 return ret;
348                         }
349                 }
350
351                 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
352
353                         if (pool->num_exited > 0) {
354                                 pthreadpool_join_children(pool);
355                                 continue;
356                         }
357                         /*
358                          * A thread that shuts down will also signal
359                          * pool->condvar
360                          */
361                         ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
362                         if (ret != 0) {
363                                 pthread_mutex_unlock(&pool->mutex);
364                                 return ret;
365                         }
366                 }
367         }
368
369         ret = pthread_mutex_unlock(&pool->mutex);
370         if (ret != 0) {
371                 return ret;
372         }
373         ret = pthread_mutex_destroy(&pool->mutex);
374         ret1 = pthread_cond_destroy(&pool->condvar);
375
376         if (ret != 0) {
377                 return ret;
378         }
379         if (ret1 != 0) {
380                 return ret1;
381         }
382
383         ret = pthread_mutex_lock(&pthreadpools_mutex);
384         if (ret != 0) {
385                 return ret;
386         }
387         DLIST_REMOVE(pthreadpools, pool);
388         ret = pthread_mutex_unlock(&pthreadpools_mutex);
389         assert(ret == 0);
390
391         close(pool->sig_pipe[0]);
392         pool->sig_pipe[0] = -1;
393
394         close(pool->sig_pipe[1]);
395         pool->sig_pipe[1] = -1;
396
397         free(pool->exited);
398         free(pool->jobs);
399         free(pool);
400
401         return 0;
402 }
403
404 /*
405  * Prepare for pthread_exit(), pool->mutex must be locked
406  */
407 static void pthreadpool_server_exit(struct pthreadpool *pool)
408 {
409         pthread_t *exited;
410
411         pool->num_threads -= 1;
412
413         exited = (pthread_t *)realloc(
414                 pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
415
416         if (exited == NULL) {
417                 /* lost a thread status */
418                 return;
419         }
420         pool->exited = exited;
421
422         pool->exited[pool->num_exited] = pthread_self();
423         pool->num_exited += 1;
424 }
425
426 static bool pthreadpool_get_job(struct pthreadpool *p,
427                                 struct pthreadpool_job *job)
428 {
429         if (p->num_jobs == 0) {
430                 return false;
431         }
432         *job = p->jobs[p->head];
433         p->head = (p->head+1) % p->jobs_array_len;
434         p->num_jobs -= 1;
435         return true;
436 }
437
438 static bool pthreadpool_put_job(struct pthreadpool *p,
439                                 int id,
440                                 void (*fn)(void *private_data),
441                                 void *private_data)
442 {
443         struct pthreadpool_job *job;
444
445         if (p->num_jobs == p->jobs_array_len) {
446                 struct pthreadpool_job *tmp;
447                 size_t new_len = p->jobs_array_len * 2;
448
449                 tmp = realloc(
450                         p->jobs, sizeof(struct pthreadpool_job) * new_len);
451                 if (tmp == NULL) {
452                         return false;
453                 }
454                 p->jobs = tmp;
455
456                 /*
457                  * We just doubled the jobs array. The array implements a FIFO
458                  * queue with a modulo-based wraparound, so we have to memcpy
459                  * the jobs that are logically at the queue end but physically
460                  * before the queue head into the reallocated area. The new
461                  * space starts at the current jobs_array_len, and we have to
462                  * copy everything before the current head job into the new
463                  * area.
464                  */
465                 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
466                        sizeof(struct pthreadpool_job) * p->head);
467
468                 p->jobs_array_len = new_len;
469         }
470
471         job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
472         job->id = id;
473         job->fn = fn;
474         job->private_data = private_data;
475
476         p->num_jobs += 1;
477
478         return true;
479 }
480
481 static void *pthreadpool_server(void *arg)
482 {
483         struct pthreadpool *pool = (struct pthreadpool *)arg;
484         int res;
485
486         res = pthread_mutex_lock(&pool->mutex);
487         if (res != 0) {
488                 return NULL;
489         }
490
491         while (1) {
492                 struct timespec ts;
493                 struct pthreadpool_job job;
494
495                 /*
496                  * idle-wait at most 1 second. If nothing happens in that
497                  * time, exit this thread.
498                  */
499
500                 clock_gettime(CLOCK_REALTIME, &ts);
501                 ts.tv_sec += 1;
502
503                 while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
504
505                         pool->num_idle += 1;
506                         res = pthread_cond_timedwait(
507                                 &pool->condvar, &pool->mutex, &ts);
508                         pool->num_idle -= 1;
509
510                         if (res == ETIMEDOUT) {
511
512                                 if (pool->num_jobs == 0) {
513                                         /*
514                                          * we timed out and still no work for
515                                          * us. Exit.
516                                          */
517                                         pthreadpool_server_exit(pool);
518                                         pthread_mutex_unlock(&pool->mutex);
519                                         return NULL;
520                                 }
521
522                                 break;
523                         }
524                         assert(res == 0);
525                 }
526
527                 if (pthreadpool_get_job(pool, &job)) {
528                         ssize_t written;
529                         int sig_pipe = pool->sig_pipe[1];
530
531                         /*
532                          * Do the work with the mutex unlocked
533                          */
534
535                         res = pthread_mutex_unlock(&pool->mutex);
536                         assert(res == 0);
537
538                         job.fn(job.private_data);
539
540                         res = pthread_mutex_lock(&pool->mutex);
541                         assert(res == 0);
542
543                         written = write(sig_pipe, &job.id, sizeof(job.id));
544                         if (written != sizeof(int)) {
545                                 pthreadpool_server_exit(pool);
546                                 pthread_mutex_unlock(&pool->mutex);
547                                 return NULL;
548                         }
549                 }
550
551                 if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
552                         /*
553                          * No more work to do and we're asked to shut down, so
554                          * exit
555                          */
556                         pthreadpool_server_exit(pool);
557
558                         if (pool->num_threads == 0) {
559                                 /*
560                                  * Ping the main thread waiting for all of us
561                                  * workers to have quit.
562                                  */
563                                 pthread_cond_broadcast(&pool->condvar);
564                         }
565
566                         pthread_mutex_unlock(&pool->mutex);
567                         return NULL;
568                 }
569         }
570 }
571
572 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
573                         void (*fn)(void *private_data), void *private_data)
574 {
575         pthread_t thread_id;
576         int res;
577         sigset_t mask, omask;
578
579         res = pthread_mutex_lock(&pool->mutex);
580         if (res != 0) {
581                 return res;
582         }
583
584         if (pool->shutdown) {
585                 /*
586                  * Protect against the pool being shut down while
587                  * trying to add a job
588                  */
589                 res = pthread_mutex_unlock(&pool->mutex);
590                 assert(res == 0);
591                 return EINVAL;
592         }
593
594         /*
595          * Just some cleanup under the mutex
596          */
597         pthreadpool_join_children(pool);
598
599         /*
600          * Add job to the end of the queue
601          */
602         if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
603                 pthread_mutex_unlock(&pool->mutex);
604                 return ENOMEM;
605         }
606
607         if (pool->num_idle > 0) {
608                 /*
609                  * We have idle threads, wake one.
610                  */
611                 res = pthread_cond_signal(&pool->condvar);
612                 pthread_mutex_unlock(&pool->mutex);
613                 return res;
614         }
615
616         if ((pool->max_threads != 0) &&
617             (pool->num_threads >= pool->max_threads)) {
618                 /*
619                  * No more new threads, we just queue the request
620                  */
621                 pthread_mutex_unlock(&pool->mutex);
622                 return 0;
623         }
624
625         /*
626          * Create a new worker thread. It should not receive any signals.
627          */
628
629         sigfillset(&mask);
630
631         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
632         if (res != 0) {
633                 pthread_mutex_unlock(&pool->mutex);
634                 return res;
635         }
636
637         res = pthread_create(&thread_id, NULL, pthreadpool_server,
638                                 (void *)pool);
639         if (res == 0) {
640                 pool->num_threads += 1;
641         }
642
643         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
644
645         pthread_mutex_unlock(&pool->mutex);
646         return res;
647 }