ctdb: Replace calculation of bytes to read from socket by MIN() macro
ctdb/common/ctdb_io.c
/*
   ctdb database library
   Utility functions to read/write blobs of data from a file descriptor
   and handle the case where we might need multiple read/writes to get all the
   data.

   Copyright (C) Andrew Tridgell  2006

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "replace.h"
#include "system/network.h"
#include "system/filesys.h"

#include <tdb.h>
#include <talloc.h>
#include <tevent.h>

#include "lib/util/dlinklist.h"
#include "lib/util/debug.h"
#include "lib/util/sys_rw.h"

#include "ctdb_private.h"
#include "ctdb_client.h"

#include "common/logging.h"
#include "common/common.h"

/* structures for packet queueing - see common/ctdb_io.c */
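
/*
 * Input buffer bookkeeping: 'length' is the number of bytes currently
 * buffered, 'size' is the allocated size of 'data', and 'extend', when
 * non-zero, asks the read path to grow the buffer so that an oversized
 * incoming packet can fit.
 */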
struct ctdb_buffer {
        uint8_t *data;
        uint32_t length;
        uint32_t size;
        uint32_t extend;
};

struct ctdb_queue_pkt {
        struct ctdb_queue_pkt *next, *prev;
        uint8_t *data;
        uint32_t length;
        uint32_t full_length;
        uint8_t buf[];
};

struct ctdb_queue {
        struct ctdb_context *ctdb;
        struct tevent_immediate *im;
        struct ctdb_buffer buffer; /* input buffer */
        struct ctdb_queue_pkt *out_queue, *out_queue_tail;
        uint32_t out_queue_length;
        struct tevent_fd *fde;
        int fd;
        size_t alignment;
        void *private_data;
        ctdb_queue_cb_fn_t callback;
        const char *name;
        uint32_t buffer_size;
};


int ctdb_queue_length(struct ctdb_queue *queue)
{
        return queue->out_queue_length;
}

static void queue_process(struct ctdb_queue *queue);

static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
                                void *private_data)
{
        struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);

        queue_process(queue);
}

/*
 * Process data accumulated in the queue buffer.
 *
 * The queue callback can end up freeing the queue, so we must not loop
 * here processing packets from the buffer.  Instead, schedule an
 * immediate event to process any remaining packets.
 */
static void queue_process(struct ctdb_queue *queue)
{
        uint32_t pkt_size;
        uint8_t *data;

        if (queue->buffer.length < sizeof(pkt_size)) {
                return;
        }

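        /*
         * Every CTDB packet starts with its total length as a uint32_t (the
         * first field of the packet header), so peeking at the first word of
         * the buffer tells us how big the pending packet is.
         */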
        pkt_size = *(uint32_t *)queue->buffer.data;
        if (pkt_size == 0) {
                DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
                goto failed;
        }

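        /*
         * Not enough data buffered for a full packet yet.  If the packet is
         * larger than the normal buffer, remember the required size so the
         * read path can grow the buffer before the next read.
         */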
        if (queue->buffer.length < pkt_size) {
                if (pkt_size > queue->buffer_size) {
                        queue->buffer.extend = pkt_size;
                }
                return;
        }

        /* Extract complete packet */
        data = talloc_memdup(queue, queue->buffer.data, pkt_size);
        if (data == NULL) {
                D_ERR("read error alloc failed for %u\n", pkt_size);
                return;
        }

        /* Shift packet out from buffer */
        if (queue->buffer.length > pkt_size) {
                memmove(queue->buffer.data,
                        queue->buffer.data + pkt_size,
                        queue->buffer.length - pkt_size);
        }
        queue->buffer.length -= pkt_size;

        if (queue->buffer.length > 0) {
                /* There is more data to be processed, schedule an event */
                tevent_schedule_immediate(queue->im, queue->ctdb->ev,
                                          queue_process_event, queue);
        } else {
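                /*
                 * Buffer is empty again; if it was grown beyond the
                 * configured size for a large packet, release it so the next
                 * allocation starts from the normal buffer size.
                 */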
                if (queue->buffer.size > queue->buffer_size) {
                        TALLOC_FREE(queue->buffer.data);
                        queue->buffer.size = 0;
                }
        }

        /* It is the responsibility of the callback to free 'data' */
        queue->callback(data, pkt_size, queue->private_data);
        return;

failed:
        queue->callback(NULL, 0, queue->private_data);
}

/*
  called when an incoming connection is readable
  This function MUST be safe for reentry via the queue callback!
*/
static void queue_io_read(struct ctdb_queue *queue)
{
        int num_ready = 0;
        ssize_t nread;
        uint8_t *data;

        /* Check how much data is available on the socket for immediate,
         * guaranteed non-blocking access.  As long as we are careful never
         * to try to read more than this, we know all reads will succeed and
         * will neither block nor fail with a "data not available right now"
         * error.
         */
        if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
                return;
        }
        if (num_ready == 0) {
                /* the descriptor has been closed */
                goto failed;
        }

        if (queue->buffer.data == NULL) {
                /* starting fresh, allocate buf to read data */
                queue->buffer.data = talloc_size(queue, queue->buffer_size);
                if (queue->buffer.data == NULL) {
                        DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
                        goto failed;
                }
                queue->buffer.size = queue->buffer_size;
        } else if (queue->buffer.extend > 0) {
                /* extending buffer */
                data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
                if (data == NULL) {
                        DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
                        goto failed;
                }
                queue->buffer.data = data;
                queue->buffer.size = queue->buffer.extend;
                queue->buffer.extend = 0;
        }

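        /*
         * Clamp the read to the free space left in the buffer; anything
         * still pending on the socket will be picked up by the next read
         * event.
         */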
        num_ready = MIN(num_ready, queue->buffer.size - queue->buffer.length);

        if (num_ready > 0) {
                nread = sys_read(queue->fd,
                                 queue->buffer.data + queue->buffer.length,
                                 num_ready);
                if (nread <= 0) {
                        DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
                        goto failed;
                }
                queue->buffer.length += nread;
        }

        queue_process(queue);
        return;

failed:
        queue->callback(NULL, 0, queue->private_data);
}


/* used when an event triggers a dead queue */
static void queue_dead(struct tevent_context *ev, struct tevent_immediate *im,
                       void *private_data)
{
        struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
        queue->callback(NULL, 0, queue->private_data);
}


/*
  called when an incoming connection is writeable
*/
static void queue_io_write(struct ctdb_queue *queue)
{
        while (queue->out_queue) {
                struct ctdb_queue_pkt *pkt = queue->out_queue;
                ssize_t n;
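                /*
                 * In torture mode, deliberately write a single byte at a
                 * time so the partial-write handling below gets exercised.
                 */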
                if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
                        n = write(queue->fd, pkt->data, 1);
                } else {
                        n = write(queue->fd, pkt->data, pkt->length);
                }

                if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
                        if (pkt->length != pkt->full_length) {
                                /* partial packet sent - we have to drop it */
                                DLIST_REMOVE(queue->out_queue, pkt);
                                queue->out_queue_length--;
                                talloc_free(pkt);
                        }
                        talloc_free(queue->fde);
                        queue->fde = NULL;
                        queue->fd = -1;
                        tevent_schedule_immediate(queue->im, queue->ctdb->ev,
                                                  queue_dead, queue);
                        return;
                }
                if (n <= 0) return;

                if (n != pkt->length) {
                        pkt->length -= n;
                        pkt->data += n;
                        return;
                }

                DLIST_REMOVE(queue->out_queue, pkt);
                queue->out_queue_length--;
                talloc_free(pkt);
        }

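        /* The output queue is fully drained; stop polling for writability. */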
        TEVENT_FD_NOT_WRITEABLE(queue->fde);
}

/*
  called when an incoming connection is readable or writeable
*/
static void queue_io_handler(struct tevent_context *ev, struct tevent_fd *fde,
                             uint16_t flags, void *private_data)
{
        struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);

        if (flags & TEVENT_FD_READ) {
                queue_io_read(queue);
        } else {
                queue_io_write(queue);
        }
}


/*
  queue a packet for sending
*/
int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
{
        struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
        struct ctdb_queue_pkt *pkt;
        uint32_t length2, full_length;

        /* If the queue does not have a valid fd, there is no point in
         * queueing a packet */
        if (queue->fd == -1) {
                return 0;
        }

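        /*
         * Round the packet up to the queue's alignment and overwrite the
         * leading length word with the padded size; e.g. with an alignment
         * of 8, a 21-byte packet goes out as 24 bytes with 3 zero bytes of
         * padding.
         */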
        if (queue->alignment) {
                /* enforce the length and alignment rules from the tcp packet allocator */
                length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
                *(uint32_t *)data = length2;
        } else {
                length2 = length;
        }

        if (length2 != length) {
                memset(data+length, 0, length2-length);
        }

        full_length = length2;

        /* if the queue is empty then try an immediate write, avoiding
           queue overhead. This relies on non-blocking sockets */
        if (queue->out_queue == NULL && queue->fd != -1 &&
            !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
                ssize_t n = write(queue->fd, data, length2);
                if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
                        talloc_free(queue->fde);
                        queue->fde = NULL;
                        queue->fd = -1;
                        tevent_schedule_immediate(queue->im, queue->ctdb->ev,
                                                  queue_dead, queue);
                        /* yes, we report success, as the dead node is
                           handled via a separate event */
                        return 0;
                }
                if (n > 0) {
                        data += n;
                        length2 -= n;
                }
                if (length2 == 0) return 0;
        }

        pkt = talloc_size(
                queue, offsetof(struct ctdb_queue_pkt, buf) + length2);
        CTDB_NO_MEMORY(queue->ctdb, pkt);
        talloc_set_name_const(pkt, "struct ctdb_queue_pkt");

        pkt->data = pkt->buf;
        memcpy(pkt->data, data, length2);

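        /*
         * 'length' counts the bytes still to be sent (it shrinks as partial
         * writes succeed), while 'full_length' keeps the original size so
         * queue_io_write() can detect a partially sent packet.
         */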
        pkt->length = length2;
        pkt->full_length = full_length;

        if (queue->out_queue == NULL && queue->fd != -1) {
                TEVENT_FD_WRITEABLE(queue->fde);
        }

        DLIST_ADD_END(queue->out_queue, pkt);

        queue->out_queue_length++;

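        /*
         * With verbose memory names enabled, tag each queued packet with a
         * descriptive talloc name so it can be identified in talloc memory
         * reports.
         */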
        if (queue->ctdb->tunable.verbose_memory_names != 0) {
                switch (hdr->operation) {
                case CTDB_REQ_CONTROL: {
                        struct ctdb_req_control_old *c = (struct ctdb_req_control_old *)hdr;
                        talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
                                        queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
                        break;
                }
                case CTDB_REQ_MESSAGE: {
                        struct ctdb_req_message_old *m = (struct ctdb_req_message_old *)hdr;
                        talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
                                        queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
                        break;
                }
                default:
                        talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
                                        queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
                                        (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
                        break;
                }
        }

        return 0;
}


/*
  setup the fd used by the queue
 */
int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
{
        queue->fd = fd;
        talloc_free(queue->fde);
        queue->fde = NULL;

        if (fd != -1) {
                queue->fde = tevent_add_fd(queue->ctdb->ev, queue, fd,
                                           TEVENT_FD_READ,
                                           queue_io_handler, queue);
                if (queue->fde == NULL) {
                        return -1;
                }
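                /*
                 * Let tevent close the descriptor when the fde is freed, so
                 * tearing down the queue also closes the underlying socket.
                 */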
                tevent_fd_set_auto_close(queue->fde);

                if (queue->out_queue) {
                        TEVENT_FD_WRITEABLE(queue->fde);
                }
        }

        return 0;
}

/*
  setup a packet queue on a socket
 */
struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
                                    TALLOC_CTX *mem_ctx, int fd, int alignment,
                                    ctdb_queue_cb_fn_t callback,
                                    void *private_data, const char *fmt, ...)
{
        struct ctdb_queue *queue;
        va_list ap;

        queue = talloc_zero(mem_ctx, struct ctdb_queue);
        CTDB_NO_MEMORY_NULL(ctdb, queue);
        va_start(ap, fmt);
        queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
        va_end(ap);
        CTDB_NO_MEMORY_NULL(ctdb, queue->name);

        queue->im = tevent_create_immediate(queue);
        CTDB_NO_MEMORY_NULL(ctdb, queue->im);

        queue->ctdb = ctdb;
        queue->fd = fd;
        queue->alignment = alignment;
        queue->private_data = private_data;
        queue->callback = callback;
        if (fd != -1) {
                if (ctdb_queue_set_fd(queue, fd) != 0) {
                        talloc_free(queue);
                        return NULL;
                }
        }

        queue->buffer_size = ctdb->tunable.queue_buffer_size;
        /* In client code, ctdb->tunable is not initialized.  This does not
         * affect the recovery daemon.
         */
        if (queue->buffer_size == 0) {
                queue->buffer_size = 1024;
        }

        return queue;
}