ctdb: Adding memory pool for queue callback
[amitay/samba.git] / ctdb / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "replace.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26
27 #include <tdb.h>
28 #include <talloc.h>
29 #include <tevent.h>
30
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/sys_rw.h"
34
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37
38 #include "common/logging.h"
39 #include "common/common.h"
40
41 /* structures for packet queueing - see common/ctdb_io.c */
42 struct ctdb_buffer {
43         uint8_t *data;
44         uint32_t length;
45         uint32_t size;
46         uint32_t offset;
47 };
48
49 struct ctdb_queue_pkt {
50         struct ctdb_queue_pkt *next, *prev;
51         uint8_t *data;
52         uint32_t length;
53         uint32_t full_length;
54         uint8_t buf[];
55 };
56
57 struct ctdb_queue {
58         struct ctdb_context *ctdb;
59         struct tevent_immediate *im;
60         struct ctdb_buffer buffer; /* input buffer */
61         struct ctdb_queue_pkt *out_queue, *out_queue_tail;
62         uint32_t out_queue_length;
63         struct tevent_fd *fde;
64         int fd;
65         size_t alignment;
66         void *private_data;
67         ctdb_queue_cb_fn_t callback;
68         TALLOC_CTX *data_pool;
69         const char *name;
70         uint32_t buffer_size;
71 };
72
73
74
75 int ctdb_queue_length(struct ctdb_queue *queue)
76 {
77         return queue->out_queue_length;
78 }
79
80 static void queue_process(struct ctdb_queue *queue);
81
82 static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
83                                 void *private_data)
84 {
85         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
86
87         queue_process(queue);
88 }
89
90 /*
91  * This function is used to process data in queue buffer.
92  *
93  * Queue callback function can end up freeing the queue, there should not be a
94  * loop processing packets from queue buffer.  Instead set up a timed event for
95  * immediate run to process remaining packets from buffer.
96  */
97 static void queue_process(struct ctdb_queue *queue)
98 {
99         uint32_t pkt_size;
100         uint8_t *data = NULL;
101
102         if (queue->buffer.length < sizeof(pkt_size)) {
103                 return;
104         }
105
106         /* Did we at least read the size into the buffer */
107         pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset);
108         if (pkt_size == 0) {
109                 DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
110                 goto failed;
111         }
112
113         /* the buffer doesn't contain the full packet, return to get the rest */
114         if (queue->buffer.length < pkt_size) {
115                 return;
116         }
117
118         /* Extract complete packet */
119         data = talloc_memdup(queue->data_pool,
120                              queue->buffer.data + queue->buffer.offset,
121                              pkt_size);
122
123         if (data == NULL) {
124                 D_ERR("read error alloc failed for %u\n", pkt_size);
125                 return;
126         }
127
128         queue->buffer.offset += pkt_size;
129         queue->buffer.length -= pkt_size;
130
131         if (queue->buffer.offset < pkt_size ||
132             queue->buffer.offset > queue->buffer.size) {
133                 D_ERR("buffer offset overflow\n");
134                 TALLOC_FREE(queue->buffer.data);
135                 memset(&queue->buffer, 0, sizeof(queue->buffer));
136                 goto failed;
137         }
138
139         if (queue->buffer.length > 0) {
140                 /* There is more data to be processed, schedule an event */
141                 tevent_schedule_immediate(queue->im, queue->ctdb->ev,
142                                           queue_process_event, queue);
143         } else {
144                 if (queue->buffer.size > queue->buffer_size) {
145                         TALLOC_FREE(queue->buffer.data);
146                         queue->buffer.size = 0;
147                 }
148                 queue->buffer.offset = 0;
149         }
150
151         /* It is the responsibility of the callback to free 'data' */
152         queue->callback(data, pkt_size, queue->private_data);
153         return;
154
155 failed:
156         queue->callback(NULL, 0, queue->private_data);
157 }
158
159 /*
160   called when an incoming connection is readable
161   This function MUST be safe for reentry via the queue callback!
162 */
163 static void queue_io_read(struct ctdb_queue *queue)
164 {
165         int num_ready = 0;
166         uint32_t pkt_size = 0;
167         ssize_t nread;
168         uint8_t *data;
169
170         /* check how much data is available on the socket for immediately
171            guaranteed nonblocking access.
172            as long as we are careful never to try to read more than this
173            we know all reads will be successful and will neither block
174            nor fail with a "data not available right now" error
175         */
176         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
177                 return;
178         }
179         if (num_ready == 0) {
180                 /* the descriptor has been closed */
181                 goto failed;
182         }
183
184         if (queue->buffer.data == NULL) {
185                 /* starting fresh, allocate buf to read data */
186                 queue->buffer.data = talloc_size(queue, queue->buffer_size);
187                 if (queue->buffer.data == NULL) {
188                         DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
189                         goto failed;
190                 }
191                 queue->buffer.size = queue->buffer_size;
192                 goto data_read;
193         }
194
195         if (sizeof(pkt_size) > queue->buffer.length) {
196                 /* data read is not sufficient to gather message size */
197                 goto buffer_shift;
198         }
199
200         pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset);
201         if (pkt_size > queue->buffer.size) {
202                 data = talloc_realloc_size(queue,
203                                            queue->buffer.data,
204                                            pkt_size);
205                 if (data == NULL) {
206                         DBG_ERR("read error realloc failed for %u\n", pkt_size);
207                         goto failed;
208                 }
209                 queue->buffer.data = data;
210                 queue->buffer.size = pkt_size;
211                 /* fall through here as we might need to move the data as well */
212         }
213
214 buffer_shift:
215         if (sizeof(pkt_size) > queue->buffer.size - queue->buffer.offset ||
216             pkt_size > queue->buffer.size - queue->buffer.offset) {
217                 /* Either the offset has progressed too far to host at least
218                  * the size information or the remaining space in the buffer
219                  * is not sufficient for the full message.
220                  * Therefore, move the data and try again.
221                  */
222                 memmove(queue->buffer.data,
223                         queue->buffer.data + queue->buffer.offset,
224                         queue->buffer.length);
225                 queue->buffer.offset = 0;
226         }
227
228 data_read:
229         num_ready = MIN(num_ready, queue->buffer.size - queue->buffer.length);
230
231         if (num_ready > 0) {
232                 nread = sys_read(queue->fd,
233                                  queue->buffer.data +
234                                         queue->buffer.offset +
235                                         queue->buffer.length,
236                                  num_ready);
237                 if (nread <= 0) {
238                         DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
239                         goto failed;
240                 }
241                 queue->buffer.length += nread;
242         }
243
244         queue_process(queue);
245         return;
246
247 failed:
248         queue->callback(NULL, 0, queue->private_data);
249 }
250
251
252 /* used when an event triggers a dead queue */
253 static void queue_dead(struct tevent_context *ev, struct tevent_immediate *im,
254                        void *private_data)
255 {
256         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
257         queue->callback(NULL, 0, queue->private_data);
258 }
259
260
261 /*
262   called when an incoming connection is writeable
263 */
264 static void queue_io_write(struct ctdb_queue *queue)
265 {
266         while (queue->out_queue) {
267                 struct ctdb_queue_pkt *pkt = queue->out_queue;
268                 ssize_t n;
269                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
270                         n = write(queue->fd, pkt->data, 1);
271                 } else {
272                         n = write(queue->fd, pkt->data, pkt->length);
273                 }
274
275                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
276                         if (pkt->length != pkt->full_length) {
277                                 /* partial packet sent - we have to drop it */
278                                 DLIST_REMOVE(queue->out_queue, pkt);
279                                 queue->out_queue_length--;
280                                 talloc_free(pkt);
281                         }
282                         talloc_free(queue->fde);
283                         queue->fde = NULL;
284                         queue->fd = -1;
285                         tevent_schedule_immediate(queue->im, queue->ctdb->ev,
286                                                   queue_dead, queue);
287                         return;
288                 }
289                 if (n <= 0) return;
290                 
291                 if (n != pkt->length) {
292                         pkt->length -= n;
293                         pkt->data += n;
294                         return;
295                 }
296
297                 DLIST_REMOVE(queue->out_queue, pkt);
298                 queue->out_queue_length--;
299                 talloc_free(pkt);
300         }
301
302         TEVENT_FD_NOT_WRITEABLE(queue->fde);
303 }
304
305 /*
306   called when an incoming connection is readable or writeable
307 */
308 static void queue_io_handler(struct tevent_context *ev, struct tevent_fd *fde,
309                              uint16_t flags, void *private_data)
310 {
311         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
312
313         if (flags & TEVENT_FD_READ) {
314                 queue_io_read(queue);
315         } else {
316                 queue_io_write(queue);
317         }
318 }
319
320
321 /*
322   queue a packet for sending
323 */
324 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
325 {
326         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
327         struct ctdb_queue_pkt *pkt;
328         uint32_t length2, full_length;
329
330         /* If the queue does not have valid fd, no point queueing a packet */
331         if (queue->fd == -1) {
332                 return 0;
333         }
334
335         if (queue->alignment) {
336                 /* enforce the length and alignment rules from the tcp packet allocator */
337                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
338                 *(uint32_t *)data = length2;
339         } else {
340                 length2 = length;
341         }
342
343         if (length2 != length) {
344                 memset(data+length, 0, length2-length);
345         }
346
347         full_length = length2;
348         
349         /* if the queue is empty then try an immediate write, avoiding
350            queue overhead. This relies on non-blocking sockets */
351         if (queue->out_queue == NULL && queue->fd != -1 &&
352             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
353                 ssize_t n = write(queue->fd, data, length2);
354                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
355                         talloc_free(queue->fde);
356                         queue->fde = NULL;
357                         queue->fd = -1;
358                         tevent_schedule_immediate(queue->im, queue->ctdb->ev,
359                                                   queue_dead, queue);
360                         /* yes, we report success, as the dead node is 
361                            handled via a separate event */
362                         return 0;
363                 }
364                 if (n > 0) {
365                         data += n;
366                         length2 -= n;
367                 }
368                 if (length2 == 0) return 0;
369         }
370
371         pkt = talloc_size(
372                 queue, offsetof(struct ctdb_queue_pkt, buf) + length2);
373         CTDB_NO_MEMORY(queue->ctdb, pkt);
374         talloc_set_name_const(pkt, "struct ctdb_queue_pkt");
375
376         pkt->data = pkt->buf;
377         memcpy(pkt->data, data, length2);
378
379         pkt->length = length2;
380         pkt->full_length = full_length;
381
382         if (queue->out_queue == NULL && queue->fd != -1) {
383                 TEVENT_FD_WRITEABLE(queue->fde);
384         }
385
386         DLIST_ADD_END(queue->out_queue, pkt);
387
388         queue->out_queue_length++;
389
390         if (queue->ctdb->tunable.verbose_memory_names != 0) {
391                 switch (hdr->operation) {
392                 case CTDB_REQ_CONTROL: {
393                         struct ctdb_req_control_old *c = (struct ctdb_req_control_old *)hdr;
394                         talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
395                                         queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
396                         break;
397                 }
398                 case CTDB_REQ_MESSAGE: {
399                         struct ctdb_req_message_old *m = (struct ctdb_req_message_old *)hdr;
400                         talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
401                                         queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
402                         break;
403                 }
404                 default:
405                         talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
406                                         queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
407                                         (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
408                         break;
409                 }
410         }
411
412         return 0;
413 }
414
415
416 /*
417   setup the fd used by the queue
418  */
419 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
420 {
421         queue->fd = fd;
422         talloc_free(queue->fde);
423         queue->fde = NULL;
424
425         if (fd != -1) {
426                 queue->fde = tevent_add_fd(queue->ctdb->ev, queue, fd,
427                                            TEVENT_FD_READ,
428                                            queue_io_handler, queue);
429                 if (queue->fde == NULL) {
430                         return -1;
431                 }
432                 tevent_fd_set_auto_close(queue->fde);
433
434                 if (queue->out_queue) {
435                         TEVENT_FD_WRITEABLE(queue->fde);
436                 }
437         }
438
439         return 0;
440 }
441
442 /*
443   setup a packet queue on a socket
444  */
445 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
446                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
447                                     ctdb_queue_cb_fn_t callback,
448                                     void *private_data, const char *fmt, ...)
449 {
450         struct ctdb_queue *queue;
451         va_list ap;
452
453         queue = talloc_zero(mem_ctx, struct ctdb_queue);
454         CTDB_NO_MEMORY_NULL(ctdb, queue);
455         va_start(ap, fmt);
456         queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
457         va_end(ap);
458         CTDB_NO_MEMORY_NULL(ctdb, queue->name);
459
460         queue->im= tevent_create_immediate(queue);
461         CTDB_NO_MEMORY_NULL(ctdb, queue->im);
462
463         queue->ctdb = ctdb;
464         queue->fd = fd;
465         queue->alignment = alignment;
466         queue->private_data = private_data;
467         queue->callback = callback;
468         if (fd != -1) {
469                 if (ctdb_queue_set_fd(queue, fd) != 0) {
470                         talloc_free(queue);
471                         return NULL;
472                 }
473         }
474
475         queue->buffer_size = ctdb->tunable.queue_buffer_size;
476         /* In client code, ctdb->tunable is not initialized.
477          * This does not affect recovery daemon.
478          */
479         if (queue->buffer_size == 0) {
480                 queue->buffer_size = 1024;
481         }
482
483         queue->data_pool = talloc_pool(queue, queue->buffer_size);
484         if (queue->data_pool == NULL) {
485                 TALLOC_FREE(queue);
486                 return NULL;
487         }
488
489         return queue;
490 }