ldb: activating <= and >= indexing for integers
[samba.git] / lib / pthreadpool / pthreadpool_pipe.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * threadpool implementation based on pthreads
4  * Copyright (C) Volker Lendecke 2009,2011
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "pthreadpool_pipe.h"
23 #include "pthreadpool.h"
24
/*
 * State of a pipe-signalled thread pool: a pthreadpool plus the pipe
 * over which the ids of finished jobs are handed back.
 */
struct pthreadpool_pipe {
        /* Underlying pool, filled in by pthreadpool_init() */
        struct pthreadpool *pool;

        /* Jobs that were added but not yet read back from the pipe */
        int num_jobs;

        /* getpid() of the process that set up the pipe */
        pid_t pid;

        /*
         * Notification pipe: job ids are written to [1] by the signal
         * callback and read from [0] when collecting finished jobs.
         */
        int pipe_fds[2];
};
31
/*
 * Signal callback handed to pthreadpool_init(); declared here because
 * pthreadpool_pipe_init() refers to it before its definition.
 */
static int pthreadpool_pipe_signal(
        int jobid,
        void (*job_fn)(void *private_data),
        void *job_private_data,
        void *private_data);
36
37 int pthreadpool_pipe_init(unsigned max_threads,
38                           struct pthreadpool_pipe **presult)
39 {
40         struct pthreadpool_pipe *pool;
41         int ret;
42
43         pool = calloc(1, sizeof(struct pthreadpool_pipe));
44         if (pool == NULL) {
45                 return ENOMEM;
46         }
47         pool->pid = getpid();
48
49         ret = pipe(pool->pipe_fds);
50         if (ret == -1) {
51                 int err = errno;
52                 free(pool);
53                 return err;
54         }
55
56         ret = pthreadpool_init(max_threads, &pool->pool,
57                                pthreadpool_pipe_signal, pool);
58         if (ret != 0) {
59                 close(pool->pipe_fds[0]);
60                 close(pool->pipe_fds[1]);
61                 free(pool);
62                 return ret;
63         }
64
65         *presult = pool;
66         return 0;
67 }
68
69 static int pthreadpool_pipe_signal(int jobid,
70                                    void (*job_fn)(void *private_data),
71                                    void *job_private_data,
72                                    void *private_data)
73 {
74         struct pthreadpool_pipe *pool = private_data;
75         ssize_t written;
76
77         do {
78                 written = write(pool->pipe_fds[1], &jobid, sizeof(jobid));
79         } while ((written == -1) && (errno == EINTR));
80
81         if (written != sizeof(jobid)) {
82                 return errno;
83         }
84
85         return 0;
86 }
87
88 int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
89 {
90         int ret;
91
92         if (pool->num_jobs != 0) {
93                 return EBUSY;
94         }
95
96         ret = pthreadpool_destroy(pool->pool);
97         if (ret != 0) {
98                 return ret;
99         }
100
101         close(pool->pipe_fds[0]);
102         pool->pipe_fds[0] = -1;
103
104         close(pool->pipe_fds[1]);
105         pool->pipe_fds[1] = -1;
106
107         free(pool);
108         return 0;
109 }
110
111 static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
112 {
113         pid_t pid = getpid();
114         int signal_fd;
115         int ret;
116
117         if (pid == pool->pid) {
118                 return 0;
119         }
120
121         signal_fd = pool->pipe_fds[0];
122
123         close(pool->pipe_fds[0]);
124         pool->pipe_fds[0] = -1;
125
126         close(pool->pipe_fds[1]);
127         pool->pipe_fds[1] = -1;
128
129         ret = pipe(pool->pipe_fds);
130         if (ret != 0) {
131                 return errno;
132         }
133
134         ret = dup2(pool->pipe_fds[0], signal_fd);
135         if (ret != 0) {
136                 return errno;
137         }
138
139         pool->pipe_fds[0] = signal_fd;
140         pool->num_jobs = 0;
141
142         return 0;
143 }
144
145 int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
146                              void (*fn)(void *private_data),
147                              void *private_data)
148 {
149         int ret;
150
151         ret = pthreadpool_pipe_reinit(pool);
152         if (ret != 0) {
153                 return ret;
154         }
155
156         ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
157         if (ret != 0) {
158                 return ret;
159         }
160
161         pool->num_jobs += 1;
162
163         return 0;
164 }
165
166 int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
167 {
168         return pool->pipe_fds[0];
169 }
170
171 int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
172                                    unsigned num_jobids)
173 {
174         ssize_t to_read, nread, num_jobs;
175         pid_t pid = getpid();
176
177         if (pool->pid != pid) {
178                 return EINVAL;
179         }
180
181         to_read = sizeof(int) * num_jobids;
182
183         do {
184                 nread = read(pool->pipe_fds[0], jobids, to_read);
185         } while ((nread == -1) && (errno == EINTR));
186
187         if (nread == -1) {
188                 return -errno;
189         }
190         if ((nread % sizeof(int)) != 0) {
191                 return -EINVAL;
192         }
193
194         num_jobs = nread / sizeof(int);
195
196         if (num_jobs > pool->num_jobs) {
197                 return -EINVAL;
198         }
199         pool->num_jobs -= num_jobs;
200
201         return num_jobs;
202 }