da444aca2b69c91db279836e576b54711fa44353
[vlendec/samba-autobuild/.git] / ctdb / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "replace.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26
27 #include <tdb.h>
28 #include <talloc.h>
29 #include <tevent.h>
30
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/sys_rw.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/logging.h"
39 #include "common/common.h"
40
/* structures for packet queueing, used by the queue code in this file */

/*
 * Receive-side buffer of a queue.  Incoming bytes are appended by
 * queue_io_read() and complete packets are cut off the front by
 * queue_process().
 */
struct ctdb_buffer {
	uint8_t *data;		/* talloc'd storage; NULL when no buffer is
				 * allocated (allocated lazily on read,
				 * released again once an oversized buffer
				 * has been drained) */
	uint32_t length;	/* number of received bytes currently in data */
	uint32_t size;		/* allocated size of data */
	uint32_t extend;	/* if non-zero: size queue_io_read() must grow
				 * data to before its next read (requested by
				 * queue_process() for a packet that does not
				 * fit yet) */
};

/*
 * One outgoing packet on a queue's out_queue list.  The unsent bytes are
 * stored in the same allocation, in the trailing buf[] array.
 */
struct ctdb_queue_pkt {
	struct ctdb_queue_pkt *next, *prev;	/* DLIST links for out_queue */
	uint8_t *data;		/* next byte to write; starts at buf and is
				 * advanced after short writes */
	uint32_t length;	/* bytes still to be written from data */
	uint32_t full_length;	/* total (aligned) length of the packet;
				 * length != full_length means part of the
				 * packet has already been written */
	uint8_t buf[];		/* copy of the bytes that were still unsent
				 * when the packet was queued */
};

/*
 * A packet queue bound to a socket: buffers incoming data into packets for
 * 'callback' and queues outgoing packets until the socket accepts them.
 */
struct ctdb_queue {
	struct ctdb_context *ctdb;	/* supplies the event context, flags
					 * and tunables */
	struct tevent_immediate *im;	/* used to schedule queue_process_event
					 * and queue_dead */
	struct ctdb_buffer buffer; /* input buffer */
	struct ctdb_queue_pkt *out_queue, *out_queue_tail;	/* outgoing
					 * packet list (head is written first);
					 * NOTE(review): out_queue_tail is not
					 * referenced in this file */
	uint32_t out_queue_length;	/* number of packets on out_queue */
	struct tevent_fd *fde;		/* fd event for fd; NULL when there is
					 * no usable fd */
	int fd;				/* socket, -1 when none / after a write
					 * error */
	size_t alignment;		/* if non-zero, outgoing packet lengths
					 * are rounded up to a multiple of this
					 * (the mask arithmetic assumes a power
					 * of two) */
	void *private_data;		/* passed through to callback */
	ctdb_queue_cb_fn_t callback;	/* receives each complete incoming
					 * packet, or (NULL, 0) on error/EOF */
	const char *name;		/* used in verbose talloc names of
					 * queued packets */
	uint32_t buffer_size;		/* default input buffer allocation;
					 * larger buffers are freed once
					 * drained */
};
71
72
73
74 int ctdb_queue_length(struct ctdb_queue *queue)
75 {
76         return queue->out_queue_length;
77 }
78
79 static void queue_process(struct ctdb_queue *queue);
80
81 static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
82                                 void *private_data)
83 {
84         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
85
86         queue_process(queue);
87 }
88
89 /*
90  * This function is used to process data in queue buffer.
91  *
92  * Queue callback function can end up freeing the queue, there should not be a
93  * loop processing packets from queue buffer.  Instead set up a timed event for
94  * immediate run to process remaining packets from buffer.
95  */
96 static void queue_process(struct ctdb_queue *queue)
97 {
98         uint32_t pkt_size;
99         uint8_t *data;
100
101         if (queue->buffer.length < sizeof(pkt_size)) {
102                 return;
103         }
104
105         pkt_size = *(uint32_t *)queue->buffer.data;
106         if (pkt_size == 0) {
107                 DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
108                 goto failed;
109         }
110
111         if (queue->buffer.length < pkt_size) {
112                 if (pkt_size > queue->buffer_size) {
113                         queue->buffer.extend = pkt_size;
114                 }
115                 return;
116         }
117
118         /* Extract complete packet */
119         data = talloc_memdup(queue, queue->buffer.data, pkt_size);
120         if (data == NULL) {
121                 D_ERR("read error alloc failed for %u\n", pkt_size);
122                 return;
123         }
124
125         /* Shift packet out from buffer */
126         if (queue->buffer.length > pkt_size) {
127                 memmove(queue->buffer.data,
128                         queue->buffer.data + pkt_size,
129                         queue->buffer.length - pkt_size);
130         }
131         queue->buffer.length -= pkt_size;
132
133         if (queue->buffer.length > 0) {
134                 /* There is more data to be processed, schedule an event */
135                 tevent_schedule_immediate(queue->im, queue->ctdb->ev,
136                                           queue_process_event, queue);
137         } else {
138                 if (queue->buffer.size > queue->buffer_size) {
139                         TALLOC_FREE(queue->buffer.data);
140                         queue->buffer.size = 0;
141                 }
142         }
143
144         /* It is the responsibility of the callback to free 'data' */
145         queue->callback(data, pkt_size, queue->private_data);
146         return;
147
148 failed:
149         queue->callback(NULL, 0, queue->private_data);
150
151 }
152
153
154 /*
155   called when an incoming connection is readable
156   This function MUST be safe for reentry via the queue callback!
157 */
158 static void queue_io_read(struct ctdb_queue *queue)
159 {
160         int num_ready = 0;
161         ssize_t nread;
162         uint8_t *data;
163         int navail;
164
165         /* check how much data is available on the socket for immediately
166            guaranteed nonblocking access.
167            as long as we are careful never to try to read more than this
168            we know all reads will be successful and will neither block
169            nor fail with a "data not available right now" error
170         */
171         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
172                 return;
173         }
174         if (num_ready == 0) {
175                 /* the descriptor has been closed */
176                 goto failed;
177         }
178
179         if (queue->buffer.data == NULL) {
180                 /* starting fresh, allocate buf to read data */
181                 queue->buffer.data = talloc_size(queue, queue->buffer_size);
182                 if (queue->buffer.data == NULL) {
183                         DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
184                         goto failed;
185                 }
186                 queue->buffer.size = queue->buffer_size;
187         } else if (queue->buffer.extend > 0) {
188                 /* extending buffer */
189                 data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
190                 if (data == NULL) {
191                         DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
192                         goto failed;
193                 }
194                 queue->buffer.data = data;
195                 queue->buffer.size = queue->buffer.extend;
196                 queue->buffer.extend = 0;
197         }
198
199         navail = queue->buffer.size - queue->buffer.length;
200         if (num_ready > navail) {
201                 num_ready = navail;
202         }
203
204         if (num_ready > 0) {
205                 nread = sys_read(queue->fd,
206                                  queue->buffer.data + queue->buffer.length,
207                                  num_ready);
208                 if (nread <= 0) {
209                         DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
210                         goto failed;
211                 }
212                 queue->buffer.length += nread;
213         }
214
215         queue_process(queue);
216         return;
217
218 failed:
219         queue->callback(NULL, 0, queue->private_data);
220 }
221
222
223 /* used when an event triggers a dead queue */
224 static void queue_dead(struct tevent_context *ev, struct tevent_immediate *im,
225                        void *private_data)
226 {
227         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
228         queue->callback(NULL, 0, queue->private_data);
229 }
230
231
232 /*
233   called when an incoming connection is writeable
234 */
235 static void queue_io_write(struct ctdb_queue *queue)
236 {
237         while (queue->out_queue) {
238                 struct ctdb_queue_pkt *pkt = queue->out_queue;
239                 ssize_t n;
240                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
241                         n = write(queue->fd, pkt->data, 1);
242                 } else {
243                         n = write(queue->fd, pkt->data, pkt->length);
244                 }
245
246                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
247                         if (pkt->length != pkt->full_length) {
248                                 /* partial packet sent - we have to drop it */
249                                 DLIST_REMOVE(queue->out_queue, pkt);
250                                 queue->out_queue_length--;
251                                 talloc_free(pkt);
252                         }
253                         talloc_free(queue->fde);
254                         queue->fde = NULL;
255                         queue->fd = -1;
256                         tevent_schedule_immediate(queue->im, queue->ctdb->ev,
257                                                   queue_dead, queue);
258                         return;
259                 }
260                 if (n <= 0) return;
261                 
262                 if (n != pkt->length) {
263                         pkt->length -= n;
264                         pkt->data += n;
265                         return;
266                 }
267
268                 DLIST_REMOVE(queue->out_queue, pkt);
269                 queue->out_queue_length--;
270                 talloc_free(pkt);
271         }
272
273         TEVENT_FD_NOT_WRITEABLE(queue->fde);
274 }
275
276 /*
277   called when an incoming connection is readable or writeable
278 */
279 static void queue_io_handler(struct tevent_context *ev, struct tevent_fd *fde,
280                              uint16_t flags, void *private_data)
281 {
282         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
283
284         if (flags & TEVENT_FD_READ) {
285                 queue_io_read(queue);
286         } else {
287                 queue_io_write(queue);
288         }
289 }
290
291
292 /*
293   queue a packet for sending
294 */
295 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
296 {
297         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
298         struct ctdb_queue_pkt *pkt;
299         uint32_t length2, full_length;
300
301         /* If the queue does not have valid fd, no point queueing a packet */
302         if (queue->fd == -1) {
303                 return 0;
304         }
305
306         if (queue->alignment) {
307                 /* enforce the length and alignment rules from the tcp packet allocator */
308                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
309                 *(uint32_t *)data = length2;
310         } else {
311                 length2 = length;
312         }
313
314         if (length2 != length) {
315                 memset(data+length, 0, length2-length);
316         }
317
318         full_length = length2;
319         
320         /* if the queue is empty then try an immediate write, avoiding
321            queue overhead. This relies on non-blocking sockets */
322         if (queue->out_queue == NULL && queue->fd != -1 &&
323             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
324                 ssize_t n = write(queue->fd, data, length2);
325                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
326                         talloc_free(queue->fde);
327                         queue->fde = NULL;
328                         queue->fd = -1;
329                         tevent_schedule_immediate(queue->im, queue->ctdb->ev,
330                                                   queue_dead, queue);
331                         /* yes, we report success, as the dead node is 
332                            handled via a separate event */
333                         return 0;
334                 }
335                 if (n > 0) {
336                         data += n;
337                         length2 -= n;
338                 }
339                 if (length2 == 0) return 0;
340         }
341
342         pkt = talloc_size(
343                 queue, offsetof(struct ctdb_queue_pkt, buf) + length2);
344         CTDB_NO_MEMORY(queue->ctdb, pkt);
345         talloc_set_name_const(pkt, "struct ctdb_queue_pkt");
346
347         pkt->data = pkt->buf;
348         memcpy(pkt->data, data, length2);
349
350         pkt->length = length2;
351         pkt->full_length = full_length;
352
353         if (queue->out_queue == NULL && queue->fd != -1) {
354                 TEVENT_FD_WRITEABLE(queue->fde);
355         }
356
357         DLIST_ADD_END(queue->out_queue, pkt);
358
359         queue->out_queue_length++;
360
361         if (queue->ctdb->tunable.verbose_memory_names != 0) {
362                 switch (hdr->operation) {
363                 case CTDB_REQ_CONTROL: {
364                         struct ctdb_req_control_old *c = (struct ctdb_req_control_old *)hdr;
365                         talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
366                                         queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
367                         break;
368                 }
369                 case CTDB_REQ_MESSAGE: {
370                         struct ctdb_req_message_old *m = (struct ctdb_req_message_old *)hdr;
371                         talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
372                                         queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
373                         break;
374                 }
375                 default:
376                         talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
377                                         queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
378                                         (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
379                         break;
380                 }
381         }
382
383         return 0;
384 }
385
386
387 /*
388   setup the fd used by the queue
389  */
390 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
391 {
392         queue->fd = fd;
393         talloc_free(queue->fde);
394         queue->fde = NULL;
395
396         if (fd != -1) {
397                 queue->fde = tevent_add_fd(queue->ctdb->ev, queue, fd,
398                                            TEVENT_FD_READ,
399                                            queue_io_handler, queue);
400                 if (queue->fde == NULL) {
401                         return -1;
402                 }
403                 tevent_fd_set_auto_close(queue->fde);
404
405                 if (queue->out_queue) {
406                         TEVENT_FD_WRITEABLE(queue->fde);
407                 }
408         }
409
410         return 0;
411 }
412
413 /*
414   setup a packet queue on a socket
415  */
416 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
417                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
418                                     ctdb_queue_cb_fn_t callback,
419                                     void *private_data, const char *fmt, ...)
420 {
421         struct ctdb_queue *queue;
422         va_list ap;
423
424         queue = talloc_zero(mem_ctx, struct ctdb_queue);
425         CTDB_NO_MEMORY_NULL(ctdb, queue);
426         va_start(ap, fmt);
427         queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
428         va_end(ap);
429         CTDB_NO_MEMORY_NULL(ctdb, queue->name);
430
431         queue->im= tevent_create_immediate(queue);
432         CTDB_NO_MEMORY_NULL(ctdb, queue->im);
433
434         queue->ctdb = ctdb;
435         queue->fd = fd;
436         queue->alignment = alignment;
437         queue->private_data = private_data;
438         queue->callback = callback;
439         if (fd != -1) {
440                 if (ctdb_queue_set_fd(queue, fd) != 0) {
441                         talloc_free(queue);
442                         return NULL;
443                 }
444         }
445
446         queue->buffer_size = ctdb->tunable.queue_buffer_size;
447         /* In client code, ctdb->tunable is not initialized.
448          * This does not affect recovery daemon.
449          */
450         if (queue->buffer_size == 0) {
451                 queue->buffer_size = 1024;
452         }
453
454         return queue;
455 }