update TAKEIP/RELEASEIP/GETPUBLICIP/GETNODEMAP controls so we retain an
[metze/ctdb/wip.git] / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "includes.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/events/events.h"
26 #include "lib/util/dlinklist.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb_private.h"
30 #include "../include/ctdb.h"
31
32 /* structures for packet queueing - see common/ctdb_io.c */
/*
  bytes of an incoming packet that has not been completely received yet;
  queue_io_read() appends newly read data to this buffer
*/
struct ctdb_partial {
        uint8_t *data;          /* accumulated bytes, NULL when nothing is pending */
        uint32_t length;        /* number of valid bytes in 'data' */
};
37
/*
  one outgoing packet waiting on a queue's out_queue list
*/
struct ctdb_queue_pkt {
        /* list linkage, maintained with the DLIST_* macros */
        struct ctdb_queue_pkt *next;
        struct ctdb_queue_pkt *prev;
        /* bytes still to be written; advanced as data is sent */
        uint8_t *data;
        /* number of bytes remaining at 'data' */
        uint32_t length;
        /* length of the whole packet as queued by the caller; a packet
           with length != full_length has been partially sent */
        uint32_t full_length;
};
44
45 struct ctdb_queue {
46         struct ctdb_context *ctdb;
47         struct ctdb_partial partial; /* partial input packet */
48         struct ctdb_queue_pkt *out_queue;
49         struct fd_event *fde;
50         int fd;
51         size_t alignment;
52         void *private_data;
53         ctdb_queue_cb_fn_t callback;
54 };
55
56
57
58 /*
59   called when an incoming connection is readable
60 */
61 static void queue_io_read(struct ctdb_queue *queue)
62 {
63         int num_ready = 0;
64         ssize_t nread;
65         uint8_t *data, *data_base;
66
67         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
68                 return;
69         }
70         if (num_ready == 0) {
71                 /* the descriptor has been closed */
72                 goto failed;
73         }
74
75
76         queue->partial.data = talloc_realloc_size(queue, queue->partial.data, 
77                                                   num_ready + queue->partial.length);
78
79         if (queue->partial.data == NULL) {
80                 DEBUG(DEBUG_ERR,("read error alloc failed for %u\n", 
81                          num_ready + queue->partial.length));
82                 goto failed;
83         }
84
85         nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
86         if (nread <= 0) {
87                 DEBUG(DEBUG_ERR,("read error nread=%d\n", (int)nread));
88                 goto failed;
89         }
90
91
92         data = queue->partial.data;
93         nread += queue->partial.length;
94
95         queue->partial.data = NULL;
96         queue->partial.length = 0;
97
98         if (nread >= 4 && *(uint32_t *)data == nread) {
99                 /* it is the responsibility of the incoming packet
100                  function to free 'data' */
101                 queue->callback(data, nread, queue->private_data);
102                 return;
103         }
104
105         data_base = data;
106
107         while (nread >= 4 && *(uint32_t *)data <= nread) {
108                 /* we have at least one packet */
109                 uint8_t *d2;
110                 uint32_t len;
111                 len = *(uint32_t *)data;
112                 if (len == 0) {
113                         /* bad packet! treat as EOF */
114                         DEBUG(DEBUG_CRIT,("Invalid packet of length 0\n"));
115                         goto failed;
116                 }
117                 d2 = talloc_memdup(queue, data, len);
118                 if (d2 == NULL) {
119                         DEBUG(DEBUG_ERR,("read error memdup failed for %u\n", len));
120                         /* sigh */
121                         goto failed;
122                 }
123                 queue->callback(d2, len, queue->private_data);
124                 data += len;
125                 nread -= len;           
126         }
127
128         if (nread > 0) {
129                 /* we have only part of a packet */
130                 if (data_base == data) {
131                         queue->partial.data = data;
132                         queue->partial.length = nread;
133                 } else {
134                         queue->partial.data = talloc_memdup(queue, data, nread);
135                         if (queue->partial.data == NULL) {
136                                 DEBUG(DEBUG_ERR,("read error memdup partial failed for %u\n", 
137                                          (unsigned)nread));
138                                 goto failed;
139                         }
140                         queue->partial.length = nread;
141                         talloc_free(data_base);
142                 }
143                 return;
144         }
145
146         talloc_free(data_base);
147         return;
148
149 failed:
150         queue->callback(NULL, 0, queue->private_data);
151 }
152
153
154 /* used when an event triggers a dead queue */
155 static void queue_dead(struct event_context *ev, struct timed_event *te, 
156                        struct timeval t, void *private_data)
157 {
158         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
159         queue->callback(NULL, 0, queue->private_data);
160 }
161
162
163 /*
164   called when an incoming connection is writeable
165 */
166 static void queue_io_write(struct ctdb_queue *queue)
167 {
168         while (queue->out_queue) {
169                 struct ctdb_queue_pkt *pkt = queue->out_queue;
170                 ssize_t n;
171                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
172                         n = write(queue->fd, pkt->data, 1);
173                 } else {
174                         n = write(queue->fd, pkt->data, pkt->length);
175                 }
176
177                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
178                         if (pkt->length != pkt->full_length) {
179                                 /* partial packet sent - we have to drop it */
180                                 DLIST_REMOVE(queue->out_queue, pkt);
181                                 talloc_free(pkt);
182                         }
183                         talloc_free(queue->fde);
184                         queue->fde = NULL;
185                         queue->fd = -1;
186                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
187                                         queue_dead, queue);
188                         return;
189                 }
190                 if (n <= 0) return;
191                 
192                 if (n != pkt->length) {
193                         pkt->length -= n;
194                         pkt->data += n;
195                         return;
196                 }
197
198                 DLIST_REMOVE(queue->out_queue, pkt);
199                 talloc_free(pkt);
200         }
201
202         EVENT_FD_NOT_WRITEABLE(queue->fde);
203 }
204
205 /*
206   called when an incoming connection is readable or writeable
207 */
208 static void queue_io_handler(struct event_context *ev, struct fd_event *fde, 
209                              uint16_t flags, void *private_data)
210 {
211         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
212
213         if (flags & EVENT_FD_READ) {
214                 queue_io_read(queue);
215         } else {
216                 queue_io_write(queue);
217         }
218 }
219
220
221 /*
222   queue a packet for sending
223 */
224 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
225 {
226         struct ctdb_queue_pkt *pkt;
227         uint32_t length2, full_length;
228
229         if (queue->alignment) {
230                 /* enforce the length and alignment rules from the tcp packet allocator */
231                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
232                 *(uint32_t *)data = length2;
233         } else {
234                 length2 = length;
235         }
236
237         if (length2 != length) {
238                 memset(data+length, 0, length2-length);
239         }
240
241         full_length = length2;
242         
243         /* if the queue is empty then try an immediate write, avoiding
244            queue overhead. This relies on non-blocking sockets */
245         if (queue->out_queue == NULL && queue->fd != -1 &&
246             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
247                 ssize_t n = write(queue->fd, data, length2);
248                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
249                         talloc_free(queue->fde);
250                         queue->fde = NULL;
251                         queue->fd = -1;
252                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
253                                         queue_dead, queue);
254                         /* yes, we report success, as the dead node is 
255                            handled via a separate event */
256                         return 0;
257                 }
258                 if (n > 0) {
259                         data += n;
260                         length2 -= n;
261                 }
262                 if (length2 == 0) return 0;
263         }
264
265         pkt = talloc(queue, struct ctdb_queue_pkt);
266         CTDB_NO_MEMORY(queue->ctdb, pkt);
267
268         pkt->data = talloc_memdup(pkt, data, length2);
269         CTDB_NO_MEMORY(queue->ctdb, pkt->data);
270
271         pkt->length = length2;
272         pkt->full_length = full_length;
273
274         if (queue->out_queue == NULL && queue->fd != -1) {
275                 EVENT_FD_WRITEABLE(queue->fde);
276         }
277
278         DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *);
279
280         if (queue->ctdb->tunable.verbose_memory_names != 0) {
281                 struct ctdb_req_header *hdr = (struct ctdb_req_header *)pkt->data;
282                 switch (hdr->operation) {
283                 case CTDB_REQ_CONTROL: {
284                         struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
285                         talloc_set_name(pkt, "ctdb_queue_pkt: control opcode=%u srvid=%llu datalen=%u",
286                                         (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
287                         break;
288                 }
289                 case CTDB_REQ_MESSAGE: {
290                         struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
291                         talloc_set_name(pkt, "ctdb_queue_pkt: message srvid=%llu datalen=%u",
292                                         (unsigned long long)m->srvid, (unsigned)m->datalen);
293                         break;
294                 }
295                 default:
296                         talloc_set_name(pkt, "ctdb_queue_pkt: operation=%u length=%u src=%u dest=%u",
297                                         (unsigned)hdr->operation, (unsigned)hdr->length, 
298                                         (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
299                         break;
300                 }
301         }
302
303         return 0;
304 }
305
306
307 /*
308   setup the fd used by the queue
309  */
310 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
311 {
312         queue->fd = fd;
313         talloc_free(queue->fde);
314         queue->fde = NULL;
315
316         if (fd != -1) {
317                 queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
318                                           queue_io_handler, queue);
319                 if (queue->fde == NULL) {
320                         return -1;
321                 }
322
323                 if (queue->out_queue) {
324                         EVENT_FD_WRITEABLE(queue->fde);         
325                 }
326         }
327
328         return 0;
329 }
330
331
332
333 /*
334   setup a packet queue on a socket
335  */
336 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
337                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
338                                     
339                                     ctdb_queue_cb_fn_t callback,
340                                     void *private_data)
341 {
342         struct ctdb_queue *queue;
343
344         queue = talloc_zero(mem_ctx, struct ctdb_queue);
345         CTDB_NO_MEMORY_NULL(ctdb, queue);
346
347         queue->ctdb = ctdb;
348         queue->fd = fd;
349         queue->alignment = alignment;
350         queue->private_data = private_data;
351         queue->callback = callback;
352         if (fd != -1) {
353                 if (ctdb_queue_set_fd(queue, fd) != 0) {
354                         talloc_free(queue);
355                         return NULL;
356                 }
357         }
358
359         return queue;
360 }