event: Update events to latest Samba version 0.9.8
[vlendec/samba-autobuild/.git] / ctdb / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "includes.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/tevent/tevent.h"
26 #include "lib/util/dlinklist.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb_private.h"
30 #include "../include/ctdb_client.h"
31 #include <stdarg.h>
32
/* Packet queueing structures used by the queue I/O routines in this file. */

/*
  Input bytes that have been read from the descriptor but do not yet
  form a complete packet.
*/
struct ctdb_partial {
        uint8_t *data;    /* buffered bytes; queue_io_read resets this to NULL once consumed */
        uint32_t length;  /* number of valid bytes held in data */
};
38
/*
  One outgoing packet waiting on a queue's output list.
*/
struct ctdb_queue_pkt {
        struct ctdb_queue_pkt *next;  /* list linkage, maintained by the DLIST_* macros */
        struct ctdb_queue_pkt *prev;
        uint8_t *data;                /* bytes that still have to be written */
        uint32_t length;              /* number of bytes still to be written */
        uint32_t full_length;         /* packet size before any of it was written */
};
45
46 struct ctdb_queue {
47         struct ctdb_context *ctdb;
48         struct ctdb_partial partial; /* partial input packet */
49         struct ctdb_queue_pkt *out_queue, *out_queue_tail;
50         uint32_t out_queue_length;
51         struct fd_event *fde;
52         int fd;
53         size_t alignment;
54         void *private_data;
55         ctdb_queue_cb_fn_t callback;
56         bool *destroyed;
57         const char *name;
58 };
59
60
61
62 int ctdb_queue_length(struct ctdb_queue *queue)
63 {
64         return queue->out_queue_length;
65 }
66
67 static void dump_packet(unsigned char *data, size_t len)
68 {
69         size_t i;
70         char *p = talloc_array(NULL, char, len*3 + 1);
71         if (!p) {
72                 DEBUG(DEBUG_CRIT,("Packet talloc fail"));
73                 return;
74         }
75
76         for (i = 0; i < len; i++)
77                 sprintf(p + i*3, " %02x", data[i]);
78         DEBUG(DEBUG_CRIT,("Contents: %s\n", p));
79         talloc_free(p);
80 }
81
82 /*
83   called when an incoming connection is readable
84 */
85 static void queue_io_read(struct ctdb_queue *queue)
86 {
87         int num_ready = 0;
88         ssize_t nread, totread, partlen;
89         uint8_t *data, *data_base;
90
91         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
92                 return;
93         }
94         if (num_ready == 0) {
95                 /* the descriptor has been closed */
96                 goto failed;
97         }
98
99
100         queue->partial.data = talloc_realloc_size(queue, queue->partial.data, 
101                                                   num_ready + queue->partial.length);
102
103         if (queue->partial.data == NULL) {
104                 DEBUG(DEBUG_ERR,("%s: read error alloc failed for %u\n",
105                         queue->name, num_ready + queue->partial.length));
106                 goto failed;
107         }
108
109         nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
110         if (nread <= 0) {
111                 DEBUG(DEBUG_ERR,("%s: read error nread=%d\n",
112                                  queue->name, (int)nread));
113                 goto failed;
114         }
115         totread = nread;
116         partlen = queue->partial.length;
117
118         data = queue->partial.data;
119         nread += queue->partial.length;
120
121         queue->partial.data = NULL;
122         queue->partial.length = 0;
123
124         if (nread >= 4 && *(uint32_t *)data == nread) {
125                 /* it is the responsibility of the incoming packet
126                  function to free 'data' */
127                 queue->callback(data, nread, queue->private_data);
128                 return;
129         }
130
131         data_base = data;
132
133         while (nread >= 4 && *(uint32_t *)data <= nread) {
134                 /* we have at least one packet */
135                 uint8_t *d2;
136                 uint32_t len;
137                 bool destroyed = false;
138
139                 len = *(uint32_t *)data;
140                 if (len == 0) {
141                         /* bad packet! treat as EOF */
142                         DEBUG(DEBUG_CRIT,("%s: Invalid packet of length 0 (nread = %zu, totread = %zu, partlen = %zu)\n",
143                                           queue->name, nread, totread, partlen));
144                         dump_packet(data_base, totread + partlen);
145                         goto failed;
146                 }
147                 d2 = talloc_memdup(queue, data, len);
148                 if (d2 == NULL) {
149                         DEBUG(DEBUG_ERR,("%s: read error memdup failed for %u\n",
150                                          queue->name, len));
151                         /* sigh */
152                         goto failed;
153                 }
154
155                 queue->destroyed = &destroyed;
156                 queue->callback(d2, len, queue->private_data);
157                 /* If callback freed us, don't do anything else. */
158                 if (destroyed) {
159                         return;
160                 }
161                 queue->destroyed = NULL;
162
163                 data += len;
164                 nread -= len;           
165         }
166
167         if (nread > 0) {
168                 /* we have only part of a packet */
169                 if (data_base == data) {
170                         queue->partial.data = data;
171                         queue->partial.length = nread;
172                 } else {
173                         queue->partial.data = talloc_memdup(queue, data, nread);
174                         if (queue->partial.data == NULL) {
175                                 DEBUG(DEBUG_ERR,("%s: read error memdup partial failed for %u\n",
176                                                  queue->name, (unsigned)nread));
177                                 goto failed;
178                         }
179                         queue->partial.length = nread;
180                         talloc_free(data_base);
181                 }
182                 return;
183         }
184
185         talloc_free(data_base);
186         return;
187
188 failed:
189         queue->callback(NULL, 0, queue->private_data);
190 }
191
192
193 /* used when an event triggers a dead queue */
194 static void queue_dead(struct event_context *ev, struct timed_event *te, 
195                        struct timeval t, void *private_data)
196 {
197         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
198         queue->callback(NULL, 0, queue->private_data);
199 }
200
201
202 /*
203   called when an incoming connection is writeable
204 */
205 static void queue_io_write(struct ctdb_queue *queue)
206 {
207         while (queue->out_queue) {
208                 struct ctdb_queue_pkt *pkt = queue->out_queue;
209                 ssize_t n;
210                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
211                         n = write(queue->fd, pkt->data, 1);
212                 } else {
213                         n = write(queue->fd, pkt->data, pkt->length);
214                 }
215
216                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
217                         if (pkt->length != pkt->full_length) {
218                                 /* partial packet sent - we have to drop it */
219                                 DLIST_REMOVE(queue->out_queue, pkt);
220                                 queue->out_queue_length--;
221                                 talloc_free(pkt);
222                         }
223                         talloc_free(queue->fde);
224                         queue->fde = NULL;
225                         queue->fd = -1;
226                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
227                                         queue_dead, queue);
228                         return;
229                 }
230                 if (n <= 0) return;
231                 
232                 if (n != pkt->length) {
233                         pkt->length -= n;
234                         pkt->data += n;
235                         return;
236                 }
237
238                 DLIST_REMOVE(queue->out_queue, pkt);
239                 queue->out_queue_length--;
240                 talloc_free(pkt);
241         }
242
243         EVENT_FD_NOT_WRITEABLE(queue->fde);
244 }
245
246 /*
247   called when an incoming connection is readable or writeable
248 */
249 static void queue_io_handler(struct event_context *ev, struct fd_event *fde, 
250                              uint16_t flags, void *private_data)
251 {
252         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
253
254         if (flags & EVENT_FD_READ) {
255                 queue_io_read(queue);
256         } else {
257                 queue_io_write(queue);
258         }
259 }
260
261
262 /*
263   queue a packet for sending
264 */
265 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
266 {
267         struct ctdb_queue_pkt *pkt;
268         uint32_t length2, full_length;
269
270         if (queue->alignment) {
271                 /* enforce the length and alignment rules from the tcp packet allocator */
272                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
273                 *(uint32_t *)data = length2;
274         } else {
275                 length2 = length;
276         }
277
278         if (length2 != length) {
279                 memset(data+length, 0, length2-length);
280         }
281
282         full_length = length2;
283         
284         /* if the queue is empty then try an immediate write, avoiding
285            queue overhead. This relies on non-blocking sockets */
286         if (queue->out_queue == NULL && queue->fd != -1 &&
287             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
288                 ssize_t n = write(queue->fd, data, length2);
289                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
290                         talloc_free(queue->fde);
291                         queue->fde = NULL;
292                         queue->fd = -1;
293                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
294                                         queue_dead, queue);
295                         /* yes, we report success, as the dead node is 
296                            handled via a separate event */
297                         return 0;
298                 }
299                 if (n > 0) {
300                         data += n;
301                         length2 -= n;
302                 }
303                 if (length2 == 0) return 0;
304         }
305
306         pkt = talloc(queue, struct ctdb_queue_pkt);
307         CTDB_NO_MEMORY(queue->ctdb, pkt);
308
309         pkt->data = talloc_memdup(pkt, data, length2);
310         CTDB_NO_MEMORY(queue->ctdb, pkt->data);
311
312         pkt->length = length2;
313         pkt->full_length = full_length;
314
315         if (queue->out_queue == NULL && queue->fd != -1) {
316                 EVENT_FD_WRITEABLE(queue->fde);
317         }
318
319         DLIST_ADD_END(queue->out_queue, pkt, NULL);
320
321         queue->out_queue_length++;
322
323         if (queue->ctdb->tunable.verbose_memory_names != 0) {
324                 struct ctdb_req_header *hdr = (struct ctdb_req_header *)pkt->data;
325                 switch (hdr->operation) {
326                 case CTDB_REQ_CONTROL: {
327                         struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
328                         talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
329                                         queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
330                         break;
331                 }
332                 case CTDB_REQ_MESSAGE: {
333                         struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
334                         talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
335                                         queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
336                         break;
337                 }
338                 default:
339                         talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
340                                         queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
341                                         (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
342                         break;
343                 }
344         }
345
346         return 0;
347 }
348
349
350 /*
351   setup the fd used by the queue
352  */
353 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
354 {
355         queue->fd = fd;
356         talloc_free(queue->fde);
357         queue->fde = NULL;
358
359         if (fd != -1) {
360                 queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
361                                           queue_io_handler, queue);
362                 if (queue->fde == NULL) {
363                         return -1;
364                 }
365                 tevent_fd_set_auto_close(queue->fde);
366
367                 if (queue->out_queue) {
368                         EVENT_FD_WRITEABLE(queue->fde);         
369                 }
370         }
371
372         return 0;
373 }
374
375 /* If someone sets up this pointer, they want to know if the queue is freed */
376 static int queue_destructor(struct ctdb_queue *queue)
377 {
378         if (queue->destroyed != NULL)
379                 *queue->destroyed = true;
380         return 0;
381 }
382
383 /*
384   setup a packet queue on a socket
385  */
386 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
387                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
388                                     
389                                     ctdb_queue_cb_fn_t callback,
390                                     void *private_data, const char *fmt, ...)
391 {
392         struct ctdb_queue *queue;
393         va_list ap;
394
395         queue = talloc_zero(mem_ctx, struct ctdb_queue);
396         CTDB_NO_MEMORY_NULL(ctdb, queue);
397         va_start(ap, fmt);
398         queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
399         va_end(ap);
400         CTDB_NO_MEMORY_NULL(ctdb, queue->name);
401
402         queue->ctdb = ctdb;
403         queue->fd = fd;
404         queue->alignment = alignment;
405         queue->private_data = private_data;
406         queue->callback = callback;
407         if (fd != -1) {
408                 if (ctdb_queue_set_fd(queue, fd) != 0) {
409                         talloc_free(queue);
410                         return NULL;
411                 }
412         }
413         talloc_set_destructor(queue, queue_destructor);
414
415         return queue;
416 }