7f370da9b8f1ec9873319f28cb5134da6b0323ae
[nivanova/samba-autobuild/.git] / ctdb / common / comm.c
1 /*
2    Communication endpoint implementation
3
4    Copyright (C) Amitay Isaacs 2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tdb.h>
26
27 #include "lib/util/blocking.h"
28 #include "lib/util/tevent_unix.h"
29
30 #include "pkt_read.h"
31 #include "pkt_write.h"
32 #include "comm.h"
33
34 /*
35  * Communication endpoint around a socket
36  */
37
38 #define SMALL_PKT_SIZE  1024
39
40 struct comm_context {
41         int fd;
42         comm_read_handler_fn read_handler;
43         void *read_private_data;
44         comm_dead_handler_fn dead_handler;
45         void *dead_private_data;
46         uint8_t small_pkt[SMALL_PKT_SIZE];
47         struct tevent_req *read_req, *write_req;
48         struct tevent_fd *fde;
49         struct tevent_queue *queue;
50 };
51
52 static void comm_fd_handler(struct tevent_context *ev,
53                             struct tevent_fd *fde,
54                             uint16_t flags, void *private_data);
55 static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
56                                          struct tevent_context *ev,
57                                          struct comm_context *comm,
58                                          uint8_t *buf, size_t buflen);
59 static void comm_read_failed(struct tevent_req *req);
60
61
62 int comm_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
63                comm_read_handler_fn read_handler, void *read_private_data,
64                comm_dead_handler_fn dead_handler, void *dead_private_data,
65                struct comm_context **result)
66 {
67         struct comm_context *comm;
68         int ret;
69
70         if (fd < 0) {
71                 return EINVAL;
72         }
73
74         if (dead_handler == NULL) {
75                 return EINVAL;
76         }
77
78         /* Socket queue relies on non-blocking sockets. */
79         ret = set_blocking(fd, false);
80         if (ret == -1) {
81                 return EIO;
82         }
83
84         comm = talloc_zero(mem_ctx, struct comm_context);
85         if (comm == NULL) {
86                 return ENOMEM;
87         }
88
89         comm->fd = fd;
90         comm->read_handler = read_handler;
91         comm->read_private_data = read_private_data;
92         comm->dead_handler = dead_handler;
93         comm->dead_private_data = dead_private_data;
94
95         comm->queue = tevent_queue_create(comm, "comm write queue");
96         if (comm->queue == NULL) {
97                 goto fail;
98         }
99
100         /* Set up to write packets */
101         comm->fde = tevent_add_fd(ev, comm, fd, TEVENT_FD_READ,
102                                   comm_fd_handler, comm);
103         if (comm->fde == NULL) {
104                 goto fail;
105         }
106
107         /* Set up to read packets */
108         if (read_handler != NULL) {
109                 struct tevent_req *req;
110
111                 req = comm_read_send(comm, ev, comm, comm->small_pkt,
112                                      SMALL_PKT_SIZE);
113                 if (req == NULL) {
114                         goto fail;
115                 }
116
117                 tevent_req_set_callback(req, comm_read_failed, comm);
118                 comm->read_req = req;
119         }
120
121         *result = comm;
122         return 0;
123
124 fail:
125         talloc_free(comm);
126         return ENOMEM;
127 }
128
129
130 /*
131  * Read packets
132  */
133
134 struct comm_read_state {
135         struct tevent_context *ev;
136         struct comm_context *comm;
137         uint8_t *buf;
138         size_t buflen;
139         struct tevent_req *subreq;
140 };
141
142 static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data);
143 static void comm_read_done(struct tevent_req *subreq);
144
145 static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
146                                          struct tevent_context *ev,
147                                          struct comm_context *comm,
148                                          uint8_t *buf, size_t buflen)
149 {
150         struct tevent_req *req, *subreq;
151         struct comm_read_state *state;
152
153         req = tevent_req_create(mem_ctx, &state, struct comm_read_state);
154         if (req == NULL) {
155                 return NULL;
156         }
157
158         state->ev = ev;
159         state->comm = comm;
160         state->buf = buf;
161         state->buflen = buflen;
162
163         subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
164                                state->buf, state->buflen,
165                                comm_read_more, NULL);
166         if (tevent_req_nomem(subreq, req)) {
167                 return tevent_req_post(req, ev);
168         }
169         state->subreq = subreq;
170
171         tevent_req_set_callback(subreq, comm_read_done, req);
172         return req;
173 }
174
175 static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data)
176 {
177         uint32_t packet_len;
178
179         if (buflen < sizeof(uint32_t)) {
180                 return sizeof(uint32_t) - buflen;
181         }
182
183         packet_len = *(uint32_t *)buf;
184
185         return packet_len - buflen;
186 }
187
188 static void comm_read_done(struct tevent_req *subreq)
189 {
190         struct tevent_req *req = tevent_req_callback_data(
191                 subreq, struct tevent_req);
192         struct comm_read_state *state = tevent_req_data(
193                 req, struct comm_read_state);
194         struct comm_context *comm = state->comm;
195         ssize_t nread;
196         uint8_t *buf;
197         bool free_buf;
198         int err = 0;
199
200         nread = pkt_read_recv(subreq, state, &buf, &free_buf, &err);
201         TALLOC_FREE(subreq);
202         state->subreq = NULL;
203         if (nread == -1) {
204                 tevent_req_error(req, err);
205                 return;
206         }
207
208         comm->read_handler(buf, nread, comm->read_private_data);
209
210         if (free_buf) {
211                 talloc_free(buf);
212         }
213
214         subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
215                                state->buf, state->buflen,
216                                comm_read_more, NULL);
217         if (tevent_req_nomem(subreq, req)) {
218                 return;
219         }
220         state->subreq = subreq;
221
222         tevent_req_set_callback(subreq, comm_read_done, req);
223 }
224
225 static void comm_read_recv(struct tevent_req *req, int *perr)
226 {
227         int err;
228
229         if (tevent_req_is_unix_error(req, &err)) {
230                 if (perr != NULL) {
231                         *perr = err;
232                 }
233         }
234 }
235
236 static void comm_read_failed(struct tevent_req *req)
237 {
238         struct comm_context *comm = tevent_req_callback_data(
239                 req, struct comm_context);
240
241         comm_read_recv(req, NULL);
242         TALLOC_FREE(req);
243         comm->read_req = NULL;
244         if (comm->dead_handler != NULL) {
245                 comm->dead_handler(comm->dead_private_data);
246         }
247 }
248
249
250 /*
251  * Write packets
252  */
253
254 struct comm_write_state {
255         struct tevent_context *ev;
256         struct comm_context *comm;
257         struct tevent_req *subreq;
258         uint8_t *buf;
259         size_t buflen, nwritten;
260 };
261
262 static void comm_write_trigger(struct tevent_req *req, void *private_data);
263 static void comm_write_done(struct tevent_req *subreq);
264
265 struct tevent_req *comm_write_send(TALLOC_CTX *mem_ctx,
266                                    struct tevent_context *ev,
267                                    struct comm_context *comm,
268                                    uint8_t *buf, size_t buflen)
269 {
270         struct tevent_req *req;
271         struct comm_write_state *state;
272
273         req = tevent_req_create(mem_ctx, &state, struct comm_write_state);
274         if (req == NULL) {
275                 return NULL;
276         }
277
278         state->ev = ev;
279         state->comm = comm;
280         state->buf = buf;
281         state->buflen = buflen;
282
283         if (!tevent_queue_add_entry(comm->queue, ev, req,
284                                     comm_write_trigger, NULL)) {
285                 talloc_free(req);
286                 return NULL;
287         }
288
289         return req;
290 }
291
292 static void comm_write_trigger(struct tevent_req *req, void *private_data)
293 {
294         struct comm_write_state *state = tevent_req_data(
295                 req, struct comm_write_state);
296         struct comm_context *comm = state->comm;
297         struct tevent_req *subreq;
298
299         comm->write_req = req;
300
301         subreq = pkt_write_send(state, state->ev, comm->fd,
302                                 state->buf, state->buflen);
303         if (tevent_req_nomem(subreq, req)) {
304                 return;
305         }
306
307         state->subreq = subreq;
308         tevent_req_set_callback(subreq, comm_write_done, req);
309         TEVENT_FD_WRITEABLE(comm->fde);
310 }
311
312 static void comm_write_done(struct tevent_req *subreq)
313 {
314         struct tevent_req *req = tevent_req_callback_data(
315                 subreq, struct tevent_req);
316         struct comm_write_state *state = tevent_req_data(
317                 req, struct comm_write_state);
318         struct comm_context *comm = state->comm;
319         ssize_t nwritten;
320         int err = 0;
321
322         TEVENT_FD_NOT_WRITEABLE(comm->fde);
323         nwritten = pkt_write_recv(subreq, &err);
324         TALLOC_FREE(subreq);
325         state->subreq = NULL;
326         comm->write_req = NULL;
327         if (nwritten == -1) {
328                 if (err == EPIPE) {
329                         comm->dead_handler(comm->dead_private_data);
330                 }
331                 tevent_req_error(req, err);
332                 return;
333         }
334
335         state->nwritten = nwritten;
336         tevent_req_done(req);
337 }
338
339 bool comm_write_recv(struct tevent_req *req, int *perr)
340 {
341         struct comm_write_state *state = tevent_req_data(
342                 req, struct comm_write_state);
343         int err;
344
345         if (tevent_req_is_unix_error(req, &err)) {
346                 if (perr != NULL) {
347                         *perr = err;
348                 }
349                 return false;
350         }
351
352         if (state->nwritten != state->buflen) {
353                 *perr = EIO;
354                 return false;
355         }
356
357         *perr = 0;
358         return true;
359 }
360
361 static void comm_fd_handler(struct tevent_context *ev,
362                             struct tevent_fd *fde,
363                             uint16_t flags, void *private_data)
364 {
365         struct comm_context *comm = talloc_get_type_abort(
366                 private_data, struct comm_context);
367
368         if (flags & TEVENT_FD_READ) {
369                 struct comm_read_state *read_state;
370
371                 if (comm->read_req == NULL) {
372                         /* This should never happen */
373                         abort();
374                 }
375
376                 read_state = tevent_req_data(comm->read_req,
377                                              struct comm_read_state);
378                 pkt_read_handler(ev, fde, flags, read_state->subreq);
379         }
380
381         if (flags & TEVENT_FD_WRITE) {
382                 struct comm_write_state *write_state;
383
384                 if (comm->write_req == NULL) {
385                         /* This should never happen */
386                         abort();
387                 }
388
389                 write_state = tevent_req_data(comm->write_req,
390                                               struct comm_write_state);
391                 pkt_write_handler(ev, fde, flags, write_state->subreq);
392         }
393 }