ctdb-common: Check the version field in IPv6 packets
[vlendec/samba-autobuild/.git] / ctdb / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "replace.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26
27 #include <tdb.h>
28 #include <talloc.h>
29 #include <tevent.h>
30
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/sys_rw.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/logging.h"
39 #include "common/common.h"
40
/*
 * Structures for packet queueing.
 */

/* Input buffer that accumulates bytes read from a queue's descriptor. */
struct ctdb_buffer {
	uint8_t *data;		/* talloc'ed storage; NULL until allocated by the read path */
	uint32_t length;	/* number of bytes currently held in data */
	uint32_t size;		/* allocated capacity of data, in bytes */
};
47
/*
 * One outgoing packet (or the still unsent remainder of one) waiting on a
 * queue's send list.  The payload is stored inline in buf[].
 */
struct ctdb_queue_pkt {
	struct ctdb_queue_pkt *next;	/* list linkage (DLIST_* macros) */
	struct ctdb_queue_pkt *prev;
	uint8_t *data;		/* next byte to write; points into buf */
	uint32_t length;	/* bytes still to be written starting at data */
	uint32_t full_length;	/* length of the complete packet as submitted */
	uint8_t buf[];		/* payload storage */
};
55
56 struct ctdb_queue {
57         struct ctdb_context *ctdb;
58         struct tevent_immediate *im;
59         struct ctdb_buffer buffer; /* input buffer */
60         struct ctdb_queue_pkt *out_queue, *out_queue_tail;
61         uint32_t out_queue_length;
62         struct tevent_fd *fde;
63         int fd;
64         size_t alignment;
65         void *private_data;
66         ctdb_queue_cb_fn_t callback;
67         const char *name;
68         uint32_t buffer_size;
69 };
70
71
72
73 int ctdb_queue_length(struct ctdb_queue *queue)
74 {
75         return queue->out_queue_length;
76 }
77
78 static void queue_process(struct ctdb_queue *queue);
79
80 static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
81                                 void *private_data)
82 {
83         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
84
85         queue_process(queue);
86 }
87
88 /*
89  * This function is used to process data in queue buffer.
90  *
91  * Queue callback function can end up freeing the queue, there should not be a
92  * loop processing packets from queue buffer.  Instead set up a timed event for
93  * immediate run to process remaining packets from buffer.
94  */
95 static void queue_process(struct ctdb_queue *queue)
96 {
97         uint32_t pkt_size;
98         uint8_t *data;
99
100         if (queue->buffer.length < sizeof(pkt_size)) {
101                 return;
102         }
103
104         /* Did we at least read the size into the buffer */
105         pkt_size = *(uint32_t *)queue->buffer.data;
106         if (pkt_size == 0) {
107                 DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
108                 goto failed;
109         }
110
111         /* the buffer doesn't contain the full packet, return to get the rest */
112         if (queue->buffer.length < pkt_size) {
113                 return;
114         }
115
116         /* Extract complete packet */
117         data = talloc_memdup(queue, queue->buffer.data, pkt_size);
118         if (data == NULL) {
119                 D_ERR("read error alloc failed for %u\n", pkt_size);
120                 return;
121         }
122
123         /* Shift packet out from buffer */
124         if (queue->buffer.length > pkt_size) {
125                 memmove(queue->buffer.data,
126                         queue->buffer.data + pkt_size,
127                         queue->buffer.length - pkt_size);
128         }
129         queue->buffer.length -= pkt_size;
130
131         if (queue->buffer.length > 0) {
132                 /* There is more data to be processed, schedule an event */
133                 tevent_schedule_immediate(queue->im, queue->ctdb->ev,
134                                           queue_process_event, queue);
135         } else {
136                 if (queue->buffer.size > queue->buffer_size) {
137                         TALLOC_FREE(queue->buffer.data);
138                         queue->buffer.size = 0;
139                 }
140         }
141
142         /* It is the responsibility of the callback to free 'data' */
143         queue->callback(data, pkt_size, queue->private_data);
144         return;
145
146 failed:
147         queue->callback(NULL, 0, queue->private_data);
148
149 }
150
151
152 /*
153   called when an incoming connection is readable
154   This function MUST be safe for reentry via the queue callback!
155 */
156 static void queue_io_read(struct ctdb_queue *queue)
157 {
158         int num_ready = 0;
159         uint32_t pkt_size;
160         ssize_t nread;
161         uint8_t *data;
162
163         /* check how much data is available on the socket for immediately
164            guaranteed nonblocking access.
165            as long as we are careful never to try to read more than this
166            we know all reads will be successful and will neither block
167            nor fail with a "data not available right now" error
168         */
169         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
170                 return;
171         }
172         if (num_ready == 0) {
173                 /* the descriptor has been closed */
174                 goto failed;
175         }
176
177         if (queue->buffer.data == NULL) {
178                 /* starting fresh, allocate buf to read data */
179                 queue->buffer.data = talloc_size(queue, queue->buffer_size);
180                 if (queue->buffer.data == NULL) {
181                         DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
182                         goto failed;
183                 }
184                 queue->buffer.size = queue->buffer_size;
185                 goto data_read;
186         }
187
188         if (queue->buffer.length < sizeof(pkt_size)) {
189                 /* data read is not sufficient to gather message size */
190                 goto data_read;
191         }
192
193         pkt_size = *(uint32_t *)queue->buffer.data;
194         if (pkt_size > queue->buffer.size) {
195                 data = talloc_realloc_size(queue,
196                                            queue->buffer.data,
197                                            pkt_size);
198                 if (data == NULL) {
199                         DBG_ERR("read error realloc failed for %u\n", pkt_size);
200                         goto failed;
201                 }
202                 queue->buffer.data = data;
203                 queue->buffer.size = pkt_size;
204         }
205
206 data_read:
207         num_ready = MIN(num_ready, queue->buffer.size - queue->buffer.length);
208
209         if (num_ready > 0) {
210                 nread = sys_read(queue->fd,
211                                  queue->buffer.data + queue->buffer.length,
212                                  num_ready);
213                 if (nread <= 0) {
214                         DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
215                         goto failed;
216                 }
217                 queue->buffer.length += nread;
218         }
219
220         queue_process(queue);
221         return;
222
223 failed:
224         queue->callback(NULL, 0, queue->private_data);
225 }
226
227
228 /* used when an event triggers a dead queue */
229 static void queue_dead(struct tevent_context *ev, struct tevent_immediate *im,
230                        void *private_data)
231 {
232         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
233         queue->callback(NULL, 0, queue->private_data);
234 }
235
236
237 /*
238   called when an incoming connection is writeable
239 */
240 static void queue_io_write(struct ctdb_queue *queue)
241 {
242         while (queue->out_queue) {
243                 struct ctdb_queue_pkt *pkt = queue->out_queue;
244                 ssize_t n;
245                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
246                         n = write(queue->fd, pkt->data, 1);
247                 } else {
248                         n = write(queue->fd, pkt->data, pkt->length);
249                 }
250
251                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
252                         if (pkt->length != pkt->full_length) {
253                                 /* partial packet sent - we have to drop it */
254                                 DLIST_REMOVE(queue->out_queue, pkt);
255                                 queue->out_queue_length--;
256                                 talloc_free(pkt);
257                         }
258                         talloc_free(queue->fde);
259                         queue->fde = NULL;
260                         queue->fd = -1;
261                         tevent_schedule_immediate(queue->im, queue->ctdb->ev,
262                                                   queue_dead, queue);
263                         return;
264                 }
265                 if (n <= 0) return;
266                 
267                 if (n != pkt->length) {
268                         pkt->length -= n;
269                         pkt->data += n;
270                         return;
271                 }
272
273                 DLIST_REMOVE(queue->out_queue, pkt);
274                 queue->out_queue_length--;
275                 talloc_free(pkt);
276         }
277
278         TEVENT_FD_NOT_WRITEABLE(queue->fde);
279 }
280
281 /*
282   called when an incoming connection is readable or writeable
283 */
284 static void queue_io_handler(struct tevent_context *ev, struct tevent_fd *fde,
285                              uint16_t flags, void *private_data)
286 {
287         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
288
289         if (flags & TEVENT_FD_READ) {
290                 queue_io_read(queue);
291         } else {
292                 queue_io_write(queue);
293         }
294 }
295
296
297 /*
298   queue a packet for sending
299 */
300 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
301 {
302         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
303         struct ctdb_queue_pkt *pkt;
304         uint32_t length2, full_length;
305
306         /* If the queue does not have valid fd, no point queueing a packet */
307         if (queue->fd == -1) {
308                 return 0;
309         }
310
311         if (queue->alignment) {
312                 /* enforce the length and alignment rules from the tcp packet allocator */
313                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
314                 *(uint32_t *)data = length2;
315         } else {
316                 length2 = length;
317         }
318
319         if (length2 != length) {
320                 memset(data+length, 0, length2-length);
321         }
322
323         full_length = length2;
324         
325         /* if the queue is empty then try an immediate write, avoiding
326            queue overhead. This relies on non-blocking sockets */
327         if (queue->out_queue == NULL && queue->fd != -1 &&
328             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
329                 ssize_t n = write(queue->fd, data, length2);
330                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
331                         talloc_free(queue->fde);
332                         queue->fde = NULL;
333                         queue->fd = -1;
334                         tevent_schedule_immediate(queue->im, queue->ctdb->ev,
335                                                   queue_dead, queue);
336                         /* yes, we report success, as the dead node is 
337                            handled via a separate event */
338                         return 0;
339                 }
340                 if (n > 0) {
341                         data += n;
342                         length2 -= n;
343                 }
344                 if (length2 == 0) return 0;
345         }
346
347         pkt = talloc_size(
348                 queue, offsetof(struct ctdb_queue_pkt, buf) + length2);
349         CTDB_NO_MEMORY(queue->ctdb, pkt);
350         talloc_set_name_const(pkt, "struct ctdb_queue_pkt");
351
352         pkt->data = pkt->buf;
353         memcpy(pkt->data, data, length2);
354
355         pkt->length = length2;
356         pkt->full_length = full_length;
357
358         if (queue->out_queue == NULL && queue->fd != -1) {
359                 TEVENT_FD_WRITEABLE(queue->fde);
360         }
361
362         DLIST_ADD_END(queue->out_queue, pkt);
363
364         queue->out_queue_length++;
365
366         if (queue->ctdb->tunable.verbose_memory_names != 0) {
367                 switch (hdr->operation) {
368                 case CTDB_REQ_CONTROL: {
369                         struct ctdb_req_control_old *c = (struct ctdb_req_control_old *)hdr;
370                         talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
371                                         queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
372                         break;
373                 }
374                 case CTDB_REQ_MESSAGE: {
375                         struct ctdb_req_message_old *m = (struct ctdb_req_message_old *)hdr;
376                         talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
377                                         queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
378                         break;
379                 }
380                 default:
381                         talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
382                                         queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
383                                         (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
384                         break;
385                 }
386         }
387
388         return 0;
389 }
390
391
392 /*
393   setup the fd used by the queue
394  */
395 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
396 {
397         queue->fd = fd;
398         talloc_free(queue->fde);
399         queue->fde = NULL;
400
401         if (fd != -1) {
402                 queue->fde = tevent_add_fd(queue->ctdb->ev, queue, fd,
403                                            TEVENT_FD_READ,
404                                            queue_io_handler, queue);
405                 if (queue->fde == NULL) {
406                         return -1;
407                 }
408                 tevent_fd_set_auto_close(queue->fde);
409
410                 if (queue->out_queue) {
411                         TEVENT_FD_WRITEABLE(queue->fde);
412                 }
413         }
414
415         return 0;
416 }
417
418 /*
419   setup a packet queue on a socket
420  */
421 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
422                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
423                                     ctdb_queue_cb_fn_t callback,
424                                     void *private_data, const char *fmt, ...)
425 {
426         struct ctdb_queue *queue;
427         va_list ap;
428
429         queue = talloc_zero(mem_ctx, struct ctdb_queue);
430         CTDB_NO_MEMORY_NULL(ctdb, queue);
431         va_start(ap, fmt);
432         queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
433         va_end(ap);
434         CTDB_NO_MEMORY_NULL(ctdb, queue->name);
435
436         queue->im= tevent_create_immediate(queue);
437         CTDB_NO_MEMORY_NULL(ctdb, queue->im);
438
439         queue->ctdb = ctdb;
440         queue->fd = fd;
441         queue->alignment = alignment;
442         queue->private_data = private_data;
443         queue->callback = callback;
444         if (fd != -1) {
445                 if (ctdb_queue_set_fd(queue, fd) != 0) {
446                         talloc_free(queue);
447                         return NULL;
448                 }
449         }
450
451         queue->buffer_size = ctdb->tunable.queue_buffer_size;
452         /* In client code, ctdb->tunable is not initialized.
453          * This does not affect recovery daemon.
454          */
455         if (queue->buffer_size == 0) {
456                 queue->buffer_size = 1024;
457         }
458
459         return queue;
460 }