ctdb-tests: Strengthen some tests
[samba.git] / ctdb / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "replace.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26
27 #include <tdb.h>
28 #include <talloc.h>
29 #include <tevent.h>
30
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/sys_rw.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/logging.h"
39 #include "common/common.h"
40
/*
 * Packet queueing structures.  They are defined in this .c file rather
 * than a header, so only code in this file can access their fields.
 */

/*
 * Input side of a queue: bytes read from the socket that have not yet been
 * handed to the callback as complete packets.
 */
struct ctdb_buffer {
	/* talloc'd storage; NULL until the first read, and freed again when
	 * an enlarged buffer has been fully drained */
	uint8_t *data;
	/* number of valid bytes at the start of data */
	uint32_t length;
	/* allocated size of data */
	uint32_t size;
	/* if non-zero, size data must be grown to before the next read */
	uint32_t extend;
};
48
/*
 * One outgoing packet (or the still-unsent tail of one) waiting on a
 * queue's out_queue list.
 */
struct ctdb_queue_pkt {
	/* list links, maintained with the DLIST_* macros */
	struct ctdb_queue_pkt *next;
	struct ctdb_queue_pkt *prev;
	/* next byte still to be written; points into buf */
	uint8_t *data;
	/* bytes still to be written, starting at data */
	uint32_t length;
	/* padded size of the whole packet; differs from length once part of
	 * the packet has already gone out */
	uint32_t full_length;
	/* payload storage, allocated in the same chunk as this header */
	uint8_t buf[];
};
56
57 struct ctdb_queue {
58         struct ctdb_context *ctdb;
59         struct tevent_immediate *im;
60         struct ctdb_buffer buffer; /* input buffer */
61         struct ctdb_queue_pkt *out_queue, *out_queue_tail;
62         uint32_t out_queue_length;
63         struct tevent_fd *fde;
64         int fd;
65         size_t alignment;
66         void *private_data;
67         ctdb_queue_cb_fn_t callback;
68         bool *destroyed;
69         const char *name;
70         uint32_t buffer_size;
71 };
72
73
74
75 int ctdb_queue_length(struct ctdb_queue *queue)
76 {
77         return queue->out_queue_length;
78 }
79
80 static void queue_process(struct ctdb_queue *queue);
81
82 static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
83                                 void *private_data)
84 {
85         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
86
87         queue_process(queue);
88 }
89
90 /*
91  * This function is used to process data in queue buffer.
92  *
93  * Queue callback function can end up freeing the queue, there should not be a
94  * loop processing packets from queue buffer.  Instead set up a timed event for
95  * immediate run to process remaining packets from buffer.
96  */
97 static void queue_process(struct ctdb_queue *queue)
98 {
99         uint32_t pkt_size;
100         uint8_t *data;
101
102         if (queue->buffer.length < sizeof(pkt_size)) {
103                 return;
104         }
105
106         pkt_size = *(uint32_t *)queue->buffer.data;
107         if (pkt_size == 0) {
108                 DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
109                 goto failed;
110         }
111
112         if (queue->buffer.length < pkt_size) {
113                 if (pkt_size > queue->buffer_size) {
114                         queue->buffer.extend = pkt_size;
115                 }
116                 return;
117         }
118
119         /* Extract complete packet */
120         data = talloc_size(queue, pkt_size);
121         if (data == NULL) {
122                 DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", pkt_size));
123                 return;
124         }
125         memcpy(data, queue->buffer.data, pkt_size);
126
127         /* Shift packet out from buffer */
128         if (queue->buffer.length > pkt_size) {
129                 memmove(queue->buffer.data,
130                         queue->buffer.data + pkt_size,
131                         queue->buffer.length - pkt_size);
132         }
133         queue->buffer.length -= pkt_size;
134
135         if (queue->buffer.length > 0) {
136                 /* There is more data to be processed, schedule an event */
137                 tevent_schedule_immediate(queue->im, queue->ctdb->ev,
138                                           queue_process_event, queue);
139         } else {
140                 if (queue->buffer.size > queue->buffer_size) {
141                         TALLOC_FREE(queue->buffer.data);
142                         queue->buffer.size = 0;
143                 }
144         }
145
146         /* It is the responsibility of the callback to free 'data' */
147         queue->callback(data, pkt_size, queue->private_data);
148         return;
149
150 failed:
151         queue->callback(NULL, 0, queue->private_data);
152
153 }
154
155
156 /*
157   called when an incoming connection is readable
158   This function MUST be safe for reentry via the queue callback!
159 */
160 static void queue_io_read(struct ctdb_queue *queue)
161 {
162         int num_ready = 0;
163         ssize_t nread;
164         uint8_t *data;
165         int navail;
166
167         /* check how much data is available on the socket for immediately
168            guaranteed nonblocking access.
169            as long as we are careful never to try to read more than this
170            we know all reads will be successful and will neither block
171            nor fail with a "data not available right now" error
172         */
173         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
174                 return;
175         }
176         if (num_ready == 0) {
177                 /* the descriptor has been closed */
178                 goto failed;
179         }
180
181         if (queue->buffer.data == NULL) {
182                 /* starting fresh, allocate buf to read data */
183                 queue->buffer.data = talloc_size(queue, queue->buffer_size);
184                 if (queue->buffer.data == NULL) {
185                         DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
186                         goto failed;
187                 }
188                 queue->buffer.size = queue->buffer_size;
189         } else if (queue->buffer.extend > 0) {
190                 /* extending buffer */
191                 data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
192                 if (data == NULL) {
193                         DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
194                         goto failed;
195                 }
196                 queue->buffer.data = data;
197                 queue->buffer.size = queue->buffer.extend;
198                 queue->buffer.extend = 0;
199         }
200
201         navail = queue->buffer.size - queue->buffer.length;
202         if (num_ready > navail) {
203                 num_ready = navail;
204         }
205
206         if (num_ready > 0) {
207                 nread = sys_read(queue->fd,
208                                  queue->buffer.data + queue->buffer.length,
209                                  num_ready);
210                 if (nread <= 0) {
211                         DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
212                         goto failed;
213                 }
214                 queue->buffer.length += nread;
215         }
216
217         queue_process(queue);
218         return;
219
220 failed:
221         queue->callback(NULL, 0, queue->private_data);
222 }
223
224
225 /* used when an event triggers a dead queue */
226 static void queue_dead(struct tevent_context *ev, struct tevent_immediate *im,
227                        void *private_data)
228 {
229         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
230         queue->callback(NULL, 0, queue->private_data);
231 }
232
233
234 /*
235   called when an incoming connection is writeable
236 */
237 static void queue_io_write(struct ctdb_queue *queue)
238 {
239         while (queue->out_queue) {
240                 struct ctdb_queue_pkt *pkt = queue->out_queue;
241                 ssize_t n;
242                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
243                         n = write(queue->fd, pkt->data, 1);
244                 } else {
245                         n = write(queue->fd, pkt->data, pkt->length);
246                 }
247
248                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
249                         if (pkt->length != pkt->full_length) {
250                                 /* partial packet sent - we have to drop it */
251                                 DLIST_REMOVE(queue->out_queue, pkt);
252                                 queue->out_queue_length--;
253                                 talloc_free(pkt);
254                         }
255                         talloc_free(queue->fde);
256                         queue->fde = NULL;
257                         queue->fd = -1;
258                         tevent_schedule_immediate(queue->im, queue->ctdb->ev,
259                                                   queue_dead, queue);
260                         return;
261                 }
262                 if (n <= 0) return;
263                 
264                 if (n != pkt->length) {
265                         pkt->length -= n;
266                         pkt->data += n;
267                         return;
268                 }
269
270                 DLIST_REMOVE(queue->out_queue, pkt);
271                 queue->out_queue_length--;
272                 talloc_free(pkt);
273         }
274
275         TEVENT_FD_NOT_WRITEABLE(queue->fde);
276 }
277
278 /*
279   called when an incoming connection is readable or writeable
280 */
281 static void queue_io_handler(struct tevent_context *ev, struct tevent_fd *fde,
282                              uint16_t flags, void *private_data)
283 {
284         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
285
286         if (flags & TEVENT_FD_READ) {
287                 queue_io_read(queue);
288         } else {
289                 queue_io_write(queue);
290         }
291 }
292
293
294 /*
295   queue a packet for sending
296 */
297 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
298 {
299         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
300         struct ctdb_queue_pkt *pkt;
301         uint32_t length2, full_length;
302
303         /* If the queue does not have valid fd, no point queueing a packet */
304         if (queue->fd == -1) {
305                 return 0;
306         }
307
308         if (queue->alignment) {
309                 /* enforce the length and alignment rules from the tcp packet allocator */
310                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
311                 *(uint32_t *)data = length2;
312         } else {
313                 length2 = length;
314         }
315
316         if (length2 != length) {
317                 memset(data+length, 0, length2-length);
318         }
319
320         full_length = length2;
321         
322         /* if the queue is empty then try an immediate write, avoiding
323            queue overhead. This relies on non-blocking sockets */
324         if (queue->out_queue == NULL && queue->fd != -1 &&
325             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
326                 ssize_t n = write(queue->fd, data, length2);
327                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
328                         talloc_free(queue->fde);
329                         queue->fde = NULL;
330                         queue->fd = -1;
331                         tevent_schedule_immediate(queue->im, queue->ctdb->ev,
332                                                   queue_dead, queue);
333                         /* yes, we report success, as the dead node is 
334                            handled via a separate event */
335                         return 0;
336                 }
337                 if (n > 0) {
338                         data += n;
339                         length2 -= n;
340                 }
341                 if (length2 == 0) return 0;
342         }
343
344         pkt = talloc_size(
345                 queue, offsetof(struct ctdb_queue_pkt, buf) + length2);
346         CTDB_NO_MEMORY(queue->ctdb, pkt);
347         talloc_set_name_const(pkt, "struct ctdb_queue_pkt");
348
349         pkt->data = pkt->buf;
350         memcpy(pkt->data, data, length2);
351
352         pkt->length = length2;
353         pkt->full_length = full_length;
354
355         if (queue->out_queue == NULL && queue->fd != -1) {
356                 TEVENT_FD_WRITEABLE(queue->fde);
357         }
358
359         DLIST_ADD_END(queue->out_queue, pkt);
360
361         queue->out_queue_length++;
362
363         if (queue->ctdb->tunable.verbose_memory_names != 0) {
364                 switch (hdr->operation) {
365                 case CTDB_REQ_CONTROL: {
366                         struct ctdb_req_control_old *c = (struct ctdb_req_control_old *)hdr;
367                         talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
368                                         queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
369                         break;
370                 }
371                 case CTDB_REQ_MESSAGE: {
372                         struct ctdb_req_message_old *m = (struct ctdb_req_message_old *)hdr;
373                         talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
374                                         queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
375                         break;
376                 }
377                 default:
378                         talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
379                                         queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
380                                         (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
381                         break;
382                 }
383         }
384
385         return 0;
386 }
387
388
389 /*
390   setup the fd used by the queue
391  */
392 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
393 {
394         queue->fd = fd;
395         talloc_free(queue->fde);
396         queue->fde = NULL;
397
398         if (fd != -1) {
399                 queue->fde = tevent_add_fd(queue->ctdb->ev, queue, fd,
400                                            TEVENT_FD_READ,
401                                            queue_io_handler, queue);
402                 if (queue->fde == NULL) {
403                         return -1;
404                 }
405                 tevent_fd_set_auto_close(queue->fde);
406
407                 if (queue->out_queue) {
408                         TEVENT_FD_WRITEABLE(queue->fde);
409                 }
410         }
411
412         return 0;
413 }
414
415 /* If someone sets up this pointer, they want to know if the queue is freed */
416 static int queue_destructor(struct ctdb_queue *queue)
417 {
418         TALLOC_FREE(queue->buffer.data);
419         queue->buffer.length = 0;
420         queue->buffer.size = 0;
421         if (queue->destroyed != NULL)
422                 *queue->destroyed = true;
423         return 0;
424 }
425
426 /*
427   setup a packet queue on a socket
428  */
429 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
430                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
431                                     ctdb_queue_cb_fn_t callback,
432                                     void *private_data, const char *fmt, ...)
433 {
434         struct ctdb_queue *queue;
435         va_list ap;
436
437         queue = talloc_zero(mem_ctx, struct ctdb_queue);
438         CTDB_NO_MEMORY_NULL(ctdb, queue);
439         va_start(ap, fmt);
440         queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
441         va_end(ap);
442         CTDB_NO_MEMORY_NULL(ctdb, queue->name);
443
444         queue->im= tevent_create_immediate(queue);
445         CTDB_NO_MEMORY_NULL(ctdb, queue->im);
446
447         queue->ctdb = ctdb;
448         queue->fd = fd;
449         queue->alignment = alignment;
450         queue->private_data = private_data;
451         queue->callback = callback;
452         if (fd != -1) {
453                 if (ctdb_queue_set_fd(queue, fd) != 0) {
454                         talloc_free(queue);
455                         return NULL;
456                 }
457         }
458         talloc_set_destructor(queue, queue_destructor);
459
460         queue->buffer_size = ctdb->tunable.queue_buffer_size;
461         /* In client code, ctdb->tunable is not initialized.
462          * This does not affect recovery daemon.
463          */
464         if (queue->buffer_size == 0) {
465                 queue->buffer_size = 1024;
466         }
467
468         return queue;
469 }