[samba.git] / lib / pthreadpool / pthreadpool.c
/*
 * Unix SMB/CIFS implementation.
 * thread pool implementation
 * Copyright (C) Volker Lendecke 2009
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "replace.h"
#include "system/time.h"
#include "system/wait.h"
#include "system/threads.h"
#include "system/filesys.h"
#include "pthreadpool.h"
#include "lib/util/dlinklist.h"
#include "lib/util/blocking.h"

#ifdef NDEBUG
#undef NDEBUG
#endif

#include <assert.h>

struct pthreadpool_job {
        int id;
        void (*fn)(void *private_data);
        void *private_data;
};

struct pthreadpool {
        /*
         * List pthreadpools for fork safety
         */
        struct pthreadpool *prev, *next;

        /*
         * Control access to this struct
         */
        pthread_mutex_t mutex;

        /*
         * Threads waiting for work do so here
         */
        pthread_cond_t condvar;

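        /*
         * Worker threads write one byte to check_pipefd[1] when they
         * exit (see pthreadpool_server_exit()), so callers watching
         * the read end via pthreadpool_restart_check_monitor_fd()
         * can notice and call pthreadpool_restart_check().
         */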
        int check_pipefd[2];

        /*
         * Array of jobs
         */
        size_t jobs_array_len;
        struct pthreadpool_job *jobs;

        size_t head;
        size_t num_jobs;

        /*
         * Indicate job completion
         */
        int (*signal_fn)(int jobid,
                         void (*job_fn)(void *private_data),
                         void *job_fn_private_data,
                         void *private_data);
        void *signal_fn_private_data;

        /*
         * indicator to worker threads to stop processing further jobs
         * and exit.
         */
        bool stopped;

        /*
         * indicator to the last worker thread to free the pool
         * resources.
         */
        bool destroyed;

        /*
         * maximum number of threads
         * 0 means no real threads, only strict sync processing.
         */
        unsigned max_threads;

        /*
         * Number of threads
         */
        unsigned num_threads;

        /*
         * Number of idle threads
         */
        unsigned num_idle;

        /*
         * Condition variable indicating that helper threads should
         * quickly go away making way for fork() without anybody
         * waiting on pool->condvar.
         */
        pthread_cond_t *prefork_cond;

        /*
         * Waiting position for helper threads while fork is
         * running. The forking thread will have locked it, and all
         * idle helper threads will sit here until after the fork,
         * where the forking thread will unlock it again.
         */
        pthread_mutex_t fork_mutex;

        bool per_thread_cwd;
};

static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct pthreadpool *pthreadpools = NULL;
static bool pthreadpool_support_thread_cwd = false;
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);

/*
 * Initialize a thread pool
 */
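/*
 * Typical usage (illustrative sketch only; my_signal_fn, my_job_fn and
 * job_state are hypothetical names, not part of this file):
 *
 *   static int my_signal_fn(int jobid,
 *                           void (*job_fn)(void *private_data),
 *                           void *job_private_data,
 *                           void *private_data)
 *   {
 *           // called from the worker thread after job_fn returned;
 *           // notify the main event loop that jobid is done
 *           return 0;
 *   }
 *
 *   struct pthreadpool *pool = NULL;
 *   int ret = pthreadpool_init(4, &pool, my_signal_fn, NULL);
 *   if (ret == 0) {
 *           ret = pthreadpool_add_job(pool, 1, my_job_fn, job_state);
 *   }
 *   ...
 *   pthreadpool_destroy(pool);
 */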

int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
                     int (*signal_fn)(int jobid,
                                      void (*job_fn)(void *private_data),
                                      void *job_fn_private_data,
                                      void *private_data),
                     void *signal_fn_private_data)
{
        struct pthreadpool *pool;
        int ret;
        bool ok;

        pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
        if (pool == NULL) {
                return ENOMEM;
        }
        pool->signal_fn = signal_fn;
        pool->signal_fn_private_data = signal_fn_private_data;

        pool->jobs_array_len = 4;
        pool->jobs = calloc(
                pool->jobs_array_len, sizeof(struct pthreadpool_job));

        if (pool->jobs == NULL) {
                free(pool);
                return ENOMEM;
        }

        ret = pipe(pool->check_pipefd);
        if (ret != 0) {
                free(pool->jobs);
                free(pool);
                return ENOMEM;
        }

        ok = smb_set_close_on_exec(pool->check_pipefd[0]);
        if (!ok) {
                close(pool->check_pipefd[0]);
                close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return EINVAL;
        }
        ok = smb_set_close_on_exec(pool->check_pipefd[1]);
        if (!ok) {
                close(pool->check_pipefd[0]);
                close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return EINVAL;
        }
        ret = set_blocking(pool->check_pipefd[0], true);
        if (ret == -1) {
                close(pool->check_pipefd[0]);
                close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return EINVAL;
        }
        ret = set_blocking(pool->check_pipefd[1], false);
        if (ret == -1) {
                close(pool->check_pipefd[0]);
                close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return EINVAL;
        }

        pool->head = pool->num_jobs = 0;

        ret = pthread_mutex_init(&pool->mutex, NULL);
        if (ret != 0) {
                close(pool->check_pipefd[0]);
                close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return ret;
        }

        ret = pthread_cond_init(&pool->condvar, NULL);
        if (ret != 0) {
                pthread_mutex_destroy(&pool->mutex);
                close(pool->check_pipefd[0]);
                close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return ret;
        }

        ret = pthread_mutex_init(&pool->fork_mutex, NULL);
        if (ret != 0) {
                pthread_cond_destroy(&pool->condvar);
                pthread_mutex_destroy(&pool->mutex);
                close(pool->check_pipefd[0]);
                close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return ret;
        }

        pool->stopped = false;
        pool->destroyed = false;
        pool->num_threads = 0;
        pool->max_threads = max_threads;
        pool->num_idle = 0;
        pool->prefork_cond = NULL;
        if (max_threads != 0) {
                pool->per_thread_cwd = pthreadpool_support_thread_cwd;
        } else {
                pool->per_thread_cwd = false;
        }

        ret = pthread_mutex_lock(&pthreadpools_mutex);
        if (ret != 0) {
                pthread_mutex_destroy(&pool->fork_mutex);
                pthread_cond_destroy(&pool->condvar);
                pthread_mutex_destroy(&pool->mutex);
                close(pool->check_pipefd[0]);
                close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return ret;
        }
        DLIST_ADD(pthreadpools, pool);

        ret = pthread_mutex_unlock(&pthreadpools_mutex);
        assert(ret == 0);

        pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);

        *presult = pool;

        return 0;
}

size_t pthreadpool_max_threads(struct pthreadpool *pool)
{
        if (pool->stopped) {
                return 0;
        }

        return pool->max_threads;
}

size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
{
        int res;
        int unlock_res;
        size_t ret;

        if (pool->stopped) {
                return 0;
        }

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return res;
        }

        if (pool->stopped) {
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        ret = pool->num_jobs;

        unlock_res = pthread_mutex_unlock(&pool->mutex);
        assert(unlock_res == 0);
        return ret;
}

bool pthreadpool_per_thread_cwd(struct pthreadpool *pool)
{
        if (pool->stopped) {
                return false;
        }

        return pool->per_thread_cwd;
}

static void pthreadpool_prepare_pool(struct pthreadpool *pool)
{
        int ret;

        ret = pthread_mutex_lock(&pool->fork_mutex);
        assert(ret == 0);

        ret = pthread_mutex_lock(&pool->mutex);
        assert(ret == 0);

        while (pool->num_idle != 0) {
                unsigned num_idle = pool->num_idle;
                pthread_cond_t prefork_cond;

                ret = pthread_cond_init(&prefork_cond, NULL);
                assert(ret == 0);

                /*
                 * Push all idle threads off pool->condvar. In the
                 * child we can destroy the pool, which would result
                 * in undefined behaviour in the
                 * pthread_cond_destroy(pool->condvar). glibc just
                 * blocks here.
                 */
                pool->prefork_cond = &prefork_cond;

                ret = pthread_cond_signal(&pool->condvar);
                assert(ret == 0);

                while (pool->num_idle == num_idle) {
                        ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
                        assert(ret == 0);
                }

                pool->prefork_cond = NULL;

                ret = pthread_cond_destroy(&prefork_cond);
                assert(ret == 0);
        }

        /*
         * Probably it's well-defined somewhere: What happens to
         * condvars after a fork? The rationale of pthread_atfork only
         * talks about mutexes. So better be safe than sorry and
         * destroy/reinit pool->condvar across a fork.
         */

        ret = pthread_cond_destroy(&pool->condvar);
        assert(ret == 0);
}

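/*
 * fork() integration: pthreadpool_prepare() runs in the forking thread
 * before fork() and parks the idle worker threads on pool->fork_mutex,
 * pthreadpool_parent() reverses that in the parent, and
 * pthreadpool_child() marks every pool as stopped in the child. The
 * three handlers are registered via pthread_atfork() in
 * pthreadpool_prep_atfork().
 */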
static void pthreadpool_prepare(void)
{
        int ret;
        struct pthreadpool *pool;

        ret = pthread_mutex_lock(&pthreadpools_mutex);
        assert(ret == 0);

        pool = pthreadpools;

        while (pool != NULL) {
                pthreadpool_prepare_pool(pool);
                pool = pool->next;
        }
}

static void pthreadpool_parent(void)
{
        int ret;
        struct pthreadpool *pool;

        for (pool = DLIST_TAIL(pthreadpools);
             pool != NULL;
             pool = DLIST_PREV(pool)) {
                ret = pthread_cond_init(&pool->condvar, NULL);
                assert(ret == 0);
                ret = pthread_mutex_unlock(&pool->mutex);
                assert(ret == 0);
                ret = pthread_mutex_unlock(&pool->fork_mutex);
                assert(ret == 0);
        }

        ret = pthread_mutex_unlock(&pthreadpools_mutex);
        assert(ret == 0);
}

static void pthreadpool_child(void)
{
        int ret;
        struct pthreadpool *pool;

        for (pool = DLIST_TAIL(pthreadpools);
             pool != NULL;
             pool = DLIST_PREV(pool)) {

                pool->num_threads = 0;
                pool->num_idle = 0;
                pool->head = 0;
                pool->num_jobs = 0;
                pool->stopped = true;
                if (pool->check_pipefd[0] != -1) {
                        close(pool->check_pipefd[0]);
                        pool->check_pipefd[0] = -1;
                }
                if (pool->check_pipefd[1] != -1) {
                        close(pool->check_pipefd[1]);
                        pool->check_pipefd[1] = -1;
                }

                ret = pthread_cond_init(&pool->condvar, NULL);
                assert(ret == 0);

                ret = pthread_mutex_unlock(&pool->mutex);
                assert(ret == 0);

                ret = pthread_mutex_unlock(&pool->fork_mutex);
                assert(ret == 0);
        }

        ret = pthread_mutex_unlock(&pthreadpools_mutex);
        assert(ret == 0);
}

static void pthreadpool_prep_atfork(void)
{
#ifdef HAVE_UNSHARE_CLONE_FS
        int res;

        /* remember if unshare(CLONE_FS) works. */
        res = unshare(CLONE_FS);
        if (res == 0) {
                pthreadpool_support_thread_cwd = true;
        }
#endif

        pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
                       pthreadpool_child);
}

static int pthreadpool_free(struct pthreadpool *pool)
{
        int ret, ret1, ret2;

        ret = pthread_mutex_lock(&pthreadpools_mutex);
        if (ret != 0) {
                return ret;
        }
        DLIST_REMOVE(pthreadpools, pool);
        ret = pthread_mutex_unlock(&pthreadpools_mutex);
        assert(ret == 0);

        ret = pthread_mutex_lock(&pool->mutex);
        assert(ret == 0);
        ret = pthread_mutex_unlock(&pool->mutex);
        assert(ret == 0);

        ret = pthread_mutex_destroy(&pool->mutex);
        ret1 = pthread_cond_destroy(&pool->condvar);
        ret2 = pthread_mutex_destroy(&pool->fork_mutex);

        if (ret != 0) {
                return ret;
        }
        if (ret1 != 0) {
                return ret1;
        }
        if (ret2 != 0) {
                return ret2;
        }

        if (pool->check_pipefd[0] != -1) {
                close(pool->check_pipefd[0]);
                pool->check_pipefd[0] = -1;
        }
        if (pool->check_pipefd[1] != -1) {
                close(pool->check_pipefd[1]);
                pool->check_pipefd[1] = -1;
        }
        free(pool->jobs);
        free(pool);

        return 0;
}

/*
 * Stop a thread pool. Wake up all idle threads for exit.
 */

static int pthreadpool_stop_locked(struct pthreadpool *pool)
{
        int ret;

        pool->stopped = true;

        if (pool->check_pipefd[0] != -1) {
                close(pool->check_pipefd[0]);
                pool->check_pipefd[0] = -1;
        }
        if (pool->check_pipefd[1] != -1) {
                close(pool->check_pipefd[1]);
                pool->check_pipefd[1] = -1;
        }

        if (pool->num_threads == 0) {
                return 0;
        }

        /*
         * We have active threads, tell them to finish.
         */

        ret = pthread_cond_broadcast(&pool->condvar);

        return ret;
}

/*
 * Stop a thread pool. Wake up all idle threads for exit.
 */

int pthreadpool_stop(struct pthreadpool *pool)
{
        int ret, ret1;

        ret = pthread_mutex_lock(&pool->mutex);
        if (ret != 0) {
                return ret;
        }

        if (!pool->stopped) {
                ret = pthreadpool_stop_locked(pool);
        }

        ret1 = pthread_mutex_unlock(&pool->mutex);
        assert(ret1 == 0);

        return ret;
}

/*
 * Destroy a thread pool. Wake up all idle threads for exit. The last
 * one will free the pool.
 */

int pthreadpool_destroy(struct pthreadpool *pool)
{
        int ret, ret1;
        bool free_it;

        assert(!pool->destroyed);

        ret = pthread_mutex_lock(&pool->mutex);
        if (ret != 0) {
                return ret;
        }

        pool->destroyed = true;

        if (!pool->stopped) {
                ret = pthreadpool_stop_locked(pool);
        }

        free_it = (pool->num_threads == 0);

        ret1 = pthread_mutex_unlock(&pool->mutex);
        assert(ret1 == 0);

        if (free_it) {
                pthreadpool_free(pool);
        }

        return ret;
}

/*
 * Prepare for pthread_exit(), pool->mutex must be locked and will be
 * unlocked here. This is a bit of a layering violation, but here we
 * also take care of removing the pool if we're the last thread.
 */
static void pthreadpool_server_exit(struct pthreadpool *pool)
{
        int ret;
        bool free_it;

        pool->num_threads -= 1;

        free_it = (pool->destroyed && (pool->num_threads == 0));

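        /*
         * Tell the monitoring pipe that a worker thread is gone. The
         * write end is non-blocking, so a full pipe (EAGAIN) is fine:
         * readers only need to see that at least one byte arrived.
         */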
        while (true) {
                uint8_t c = 0;
                ssize_t nwritten = 0;

                if (pool->check_pipefd[1] == -1) {
                        break;
                }

                nwritten = write(pool->check_pipefd[1], &c, 1);
                if (nwritten == -1) {
                        if (errno == EINTR) {
                                continue;
                        }
                        if (errno == EAGAIN) {
                                break;
                        }
#ifdef EWOULDBLOCK
                        if (errno == EWOULDBLOCK) {
                                break;
                        }
#endif
                        /* ignore ... */
                }

                break;
        }

        ret = pthread_mutex_unlock(&pool->mutex);
        assert(ret == 0);

        if (free_it) {
                pthreadpool_free(pool);
        }
}

static bool pthreadpool_get_job(struct pthreadpool *p,
                                struct pthreadpool_job *job)
{
        if (p->stopped) {
                return false;
        }

        if (p->num_jobs == 0) {
                return false;
        }
        *job = p->jobs[p->head];
        p->head = (p->head+1) % p->jobs_array_len;
        p->num_jobs -= 1;
        return true;
}

static bool pthreadpool_put_job(struct pthreadpool *p,
                                int id,
                                void (*fn)(void *private_data),
                                void *private_data)
{
        struct pthreadpool_job *job;

        if (p->num_jobs == p->jobs_array_len) {
                struct pthreadpool_job *tmp;
                size_t new_len = p->jobs_array_len * 2;

                tmp = realloc(
                        p->jobs, sizeof(struct pthreadpool_job) * new_len);
                if (tmp == NULL) {
                        return false;
                }
                p->jobs = tmp;

                /*
                 * We just doubled the jobs array. The array implements a FIFO
                 * queue with a modulo-based wraparound, so we have to memcpy
                 * the jobs that are logically at the queue end but physically
                 * before the queue head into the reallocated area. The new
                 * space starts at the current jobs_array_len, and we have to
                 * copy everything before the current head job into the new
                 * area.
                 */
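                /*
                 * Worked example: with jobs_array_len == 4, head == 2
                 * and num_jobs == 4, the queue occupies slots 2,3,0,1.
                 * After doubling to 8 we copy slots 0..head-1 (here 0
                 * and 1) to slots 4 and 5, so the queue is then read
                 * contiguously as slots 2,3,4,5.
                 */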
                memcpy(&p->jobs[p->jobs_array_len], p->jobs,
                       sizeof(struct pthreadpool_job) * p->head);

                p->jobs_array_len = new_len;
        }

        job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
        job->id = id;
        job->fn = fn;
        job->private_data = private_data;

        p->num_jobs += 1;

        return true;
}

static void pthreadpool_undo_put_job(struct pthreadpool *p)
{
        p->num_jobs -= 1;
}

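/*
 * Worker thread main loop: wait for work on pool->condvar for at most
 * one second, run queued jobs with pool->mutex dropped, and exit via
 * pthreadpool_server_exit() on idle timeout, on pool shutdown, or when
 * signal_fn reports an error. If a fork is being prepared, step aside
 * onto pool->fork_mutex until the fork is done.
 */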
static void *pthreadpool_server(void *arg)
{
        struct pthreadpool *pool = (struct pthreadpool *)arg;
        int res;

#ifdef HAVE_UNSHARE_CLONE_FS
        if (pool->per_thread_cwd) {
                res = unshare(CLONE_FS);
                assert(res == 0);
        }
#endif

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return NULL;
        }

        while (1) {
                struct timespec ts;
                struct pthreadpool_job job;

                /*
                 * idle-wait at most 1 second. If nothing happens in that
                 * time, exit this thread.
                 */

                clock_gettime(CLOCK_REALTIME, &ts);
                ts.tv_sec += 1;

                while ((pool->num_jobs == 0) && !pool->stopped) {

                        pool->num_idle += 1;
                        res = pthread_cond_timedwait(
                                &pool->condvar, &pool->mutex, &ts);
                        pool->num_idle -= 1;

                        if (pool->prefork_cond != NULL) {
                                /*
                                 * We must allow fork() to continue
                                 * without anybody waiting on
                                 * &pool->condvar. Tell
                                 * pthreadpool_prepare_pool that we
                                 * got that message.
                                 */

                                res = pthread_cond_signal(pool->prefork_cond);
                                assert(res == 0);

                                res = pthread_mutex_unlock(&pool->mutex);
                                assert(res == 0);

                                /*
                                 * pthreadpool_prepare_pool has
                                 * already locked this mutex across
                                 * the fork. This makes us wait
                                 * without sitting in a condvar.
                                 */
                                res = pthread_mutex_lock(&pool->fork_mutex);
                                assert(res == 0);
                                res = pthread_mutex_unlock(&pool->fork_mutex);
                                assert(res == 0);

                                res = pthread_mutex_lock(&pool->mutex);
                                assert(res == 0);
                        }

                        if (res == ETIMEDOUT) {

                                if (pool->num_jobs == 0) {
                                        /*
                                         * we timed out and still no work for
                                         * us. Exit.
                                         */
                                        pthreadpool_server_exit(pool);
                                        return NULL;
                                }

                                break;
                        }
                        assert(res == 0);
                }

                if (pthreadpool_get_job(pool, &job)) {
                        int ret;

                        /*
                         * Do the work with the mutex unlocked
                         */

                        res = pthread_mutex_unlock(&pool->mutex);
                        assert(res == 0);

                        job.fn(job.private_data);

                        ret = pool->signal_fn(job.id,
                                              job.fn, job.private_data,
                                              pool->signal_fn_private_data);

                        res = pthread_mutex_lock(&pool->mutex);
                        assert(res == 0);

                        if (ret != 0) {
                                pthreadpool_server_exit(pool);
                                return NULL;
                        }
                }

                if (pool->stopped) {
                        /*
                         * we're asked to stop processing jobs, so exit
                         */
                        pthreadpool_server_exit(pool);
                        return NULL;
                }
        }
}

static int pthreadpool_create_thread(struct pthreadpool *pool)
{
        pthread_attr_t thread_attr;
        pthread_t thread_id;
        int res;
        sigset_t mask, omask;

        /*
         * Create a new worker thread. It should not receive any signals.
         */

        sigfillset(&mask);

        res = pthread_attr_init(&thread_attr);
        if (res != 0) {
                return res;
        }

        res = pthread_attr_setdetachstate(
                &thread_attr, PTHREAD_CREATE_DETACHED);
        if (res != 0) {
                pthread_attr_destroy(&thread_attr);
                return res;
        }

        res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
        if (res != 0) {
                pthread_attr_destroy(&thread_attr);
                return res;
        }

        res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
                             (void *)pool);

        assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);

        pthread_attr_destroy(&thread_attr);

        if (res == 0) {
                pool->num_threads += 1;
        }

        return res;
}

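/*
 * Queue a job. With max_threads == 0 the job and signal_fn run
 * synchronously in the caller. Otherwise the job is appended to the
 * queue and an idle thread is woken, or a new one is created if
 * allowed. Returns 0 or an errno-style error.
 */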
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                        void (*fn)(void *private_data), void *private_data)
{
        int res;
        int unlock_res;

        assert(!pool->destroyed);

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return res;
        }

        if (pool->stopped) {
                /*
                 * Protect against the pool being shut down while
                 * trying to add a job
                 */
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return EINVAL;
        }

        if (pool->max_threads == 0) {
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);

                /*
                 * If no threads are allowed we do strict sync processing.
                 */
                fn(private_data);
                res = pool->signal_fn(job_id, fn, private_data,
                                      pool->signal_fn_private_data);
                return res;
        }

        /*
         * Add job to the end of the queue
         */
        if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return ENOMEM;
        }

        if (pool->num_idle > 0) {
                /*
                 * We have idle threads, wake one.
                 */
                res = pthread_cond_signal(&pool->condvar);
                if (res != 0) {
                        pthreadpool_undo_put_job(pool);
                }
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return res;
        }

        if (pool->num_threads >= pool->max_threads) {
                /*
                 * No more new threads, we just queue the request
                 */
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        res = pthreadpool_create_thread(pool);
        if (res == 0) {
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        if (pool->num_threads != 0) {
                /*
                 * At least one thread is still available, let
                 * that one run the queued job.
                 */
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        /*
         * No thread could be created to run the job, fall back to a
         * sync call.
         */
        pthreadpool_undo_put_job(pool);

        unlock_res = pthread_mutex_unlock(&pool->mutex);
        assert(unlock_res == 0);

        return res;
}

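/*
 * Recreate worker threads after some have exited (e.g. after signal_fn
 * returned an error), so that queued jobs still get run. Creates at
 * most as many threads as there are pending jobs without exceeding
 * max_threads.
 */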
int pthreadpool_restart_check(struct pthreadpool *pool)
{
        int res;
        int unlock_res;
        unsigned possible_threads = 0;
        unsigned missing_threads = 0;

        assert(!pool->destroyed);

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return res;
        }

        if (pool->stopped) {
                /*
                 * Protect against the pool being shut down while
                 * trying to add a job
                 */
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return EINVAL;
        }

        if (pool->num_jobs == 0) {
                /*
                 * This also handles the pool->max_threads == 0 case as it never
                 * calls pthreadpool_put_job()
                 */
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        if (pool->num_idle > 0) {
                /*
                 * We have idle threads and pending jobs,
                 * so we'd better let all threads wake up
                 * and check for pending jobs.
                 */
                res = pthread_cond_broadcast(&pool->condvar);
                assert(res == 0);
        }

        if (pool->num_threads < pool->max_threads) {
                possible_threads = pool->max_threads - pool->num_threads;
        }

        if (pool->num_idle < pool->num_jobs) {
                missing_threads = pool->num_jobs - pool->num_idle;
        }

        missing_threads = MIN(missing_threads, possible_threads);

        while (missing_threads > 0) {

                res = pthreadpool_create_thread(pool);
                if (res != 0) {
                        break;
                }

                missing_threads--;
        }

        if (missing_threads == 0) {
                /*
                 * Ok, we recreated all threads we need.
                 */
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        if (pool->num_threads != 0) {
                /*
                 * At least one thread is still available, let
                 * that one run the queued jobs.
                 */
                unlock_res = pthread_mutex_unlock(&pool->mutex);
                assert(unlock_res == 0);
                return 0;
        }

        /*
         * There's no thread available to run any pending jobs.
         * The caller may want to cancel the jobs and destroy the pool.
         * But that's up to the caller.
         */
        unlock_res = pthread_mutex_unlock(&pool->mutex);
        assert(unlock_res == 0);

        return res;
}

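/*
 * Return a dup()ed, non-blocking, close-on-exec copy of the read end
 * of the check pipe. It becomes readable whenever a worker thread
 * exits; the caller can then drain it and call
 * pthreadpool_restart_check(). Returns -1 and sets errno on error.
 */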
int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
{
        int fd;
        int ret;
        bool ok;

        if (pool->stopped) {
                errno = EINVAL;
                return -1;
        }

        if (pool->check_pipefd[0] == -1) {
                errno = ENOSYS;
                return -1;
        }

        fd = dup(pool->check_pipefd[0]);
        if (fd == -1) {
                return -1;
        }

        ok = smb_set_close_on_exec(fd);
        if (!ok) {
                int saved_errno = errno;
                close(fd);
                errno = saved_errno;
                return -1;
        }

        ret = set_blocking(fd, false);
        if (ret == -1) {
                int saved_errno = errno;
                close(fd);
                errno = saved_errno;
                return -1;
        }

        return fd;
}

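/*
 * Discard all bytes currently readable from the check pipe, so that a
 * subsequent poll on the monitor fd only reports new thread exits.
 * Returns 0 on success or an errno-style error.
 */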
int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
{
        if (pool->stopped) {
                return EINVAL;
        }

        if (pool->check_pipefd[0] == -1) {
                return ENOSYS;
        }

        while (true) {
                uint8_t buf[128];
                ssize_t nread;

                nread = read(pool->check_pipefd[0], buf, sizeof(buf));
                if (nread == -1) {
                        if (errno == EINTR) {
                                continue;
                        }
                        if (errno == EAGAIN) {
                                return 0;
                        }
#ifdef EWOULDBLOCK
                        if (errno == EWOULDBLOCK) {
                                return 0;
                        }
#endif
                        if (errno == 0) {
                                errno = INT_MAX;
                        }

                        return errno;
                }

                if (nread < sizeof(buf)) {
                        return 0;
                }
        }

        abort();
        return INT_MAX;
}

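/*
 * Remove all queued (not yet running) jobs that match job_id, fn and
 * private_data, and return how many were removed. Jobs already picked
 * up by a worker thread are not affected.
 */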
size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
                              void (*fn)(void *private_data), void *private_data)
{
        int res;
        size_t i, j;
        size_t num = 0;

        assert(!pool->destroyed);

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return res;
        }

        for (i = 0, j = 0; i < pool->num_jobs; i++) {
                size_t idx = (pool->head + i) % pool->jobs_array_len;
                size_t new_idx = (pool->head + j) % pool->jobs_array_len;
                struct pthreadpool_job *job = &pool->jobs[idx];

                if ((job->private_data == private_data) &&
                    (job->id == job_id) &&
                    (job->fn == fn))
                {
                        /*
                         * Just skip the entry.
                         */
                        num++;
                        continue;
                }

                /*
                 * If we already removed one or more jobs (so j will be smaller
                 * than i), we need to fill possible gaps in the logical list.
                 */
                if (j < i) {
                        pool->jobs[new_idx] = *job;
                }
                j++;
        }

        pool->num_jobs -= num;

        res = pthread_mutex_unlock(&pool->mutex);
        assert(res == 0);

        return num;
}