2 * Unix SMB/CIFS implementation.
3 * threadpool implementation based on pthreads
4 * Copyright (C) Volker Lendecke 2009,2011
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/filesys.h"
22 #include "pthreadpool_pipe.h"
23 #include "pthreadpool.h"
/*
 * State of a thread pool whose job completions are reported through a pipe.
 *
 * NOTE(review): only the first member is visible in this excerpt. Code in
 * this file also accesses pool->pipe_fds[0], pool->pipe_fds[1],
 * pool->num_jobs and pool->pid, so those members are presumably declared
 * in the lines that are missing here -- confirm against the full file.
 */
25 struct pthreadpool_pipe {
/* Underlying generic pool; filled in by pthreadpool_init() and later
 * handed to pthreadpool_add_job() and pthreadpool_destroy(). */
26 struct pthreadpool *pool;
/*
 * Forward declaration of the callback that pthreadpool_pipe_init() passes
 * to pthreadpool_init(); the definition appears later in this file.
 *
 * NOTE(review): the final parameter line of this prototype is not visible
 * in this excerpt. The definition reads a `private_data` pointer, so the
 * missing parameter is presumably `void *private_data` -- confirm.
 */
32 static int pthreadpool_pipe_signal(int jobid,
33 void (*job_fn)(void *private_data),
34 void *job_private_data,
/*
 * Create a pipe-signalled thread pool.
 *
 * Allocates a zero-initialised struct pthreadpool_pipe, creates its pipe
 * and initialises the underlying pthreadpool, registering
 * pthreadpool_pipe_signal() as the callback with the new object as the
 * callback's private data.
 *
 * max_threads: forwarded unchanged to pthreadpool_init().
 * presult:     presumably receives the new pool on success; the assignment
 *              is not visible in this excerpt -- TODO confirm.
 *
 * NOTE(review): the error checks after calloc(), pipe() and
 * pthreadpool_init(), and the return statements, are missing from this
 * excerpt, so the exact return values cannot be documented from here.
 */
37 int pthreadpool_pipe_init(unsigned max_threads,
38 struct pthreadpool_pipe **presult)
40 struct pthreadpool_pipe *pool;
/* calloc() zero-fills the structure. */
43 pool = calloc(1, sizeof(struct pthreadpool_pipe));
/* pipe() stores the read end in pipe_fds[0] and the write end in
 * pipe_fds[1]. */
49 ret = pipe(pool->pipe_fds);
56 ret = pthreadpool_init(max_threads, &pool->pool,
57 pthreadpool_pipe_signal, pool);
/* Both pipe ends are closed here; presumably this is the cleanup path
 * taken when pthreadpool_init() fails (the guarding condition is not
 * visible) -- TODO confirm. */
59 close(pool->pipe_fds[0]);
60 close(pool->pipe_fds[1]);
/*
 * Callback registered with pthreadpool_init(): writes `jobid` as a raw
 * int into the write end of the pool's pipe.
 *
 * private_data is interpreted as the struct pthreadpool_pipe that was
 * passed to pthreadpool_init(). job_fn and job_private_data are not used
 * by any line visible in this excerpt.
 *
 * NOTE(review): the last parameter line, the opening of the retry loop and
 * the return statements are missing from this excerpt.
 */
69 static int pthreadpool_pipe_signal(int jobid,
70 void (*job_fn)(void *private_data),
71 void *job_private_data,
74 struct pthreadpool_pipe *pool = private_data;
/* Retry the write for as long as it fails with EINTR. */
78 written = write(pool->pipe_fds[1], &jobid, sizeof(jobid));
79 } while ((written == -1) && (errno == EINTR));
/* Anything other than a complete sizeof(jobid) write (an error or a short
 * write) enters this branch; its body is not visible here. */
81 if (written != sizeof(jobid)) {
/*
 * Tear down a pipe-signalled thread pool: destroys the underlying
 * pthreadpool and closes both ends of the pipe.
 *
 * NOTE(review): the branch bodies, the check of pthreadpool_destroy()'s
 * result, any free() of `pool` and the return statements are missing from
 * this excerpt, so return values and ownership cannot be documented here.
 */
88 int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
/* Special-cases a pool that still has outstanding jobs; presumably the
 * destroy is refused in that case (body not visible) -- TODO confirm. */
92 if (pool->num_jobs != 0) {
96 ret = pthreadpool_destroy(pool->pool);
/* Close each pipe end and overwrite the stored descriptor with -1 so the
 * stale number is not kept around. */
101 close(pool->pipe_fds[0]);
102 pool->pipe_fds[0] = -1;
104 close(pool->pipe_fds[1]);
105 pool->pipe_fds[1] = -1;
/*
 * Re-create the pool's pipe when the calling process is not the one
 * recorded in pool->pid (i.e. getpid() differs from the stored pid).
 *
 * The new read end is moved onto the descriptor number the old read end
 * had, so pool->pipe_fds[0] keeps the same numeric value; presumably this
 * keeps descriptors obtained earlier via pthreadpool_pipe_signal_fd()
 * usable -- TODO confirm intent.
 *
 * NOTE(review): the error checks, the updates of pool->pid / pool->num_jobs
 * (if any) and the return statements are missing from this excerpt.
 */
111 static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
113 pid_t pid = getpid();
/* Same process as recorded in the pool; presumably nothing to do and the
 * function returns early (body not visible) -- TODO confirm. */
117 if (pid == pool->pid) {
/* Remember the descriptor number of the old read end before closing it. */
121 signal_fd = pool->pipe_fds[0];
123 close(pool->pipe_fds[0]);
124 pool->pipe_fds[0] = -1;
126 close(pool->pipe_fds[1]);
127 pool->pipe_fds[1] = -1;
129 ret = pipe(pool->pipe_fds);
/* Duplicate the new read end onto the old descriptor number.
 * NOTE(review): dup2() returns the new descriptor (signal_fd) on success,
 * not 0; verify that the check on `ret` in the lines not shown compares
 * against -1 rather than testing for non-zero. */
134 ret = dup2(pool->pipe_fds[0], signal_fd);
/* NOTE(review): this overwrites the descriptor number pipe() returned for
 * the read end; unless that descriptor is closed in lines not shown, or
 * happens to equal signal_fd, it stays open and is leaked -- confirm. */
139 pool->pipe_fds[0] = signal_fd;
/*
 * Queue a job on the pool: first runs pthreadpool_pipe_reinit() on the
 * pool, then forwards job_id, fn and private_data to pthreadpool_add_job()
 * on the underlying pthreadpool.
 *
 * NOTE(review): the last parameter line (presumably `void *private_data`),
 * the checks of both `ret` values, any pool->num_jobs accounting and the
 * return statements are missing from this excerpt.
 */
145 int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
146 void (*fn)(void *private_data),
151 ret = pthreadpool_pipe_reinit(pool);
156 ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
/*
 * Return the descriptor stored in pool->pipe_fds[0], i.e. the read end of
 * the pool's pipe -- the same descriptor pthreadpool_pipe_finished_jobs()
 * reads job ids from.
 */
166 int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
168 return pool->pipe_fds[0];
/*
 * Collect ids of finished jobs: reads up to `num_jobids` ints from the
 * read end of the pool's pipe into `jobids` and subtracts the number of
 * ids read from pool->num_jobs.
 *
 * NOTE(review): the second parameter line (presumably declaring
 * `num_jobids`), the bodies of all error branches, the handling of a
 * failed read() and the return statement are missing from this excerpt,
 * so the return value cannot be documented from here.
 */
171 int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
174 ssize_t to_read, nread, num_jobs;
175 pid_t pid = getpid();
/* The caller is not the process recorded in the pool; the body of this
 * branch is not visible -- presumably an error return, TODO confirm. */
177 if (pool->pid != pid) {
/* Request size in bytes: one int per slot in `jobids`. */
181 to_read = sizeof(int) * num_jobids;
/* Retry the read for as long as it fails with EINTR. */
184 nread = read(pool->pipe_fds[0], jobids, to_read);
185 } while ((nread == -1) && (errno == EINTR));
/* A byte count that is not a whole number of ints means a partial job id
 * was read. */
190 if ((nread % sizeof(int)) != 0) {
194 num_jobs = nread / sizeof(int);
/* Sanity check: more ids were read than the pool has recorded as
 * outstanding. */
196 if (num_jobs > pool->num_jobs) {
199 pool->num_jobs -= num_jobs;