ctdb-tests: Fix some incorrect memory allocations
[obnox/samba/samba-obnox.git] / ctdb / common / comm.c
1 /*
2    Communication endpoint implementation
3
4    Copyright (C) Amitay Isaacs 2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
#include "replace.h"
#include "system/network.h"
#include "system/filesys.h"

#include <string.h>
#include <talloc.h>
#include <tdb.h>

#include "lib/util/tevent_unix.h"

#include "pkt_read.h"
#include "pkt_write.h"
#include "comm.h"
32
/*
 * Switch a file descriptor to non-blocking mode.
 *
 * The current file status flags are fetched with F_GETFL and written
 * back with O_NONBLOCK added.
 *
 * Returns true on success, false if either fcntl() call fails.
 */
static bool set_nonblocking(int fd)
{
        int flags = fcntl(fd, F_GETFL, 0);

        if (flags != -1 && fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1) {
                return true;
        }

        return false;
}
46
47 /*
48  * Communication endpoint around a socket
49  */
50
51 #define SMALL_PKT_SIZE  1024
52
53 struct comm_context {
54         int fd;
55         comm_read_handler_fn read_handler;
56         void *read_private_data;
57         comm_dead_handler_fn dead_handler;
58         void *dead_private_data;
59         uint8_t small_pkt[SMALL_PKT_SIZE];
60         struct tevent_req *read_req, *write_req;
61         struct tevent_fd *fde;
62         struct tevent_queue *queue;
63 };
64
65 static void comm_fd_handler(struct tevent_context *ev,
66                             struct tevent_fd *fde,
67                             uint16_t flags, void *private_data);
68 static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
69                                          struct tevent_context *ev,
70                                          struct comm_context *comm,
71                                          uint8_t *buf, size_t buflen);
72 static void comm_read_failed(struct tevent_req *req);
73
74
75 int comm_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
76                comm_read_handler_fn read_handler, void *read_private_data,
77                comm_dead_handler_fn dead_handler, void *dead_private_data,
78                struct comm_context **result)
79 {
80         struct comm_context *comm;
81
82         if (fd < 0) {
83                 return EINVAL;
84         }
85
86         if (dead_handler == NULL) {
87                 return EINVAL;
88         }
89
90         /* Socket queue relies on non-blocking sockets. */
91         if (!set_nonblocking(fd)) {
92                 return EIO;
93         }
94
95         comm = talloc_zero(mem_ctx, struct comm_context);
96         if (comm == NULL) {
97                 return ENOMEM;
98         }
99
100         comm->fd = fd;
101         comm->read_handler = read_handler;
102         comm->read_private_data = read_private_data;
103         comm->dead_handler = dead_handler;
104         comm->dead_private_data = dead_private_data;
105
106         comm->queue = tevent_queue_create(comm, "comm write queue");
107         if (comm->queue == NULL) {
108                 goto fail;
109         }
110
111         /* Set up to write packets */
112         comm->fde = tevent_add_fd(ev, comm, fd, TEVENT_FD_READ,
113                                   comm_fd_handler, comm);
114         if (comm->fde == NULL) {
115                 goto fail;
116         }
117
118         /* Set up to read packets */
119         if (read_handler != NULL) {
120                 struct tevent_req *req;
121
122                 req = comm_read_send(comm, ev, comm, comm->small_pkt,
123                                      SMALL_PKT_SIZE);
124                 if (req == NULL) {
125                         goto fail;
126                 }
127
128                 tevent_req_set_callback(req, comm_read_failed, comm);
129                 comm->read_req = req;
130         }
131
132         *result = comm;
133         return 0;
134
135 fail:
136         talloc_free(comm);
137         return ENOMEM;
138 }
139
140
141 /*
142  * Read packets
143  */
144
/*
 * State of the read request created by comm_read_send().
 */
struct comm_read_state {
        /* Event context used for every packet read */
        struct tevent_context *ev;

        /* Endpoint the packets are read for */
        struct comm_context *comm;

        /* Caller-supplied buffer and its size, handed to pkt_read_send() */
        uint8_t *buf;
        size_t buflen;

        /* Packet read currently in flight, NULL between reads */
        struct tevent_req *subreq;
};
152
153 static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data);
154 static void comm_read_done(struct tevent_req *subreq);
155
156 static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
157                                          struct tevent_context *ev,
158                                          struct comm_context *comm,
159                                          uint8_t *buf, size_t buflen)
160 {
161         struct tevent_req *req, *subreq;
162         struct comm_read_state *state;
163
164         req = tevent_req_create(mem_ctx, &state, struct comm_read_state);
165         if (req == NULL) {
166                 return NULL;
167         }
168
169         state->ev = ev;
170         state->comm = comm;
171         state->buf = buf;
172         state->buflen = buflen;
173
174         subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
175                                state->buf, state->buflen,
176                                comm_read_more, NULL);
177         if (tevent_req_nomem(subreq, req)) {
178                 return tevent_req_post(req, ev);
179         }
180         state->subreq = subreq;
181
182         tevent_req_set_callback(subreq, comm_read_done, req);
183         return req;
184 }
185
/*
 * Tell the packet reader how many more bytes the current packet needs.
 *
 * buf holds the buflen bytes read so far.  The first sizeof(uint32_t)
 * bytes are the total packet length, taken as-is without any byte
 * swapping; that total includes the length field itself.
 *
 * Returns the number of bytes missing to complete the length field
 * while it is still incomplete, otherwise the packet length minus
 * buflen (0 once the whole packet has been read).  private_data is
 * unused.
 *
 * NOTE(review): a length field smaller than buflen yields a negative
 * result; how the packet reader treats that is not visible here.
 */
static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data)
{
        uint32_t packet_len;

        if (buflen < sizeof(uint32_t)) {
                return sizeof(uint32_t) - buflen;
        }

        /*
         * Copy the length out instead of dereferencing a casted
         * pointer: buf is a byte buffer with no alignment guarantee
         * and must not be accessed through a uint32_t lvalue.
         */
        memcpy(&packet_len, buf, sizeof(packet_len));

        return packet_len - buflen;
}
198
199 static void comm_read_done(struct tevent_req *subreq)
200 {
201         struct tevent_req *req = tevent_req_callback_data(
202                 subreq, struct tevent_req);
203         struct comm_read_state *state = tevent_req_data(
204                 req, struct comm_read_state);
205         struct comm_context *comm = state->comm;
206         ssize_t nread;
207         uint8_t *buf;
208         bool free_buf;
209         int err = 0;
210
211         nread = pkt_read_recv(subreq, state, &buf, &free_buf, &err);
212         TALLOC_FREE(subreq);
213         state->subreq = NULL;
214         if (nread == -1) {
215                 tevent_req_error(req, err);
216                 return;
217         }
218
219         comm->read_handler(buf, nread, comm->read_private_data);
220
221         if (free_buf) {
222                 talloc_free(buf);
223         }
224
225         subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
226                                state->buf, state->buflen,
227                                comm_read_more, NULL);
228         if (tevent_req_nomem(subreq, req)) {
229                 return;
230         }
231         state->subreq = subreq;
232
233         tevent_req_set_callback(subreq, comm_read_done, req);
234 }
235
/*
 * Collect the result of a read request.
 *
 * If the request finished with a unix error and perr is not NULL, the
 * error is stored in *perr.  In every other case *perr is left alone.
 */
static void comm_read_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (!tevent_req_is_unix_error(req, &unix_err)) {
                return;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
}
246
247 static void comm_read_failed(struct tevent_req *req)
248 {
249         struct comm_context *comm = tevent_req_callback_data(
250                 req, struct comm_context);
251
252         comm_read_recv(req, NULL);
253         TALLOC_FREE(req);
254         comm->read_req = NULL;
255         if (comm->dead_handler != NULL) {
256                 comm->dead_handler(comm->dead_private_data);
257         }
258 }
259
260
261 /*
262  * Write packets
263  */
264
/*
 * State of one write request created by comm_write_send().
 */
struct comm_write_state {
        /* Event context used for the packet write */
        struct tevent_context *ev;

        /* Endpoint the packet is written to */
        struct comm_context *comm;

        /* Packet write in flight, NULL before it starts and after it ends */
        struct tevent_req *subreq;

        /* Caller's packet and its length */
        uint8_t *buf;
        size_t buflen;

        /* Bytes reported written by pkt_write_recv() */
        size_t nwritten;
};
272
273 static void comm_write_trigger(struct tevent_req *req, void *private_data);
274 static void comm_write_done(struct tevent_req *subreq);
275
276 struct tevent_req *comm_write_send(TALLOC_CTX *mem_ctx,
277                                    struct tevent_context *ev,
278                                    struct comm_context *comm,
279                                    uint8_t *buf, size_t buflen)
280 {
281         struct tevent_req *req;
282         struct comm_write_state *state;
283
284         req = tevent_req_create(mem_ctx, &state, struct comm_write_state);
285         if (req == NULL) {
286                 return NULL;
287         }
288
289         state->ev = ev;
290         state->comm = comm;
291         state->buf = buf;
292         state->buflen = buflen;
293
294         if (!tevent_queue_add_entry(comm->queue, ev, req,
295                                     comm_write_trigger, NULL)) {
296                 talloc_free(req);
297                 return NULL;
298         }
299
300         return req;
301 }
302
303 static void comm_write_trigger(struct tevent_req *req, void *private_data)
304 {
305         struct comm_write_state *state = tevent_req_data(
306                 req, struct comm_write_state);
307         struct comm_context *comm = state->comm;
308         struct tevent_req *subreq;
309
310         comm->write_req = req;
311
312         subreq = pkt_write_send(state, state->ev, comm->fd,
313                                 state->buf, state->buflen);
314         if (tevent_req_nomem(subreq, req)) {
315                 return;
316         }
317
318         state->subreq = subreq;
319         tevent_req_set_callback(subreq, comm_write_done, req);
320         TEVENT_FD_WRITEABLE(comm->fde);
321 }
322
323 static void comm_write_done(struct tevent_req *subreq)
324 {
325         struct tevent_req *req = tevent_req_callback_data(
326                 subreq, struct tevent_req);
327         struct comm_write_state *state = tevent_req_data(
328                 req, struct comm_write_state);
329         struct comm_context *comm = state->comm;
330         ssize_t nwritten;
331         int err = 0;
332
333         TEVENT_FD_NOT_WRITEABLE(comm->fde);
334         nwritten = pkt_write_recv(subreq, &err);
335         TALLOC_FREE(subreq);
336         state->subreq = NULL;
337         comm->write_req = NULL;
338         if (nwritten == -1) {
339                 if (err == EPIPE) {
340                         comm->dead_handler(comm->dead_private_data);
341                 }
342                 tevent_req_error(req, err);
343                 return;
344         }
345
346         state->nwritten = nwritten;
347         tevent_req_done(req);
348 }
349
350 bool comm_write_recv(struct tevent_req *req, int *perr)
351 {
352         struct comm_write_state *state = tevent_req_data(
353                 req, struct comm_write_state);
354         int err;
355
356         if (tevent_req_is_unix_error(req, &err)) {
357                 if (perr != NULL) {
358                         *perr = err;
359                 }
360                 return false;
361         }
362
363         if (state->nwritten != state->buflen) {
364                 *perr = EIO;
365                 return false;
366         }
367
368         *perr = 0;
369         return true;
370 }
371
372 static void comm_fd_handler(struct tevent_context *ev,
373                             struct tevent_fd *fde,
374                             uint16_t flags, void *private_data)
375 {
376         struct comm_context *comm = talloc_get_type_abort(
377                 private_data, struct comm_context);
378
379         if (flags & TEVENT_FD_READ) {
380                 struct comm_read_state *read_state;
381
382                 if (comm->read_req == NULL) {
383                         /* This should never happen */
384                         abort();
385                 }
386
387                 read_state = tevent_req_data(comm->read_req,
388                                              struct comm_read_state);
389                 pkt_read_handler(ev, fde, flags, read_state->subreq);
390         }
391
392         if (flags & TEVENT_FD_WRITE) {
393                 struct comm_write_state *write_state;
394
395                 if (comm->write_req == NULL) {
396                         /* This should never happen */
397                         abort();
398                 }
399
400                 write_state = tevent_req_data(comm->write_req,
401                                               struct comm_write_state);
402                 pkt_write_handler(ev, fde, flags, write_state->subreq);
403         }
404 }