Add thread pool
[ira/wip.git] / source3 / lib / pthreadpool.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * thread pool implementation
4  * Copyright (C) Volker Lendecke 2009
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
#include <errno.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <signal.h>
#include <assert.h>
#include <fcntl.h>
#include <time.h>
29
30 #include "pthreadpool.h"
31
/*
 * One queued unit of work. Jobs are chained into a singly linked list whose
 * head and tail are kept in struct pthreadpool.
 */
struct pthreadpool_job {
        /* Next job in the queue, NULL for the last one */
        struct pthreadpool_job *next;

        /* Caller-chosen identifier, written to the signal pipe when done */
        int id;

        /* Function to run in a worker thread, and the argument passed to it */
        void (*fn)(void *private_data);
        void *private_data;
};
38
/*
 * State of one thread pool. Every member except the mutex itself is
 * protected by "mutex".
 */
struct pthreadpool {
        /* Serializes all access to this structure */
        pthread_mutex_t mutex;

        /* Idle worker threads sleep on this condition variable */
        pthread_cond_t condvar;

        /* Head of the queue of pending jobs, NULL if the queue is empty */
        struct pthreadpool_job *jobs;

        /* Tail of the queue of pending jobs, new jobs are appended here */
        struct pthreadpool_job *last_job;

        /*
         * Pipe used to report finished job ids: workers write to
         * sig_pipe[1], the pool user reads from sig_pipe[0]. Both are -1
         * until the pipe has been created.
         */
        int sig_pipe[2];

        /* Set to non-zero to ask the worker threads to terminate */
        int shutdown;

        /* Upper limit for num_threads */
        int max_threads;

        /* Number of worker threads currently alive */
        int num_threads;

        /* Number of worker threads currently waiting on condvar */
        int num_idle;

        /*
         * Ids of threads that have terminated and still need a
         * pthread_join(). "num_exited" entries are valid. The array is
         * declared with one element, pthreadpool_init() allocates room for
         * "max_threads" more.
         */
        int                     num_exited;
        pthread_t               exited[1]; /* We alloc more */
};
87
88 /*
89  * Initialize a thread pool
90  */
91
92 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
93 {
94         struct pthreadpool *pool;
95         size_t size;
96         int ret;
97
98         size = sizeof(struct pthreadpool) + max_threads * sizeof(pthread_t);
99
100         pool = (struct pthreadpool *)malloc(size);
101         if (pool == NULL) {
102                 return ENOMEM;
103         }
104
105         ret = pthread_mutex_init(&pool->mutex, NULL);
106         if (ret != 0) {
107                 free(pool);
108                 return ret;
109         }
110
111         ret = pthread_cond_init(&pool->condvar, NULL);
112         if (ret != 0) {
113                 pthread_mutex_destroy(&pool->mutex);
114                 free(pool);
115                 return ret;
116         }
117
118         pool->shutdown = 0;
119         pool->jobs = pool->last_job = NULL;
120         pool->num_threads = 0;
121         pool->num_exited = 0;
122         pool->max_threads = max_threads;
123         pool->num_idle = 0;
124         pool->sig_pipe[0] = -1;
125         pool->sig_pipe[1] = -1;
126
127         *presult = pool;
128         return 0;
129 }
130
131 /*
132  * Create and return a file descriptor which becomes readable when a job has
133  * finished
134  */
135
136 int pthreadpool_sig_fd(struct pthreadpool *pool)
137 {
138         int result, ret;
139
140         ret = pthread_mutex_lock(&pool->mutex);
141         if (ret != 0) {
142                 errno = ret;
143                 return -1;
144         }
145
146         if (pool->sig_pipe[0] != -1) {
147                 result = pool->sig_pipe[0];
148                 goto done;
149         }
150
151         ret = pipe(pool->sig_pipe);
152         if (ret == -1) {
153                 result = -1;
154                 goto done;
155         }
156
157         result = pool->sig_pipe[0];
158 done:
159         ret = pthread_mutex_unlock(&pool->mutex);
160         assert(ret == 0);
161         return result;
162 }
163
164 /*
165  * Do a pthread_join() on all children that have exited, pool->mutex must be
166  * locked
167  */
168 static void pthreadpool_join_children(struct pthreadpool *pool)
169 {
170         int i;
171
172         for (i=0; i<pool->num_exited; i++) {
173                 pthread_join(pool->exited[i], NULL);
174         }
175         pool->num_exited = 0;
176 }
177
178 /*
179  * Fetch a finished job number from the signal pipe
180  */
181
182 int pthreadpool_finished_job(struct pthreadpool *pool)
183 {
184         int result, ret, fd;
185         ssize_t nread;
186
187         ret = pthread_mutex_lock(&pool->mutex);
188         if (ret != 0) {
189                 errno = ret;
190                 return -1;
191         }
192
193         /*
194          * Just some cleanup under the mutex
195          */
196         pthreadpool_join_children(pool);
197
198         fd = pool->sig_pipe[0];
199
200         ret = pthread_mutex_unlock(&pool->mutex);
201         assert(ret == 0);
202
203         if (fd == -1) {
204                 errno = EINVAL;
205                 return -1;
206         }
207
208         nread = -1;
209         errno = EINTR;
210
211         while ((nread == -1) && (errno == EINTR)) {
212                 nread = read(fd, &result, sizeof(int));
213         }
214
215         /*
216          * TODO: handle nread > 0 && nread < sizeof(int)
217          */
218
219         /*
220          * Lock the mutex to provide a memory barrier for data from the worker
221          * thread to the main thread. The pipe access itself does not have to
222          * be locked, for sizeof(int) the write to a pipe is atomic, and only
223          * one thread reads from it. But we need to lock the mutex briefly
224          * even if we don't do anything under the lock, to make sure we can
225          * see all memory the helper thread has written.
226          */
227
228         ret = pthread_mutex_lock(&pool->mutex);
229         if (ret == -1) {
230                 errno = ret;
231                 return -1;
232         }
233
234         ret = pthread_mutex_unlock(&pool->mutex);
235         assert(ret == 0);
236
237         return result;
238 }
239
240 /*
241  * Destroy a thread pool, finishing all threads working for it
242  */
243
244 int pthreadpool_destroy(struct pthreadpool *pool)
245 {
246         int ret, ret1;
247
248         ret = pthread_mutex_lock(&pool->mutex);
249         if (ret != 0) {
250                 return ret;
251         }
252
253         if (pool->num_threads > 0) {
254                 /*
255                  * We have active threads, tell them to finish, wait for that.
256                  */
257
258                 pool->shutdown = 1;
259
260                 if (pool->num_idle > 0) {
261                         /*
262                          * Wake the idle threads. They will find pool->quit to
263                          * be set and exit themselves
264                          */
265                         ret = pthread_cond_broadcast(&pool->condvar);
266                         if (ret != 0) {
267                                 pthread_mutex_unlock(&pool->mutex);
268                                 return ret;
269                         }
270                 }
271
272                 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
273
274                         if (pool->num_exited > 0) {
275                                 pthreadpool_join_children(pool);
276                                 continue;
277                         }
278                         /*
279                          * A thread that shuts down will also signal
280                          * pool->condvar
281                          */
282                         ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
283                         if (ret != 0) {
284                                 pthread_mutex_unlock(&pool->mutex);
285                                 return ret;
286                         }
287                 }
288         }
289
290         ret = pthread_mutex_unlock(&pool->mutex);
291         if (ret != 0) {
292                 return ret;
293         }
294         ret = pthread_mutex_destroy(&pool->mutex);
295         ret1 = pthread_cond_destroy(&pool->condvar);
296
297         if ((ret == 0) && (ret1 == 0)) {
298                 free(pool);
299         }
300
301         if (ret != 0) {
302                 return ret;
303         }
304         return ret1;
305 }
306
307 /*
308  * Prepare for pthread_exit(), pool->mutex must be locked
309  */
310 static void pthreadpool_server_exit(struct pthreadpool *pool)
311 {
312         pool->num_threads -= 1;
313         pool->exited[pool->num_exited] = pthread_self();
314         pool->num_exited += 1;
315 }
316
317 static void *pthreadpool_server(void *arg)
318 {
319         struct pthreadpool *pool = (struct pthreadpool *)arg;
320         int res;
321
322         res = pthread_mutex_lock(&pool->mutex);
323         if (res != 0) {
324                 return NULL;
325         }
326
327         while (1) {
328                 struct timespec timeout;
329                 struct pthreadpool_job *job;
330
331                 /*
332                  * idle-wait at most 1 second. If nothing happens in that
333                  * time, exit this thread.
334                  */
335
336                 clock_gettime(CLOCK_REALTIME, &timeout);
337                 timeout.tv_sec += 1;
338
339                 while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
340
341                         pool->num_idle += 1;
342                         res = pthread_cond_timedwait(
343                                 &pool->condvar, &pool->mutex, &timeout);
344                         pool->num_idle -= 1;
345
346                         if (res == ETIMEDOUT) {
347
348                                 if (pool->jobs == NULL) {
349                                         /*
350                                          * we timed out and still no work for
351                                          * us. Exit.
352                                          */
353                                         pthreadpool_server_exit(pool);
354                                         pthread_mutex_unlock(&pool->mutex);
355                                         return NULL;
356                                 }
357
358                                 break;
359                         }
360                         assert(res == 0);
361                 }
362
363                 job = pool->jobs;
364
365                 if (job != NULL) {
366                         int fd = pool->sig_pipe[1];
367                         ssize_t written;
368
369                         /*
370                          * Ok, there's work for us to do, remove the job from
371                          * the pthreadpool list
372                          */
373                         pool->jobs = job->next;
374                         if (pool->last_job == job) {
375                                 pool->last_job = NULL;
376                         }
377
378                         /*
379                          * Do the work with the mutex unlocked :-)
380                          */
381
382                         res = pthread_mutex_unlock(&pool->mutex);
383                         assert(res == 0);
384
385                         job->fn(job->private_data);
386
387                         written = sizeof(int);
388
389                         res = pthread_mutex_lock(&pool->mutex);
390                         assert(res == 0);
391
392                         if (fd != -1) {
393                                 written = write(fd, &job->id, sizeof(int));
394                         }
395
396                         free(job);
397
398                         if (written != sizeof(int)) {
399                                 pthreadpool_server_exit(pool);
400                                 pthread_mutex_unlock(&pool->mutex);
401                                 return NULL;
402                         }
403                 }
404
405                 if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
406                         /*
407                          * No more work to do and we're asked to shut down, so
408                          * exit
409                          */
410                         pthreadpool_server_exit(pool);
411
412                         if (pool->num_threads == 0) {
413                                 /*
414                                  * Ping the main thread waiting for all of us
415                                  * workers to have quit.
416                                  */
417                                 pthread_cond_broadcast(&pool->condvar);
418                         }
419
420                         pthread_mutex_unlock(&pool->mutex);
421                         return NULL;
422                 }
423         }
424 }
425
426 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
427                         void (*fn)(void *private_data), void *private_data)
428 {
429         struct pthreadpool_job *job;
430         pthread_t thread_id;
431         int res;
432         sigset_t mask, omask;
433
434         job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
435         if (job == NULL) {
436                 return ENOMEM;
437         }
438
439         job->fn = fn;
440         job->private_data = private_data;
441         job->id = job_id;
442         job->next = NULL;
443
444         res = pthread_mutex_lock(&pool->mutex);
445         if (res != 0) {
446                 free(job);
447                 return res;
448         }
449
450         /*
451          * Just some cleanup under the mutex
452          */
453         pthreadpool_join_children(pool);
454
455         /*
456          * Add job to the end of the queue
457          */
458         if (pool->jobs == NULL) {
459                 pool->jobs = job;
460         }
461         else {
462                 pool->last_job->next = job;
463         }
464         pool->last_job = job;
465
466         if (pool->num_idle > 0) {
467                 /*
468                  * We have idle threads, wake one.
469                  */
470                 res = pthread_cond_signal(&pool->condvar);
471                 pthread_mutex_unlock(&pool->mutex);
472                 return res;
473         }
474
475         if (pool->num_threads >= pool->max_threads) {
476                 /*
477                  * No more new threads, we just queue the request
478                  */
479                 pthread_mutex_unlock(&pool->mutex);
480                 return 0;
481         }
482
483         /*
484          * Create a new worker thread. It should not receive any signals.
485          */
486
487         sigfillset(&mask);
488
489         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
490         if (res != 0) {
491                 pthread_mutex_unlock(&pool->mutex);
492                 return res;
493         }
494
495         res = pthread_create(&thread_id, NULL, pthreadpool_server,
496                                 (void *)pool);
497         if (res == 0) {
498                 pool->num_threads += 1;
499         }
500
501         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
502
503         pthread_mutex_unlock(&pool->mutex);
504         return res;
505 }