ctdb-common: Allow path_socket() to use $CTDB_SOCKET
[vlendec/samba-autobuild/.git] / ctdb / common / comm.c
1 /*
2    Communication endpoint implementation
3
4    Copyright (C) Amitay Isaacs 2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tdb.h>
26
27 #include "lib/util/blocking.h"
28 #include "lib/util/tevent_unix.h"
29
30 #include "pkt_read.h"
31 #include "pkt_write.h"
32 #include "comm.h"
33
34 /*
35  * Communication endpoint around a socket
36  */
37
38 #define SMALL_PKT_SIZE  1024
39
40 struct comm_context {
41         int fd;
42         comm_read_handler_fn read_handler;
43         void *read_private_data;
44         comm_dead_handler_fn dead_handler;
45         void *dead_private_data;
46         uint8_t small_pkt[SMALL_PKT_SIZE];
47         struct tevent_req *read_req, *write_req;
48         struct tevent_fd *fde;
49         struct tevent_queue *queue;
50 };
51
52 static void comm_fd_handler(struct tevent_context *ev,
53                             struct tevent_fd *fde,
54                             uint16_t flags, void *private_data);
55 static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
56                                          struct tevent_context *ev,
57                                          struct comm_context *comm,
58                                          uint8_t *buf, size_t buflen);
59 static void comm_read_failed(struct tevent_req *req);
60
61
62 int comm_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
63                comm_read_handler_fn read_handler, void *read_private_data,
64                comm_dead_handler_fn dead_handler, void *dead_private_data,
65                struct comm_context **result)
66 {
67         struct comm_context *comm;
68         int ret;
69
70         if (fd < 0) {
71                 return EINVAL;
72         }
73
74         if (dead_handler == NULL) {
75                 return EINVAL;
76         }
77
78         /* Socket queue relies on non-blocking sockets. */
79         ret = set_blocking(fd, false);
80         if (ret == -1) {
81                 return EIO;
82         }
83
84         comm = talloc_zero(mem_ctx, struct comm_context);
85         if (comm == NULL) {
86                 return ENOMEM;
87         }
88
89         comm->fd = fd;
90         comm->read_handler = read_handler;
91         comm->read_private_data = read_private_data;
92         comm->dead_handler = dead_handler;
93         comm->dead_private_data = dead_private_data;
94
95         comm->queue = tevent_queue_create(comm, "comm write queue");
96         if (comm->queue == NULL) {
97                 goto fail;
98         }
99
100         /* Set up to write packets */
101         comm->fde = tevent_add_fd(ev, comm, fd, TEVENT_FD_READ,
102                                   comm_fd_handler, comm);
103         if (comm->fde == NULL) {
104                 goto fail;
105         }
106
107         /* Set up to read packets */
108         if (read_handler != NULL) {
109                 struct tevent_req *req;
110
111                 req = comm_read_send(comm, ev, comm, comm->small_pkt,
112                                      SMALL_PKT_SIZE);
113                 if (req == NULL) {
114                         goto fail;
115                 }
116
117                 tevent_req_set_callback(req, comm_read_failed, comm);
118                 comm->read_req = req;
119         }
120
121         *result = comm;
122         return 0;
123
124 fail:
125         talloc_free(comm);
126         return ENOMEM;
127 }
128
129
130 /*
131  * Read packets
132  */
133
134 struct comm_read_state {
135         struct tevent_context *ev;
136         struct comm_context *comm;
137         uint8_t *buf;
138         size_t buflen;
139         struct tevent_req *subreq;
140 };
141
142 static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data);
143 static void comm_read_done(struct tevent_req *subreq);
144
145 static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
146                                          struct tevent_context *ev,
147                                          struct comm_context *comm,
148                                          uint8_t *buf, size_t buflen)
149 {
150         struct tevent_req *req, *subreq;
151         struct comm_read_state *state;
152
153         req = tevent_req_create(mem_ctx, &state, struct comm_read_state);
154         if (req == NULL) {
155                 return NULL;
156         }
157
158         state->ev = ev;
159         state->comm = comm;
160         state->buf = buf;
161         state->buflen = buflen;
162
163         subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
164                                state->buf, state->buflen,
165                                comm_read_more, NULL);
166         if (tevent_req_nomem(subreq, req)) {
167                 return tevent_req_post(req, ev);
168         }
169         state->subreq = subreq;
170
171         tevent_req_set_callback(subreq, comm_read_done, req);
172         return req;
173 }
174
175 static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data)
176 {
177         uint32_t packet_len;
178
179         if (buflen < sizeof(uint32_t)) {
180                 return sizeof(uint32_t) - buflen;
181         }
182
183         packet_len = *(uint32_t *)buf;
184
185         return packet_len - buflen;
186 }
187
188 static void comm_read_done(struct tevent_req *subreq)
189 {
190         struct tevent_req *req = tevent_req_callback_data(
191                 subreq, struct tevent_req);
192         struct comm_read_state *state = tevent_req_data(
193                 req, struct comm_read_state);
194         struct comm_context *comm = state->comm;
195         ssize_t nread;
196         uint8_t *buf;
197         bool free_buf;
198         int err = 0;
199
200         nread = pkt_read_recv(subreq, state, &buf, &free_buf, &err);
201         TALLOC_FREE(subreq);
202         state->subreq = NULL;
203         if (nread == -1) {
204                 tevent_req_error(req, err);
205                 return;
206         }
207
208         comm->read_handler(buf, nread, comm->read_private_data);
209
210         if (free_buf) {
211                 talloc_free(buf);
212         }
213
214         subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
215                                state->buf, state->buflen,
216                                comm_read_more, NULL);
217         if (tevent_req_nomem(subreq, req)) {
218                 return;
219         }
220         state->subreq = subreq;
221
222         tevent_req_set_callback(subreq, comm_read_done, req);
223 }
224
225 static void comm_read_recv(struct tevent_req *req, int *perr)
226 {
227         int err;
228
229         if (tevent_req_is_unix_error(req, &err)) {
230                 if (perr != NULL) {
231                         *perr = err;
232                 }
233         }
234 }
235
236 static void comm_read_failed(struct tevent_req *req)
237 {
238         struct comm_context *comm = tevent_req_callback_data(
239                 req, struct comm_context);
240
241         comm_read_recv(req, NULL);
242         TALLOC_FREE(req);
243         comm->read_req = NULL;
244         if (comm->dead_handler != NULL) {
245                 comm->dead_handler(comm->dead_private_data);
246         }
247 }
248
249
250 /*
251  * Write packets
252  */
253
254 struct comm_write_entry {
255         struct comm_context *comm;
256         struct tevent_queue_entry *qentry;
257         struct tevent_req *req;
258 };
259
260 struct comm_write_state {
261         struct tevent_context *ev;
262         struct comm_context *comm;
263         struct comm_write_entry *entry;
264         struct tevent_req *subreq;
265         uint8_t *buf;
266         size_t buflen, nwritten;
267 };
268
269 static int comm_write_entry_destructor(struct comm_write_entry *entry);
270 static void comm_write_trigger(struct tevent_req *req, void *private_data);
271 static void comm_write_done(struct tevent_req *subreq);
272
273 struct tevent_req *comm_write_send(TALLOC_CTX *mem_ctx,
274                                    struct tevent_context *ev,
275                                    struct comm_context *comm,
276                                    uint8_t *buf, size_t buflen)
277 {
278         struct tevent_req *req;
279         struct comm_write_state *state;
280         struct comm_write_entry *entry;
281
282         req = tevent_req_create(mem_ctx, &state, struct comm_write_state);
283         if (req == NULL) {
284                 return NULL;
285         }
286
287         state->ev = ev;
288         state->comm = comm;
289         state->buf = buf;
290         state->buflen = buflen;
291
292         entry = talloc_zero(state, struct comm_write_entry);
293         if (tevent_req_nomem(entry, req)) {
294                 return tevent_req_post(req, ev);
295         }
296
297         entry->comm = comm;
298         entry->req = req;
299         entry->qentry = tevent_queue_add_entry(comm->queue, ev, req,
300                                                comm_write_trigger, NULL);
301         if (tevent_req_nomem(entry->qentry, req)) {
302                 return tevent_req_post(req, ev);
303         }
304
305         state->entry = entry;
306         talloc_set_destructor(entry, comm_write_entry_destructor);
307
308         return req;
309 }
310
311 static int comm_write_entry_destructor(struct comm_write_entry *entry)
312 {
313         struct comm_context *comm = entry->comm;
314
315         if (comm->write_req == entry->req) {
316                 comm->write_req = NULL;
317                 TEVENT_FD_NOT_WRITEABLE(comm->fde);
318         }
319
320         TALLOC_FREE(entry->qentry);
321         return 0;
322 }
323
324 static void comm_write_trigger(struct tevent_req *req, void *private_data)
325 {
326         struct comm_write_state *state = tevent_req_data(
327                 req, struct comm_write_state);
328         struct comm_context *comm = state->comm;
329         struct tevent_req *subreq;
330
331         comm->write_req = req;
332
333         subreq = pkt_write_send(state, state->ev, comm->fd,
334                                 state->buf, state->buflen);
335         if (tevent_req_nomem(subreq, req)) {
336                 return;
337         }
338
339         state->subreq = subreq;
340         tevent_req_set_callback(subreq, comm_write_done, req);
341         TEVENT_FD_WRITEABLE(comm->fde);
342 }
343
344 static void comm_write_done(struct tevent_req *subreq)
345 {
346         struct tevent_req *req = tevent_req_callback_data(
347                 subreq, struct tevent_req);
348         struct comm_write_state *state = tevent_req_data(
349                 req, struct comm_write_state);
350         struct comm_context *comm = state->comm;
351         ssize_t nwritten;
352         int err = 0;
353
354         TEVENT_FD_NOT_WRITEABLE(comm->fde);
355         nwritten = pkt_write_recv(subreq, &err);
356         TALLOC_FREE(subreq);
357         state->subreq = NULL;
358         comm->write_req = NULL;
359         if (nwritten == -1) {
360                 if (err == EPIPE) {
361                         comm->dead_handler(comm->dead_private_data);
362                 }
363                 tevent_req_error(req, err);
364                 return;
365         }
366
367         state->nwritten = nwritten;
368         state->entry->qentry = NULL;
369         TALLOC_FREE(state->entry);
370         tevent_req_done(req);
371 }
372
373 bool comm_write_recv(struct tevent_req *req, int *perr)
374 {
375         struct comm_write_state *state = tevent_req_data(
376                 req, struct comm_write_state);
377         int err;
378
379         if (tevent_req_is_unix_error(req, &err)) {
380                 if (perr != NULL) {
381                         *perr = err;
382                 }
383                 return false;
384         }
385
386         if (state->nwritten != state->buflen) {
387                 *perr = EIO;
388                 return false;
389         }
390
391         *perr = 0;
392         return true;
393 }
394
395 static void comm_fd_handler(struct tevent_context *ev,
396                             struct tevent_fd *fde,
397                             uint16_t flags, void *private_data)
398 {
399         struct comm_context *comm = talloc_get_type_abort(
400                 private_data, struct comm_context);
401
402         if (flags & TEVENT_FD_READ) {
403                 struct comm_read_state *read_state;
404
405                 if (comm->read_req == NULL) {
406                         /* This should never happen */
407                         abort();
408                 }
409
410                 read_state = tevent_req_data(comm->read_req,
411                                              struct comm_read_state);
412                 pkt_read_handler(ev, fde, flags, read_state->subreq);
413         }
414
415         if (flags & TEVENT_FD_WRITE) {
416                 struct comm_write_state *write_state;
417
418                 if (comm->write_req == NULL) {
419                         TEVENT_FD_NOT_WRITEABLE(comm->fde);
420                         return;
421                 }
422
423                 write_state = tevent_req_data(comm->write_req,
424                                               struct comm_write_state);
425                 pkt_write_handler(ev, fde, flags, write_state->subreq);
426         }
427 }