Suggestion from Volker,
[amitay/samba.git] / ctdb / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "includes.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/events/events.h"
26 #include "lib/util/dlinklist.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb_private.h"
30 #include "../include/ctdb.h"
31
/* structures for packet queueing */

/*
  bytes of an incoming packet that has not been completely received yet;
  kept between reads until the rest of the packet arrives
*/
struct ctdb_partial {
	uint8_t *data;		/* talloc buffer holding the bytes received so far */
	uint32_t length;	/* number of valid bytes at data */
};
37
/* one entry of the outgoing packet list of a ctdb_queue */
struct ctdb_queue_pkt {
	struct ctdb_queue_pkt *next;
	struct ctdb_queue_pkt *prev;
	uint8_t *data;		/* bytes still to be written; advanced on short writes */
	uint32_t length;	/* number of bytes still to be written from data */
	uint32_t full_length;	/* length of the whole packet, used to detect a partly sent one */
};
44
45 struct ctdb_queue {
46         struct ctdb_context *ctdb;
47         struct ctdb_partial partial; /* partial input packet */
48         struct ctdb_queue_pkt *out_queue;
49         uint32_t out_queue_length;
50         struct fd_event *fde;
51         int fd;
52         size_t alignment;
53         void *private_data;
54         ctdb_queue_cb_fn_t callback;
55 };
56
57
58
59 int ctdb_queue_length(struct ctdb_queue *queue)
60 {
61         return queue->out_queue_length;
62 }
63
64 /*
65   called when an incoming connection is readable
66 */
67 static void queue_io_read(struct ctdb_queue *queue)
68 {
69         int num_ready = 0;
70         ssize_t nread;
71         uint8_t *data, *data_base;
72
73         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
74                 return;
75         }
76         if (num_ready == 0) {
77                 /* the descriptor has been closed */
78                 goto failed;
79         }
80
81
82         queue->partial.data = talloc_realloc_size(queue, queue->partial.data, 
83                                                   num_ready + queue->partial.length);
84
85         if (queue->partial.data == NULL) {
86                 DEBUG(DEBUG_ERR,("read error alloc failed for %u\n", 
87                          num_ready + queue->partial.length));
88                 goto failed;
89         }
90
91         nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
92         if (nread <= 0) {
93                 DEBUG(DEBUG_ERR,("read error nread=%d\n", (int)nread));
94                 goto failed;
95         }
96
97
98         data = queue->partial.data;
99         nread += queue->partial.length;
100
101         queue->partial.data = NULL;
102         queue->partial.length = 0;
103
104         if (nread >= 4 && *(uint32_t *)data == nread) {
105                 /* it is the responsibility of the incoming packet
106                  function to free 'data' */
107                 queue->callback(data, nread, queue->private_data);
108                 return;
109         }
110
111         data_base = data;
112
113         while (nread >= 4 && *(uint32_t *)data <= nread) {
114                 /* we have at least one packet */
115                 uint8_t *d2;
116                 uint32_t len;
117                 len = *(uint32_t *)data;
118                 if (len == 0) {
119                         /* bad packet! treat as EOF */
120                         DEBUG(DEBUG_CRIT,("Invalid packet of length 0\n"));
121                         goto failed;
122                 }
123                 d2 = talloc_memdup(queue, data, len);
124                 if (d2 == NULL) {
125                         DEBUG(DEBUG_ERR,("read error memdup failed for %u\n", len));
126                         /* sigh */
127                         goto failed;
128                 }
129                 queue->callback(d2, len, queue->private_data);
130                 data += len;
131                 nread -= len;           
132         }
133
134         if (nread > 0) {
135                 /* we have only part of a packet */
136                 if (data_base == data) {
137                         queue->partial.data = data;
138                         queue->partial.length = nread;
139                 } else {
140                         queue->partial.data = talloc_memdup(queue, data, nread);
141                         if (queue->partial.data == NULL) {
142                                 DEBUG(DEBUG_ERR,("read error memdup partial failed for %u\n", 
143                                          (unsigned)nread));
144                                 goto failed;
145                         }
146                         queue->partial.length = nread;
147                         talloc_free(data_base);
148                 }
149                 return;
150         }
151
152         talloc_free(data_base);
153         return;
154
155 failed:
156         queue->callback(NULL, 0, queue->private_data);
157 }
158
159
160 /* used when an event triggers a dead queue */
161 static void queue_dead(struct event_context *ev, struct timed_event *te, 
162                        struct timeval t, void *private_data)
163 {
164         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
165         queue->callback(NULL, 0, queue->private_data);
166 }
167
168
169 /*
170   called when an incoming connection is writeable
171 */
172 static void queue_io_write(struct ctdb_queue *queue)
173 {
174         while (queue->out_queue) {
175                 struct ctdb_queue_pkt *pkt = queue->out_queue;
176                 ssize_t n;
177                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
178                         n = write(queue->fd, pkt->data, 1);
179                 } else {
180                         n = write(queue->fd, pkt->data, pkt->length);
181                 }
182
183                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
184                         if (pkt->length != pkt->full_length) {
185                                 /* partial packet sent - we have to drop it */
186                                 DLIST_REMOVE(queue->out_queue, pkt);
187                                 queue->out_queue_length--;
188                                 talloc_free(pkt);
189                         }
190                         talloc_free(queue->fde);
191                         queue->fde = NULL;
192                         queue->fd = -1;
193                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
194                                         queue_dead, queue);
195                         return;
196                 }
197                 if (n <= 0) return;
198                 
199                 if (n != pkt->length) {
200                         pkt->length -= n;
201                         pkt->data += n;
202                         return;
203                 }
204
205                 DLIST_REMOVE(queue->out_queue, pkt);
206                 queue->out_queue_length--;
207                 talloc_free(pkt);
208         }
209
210         EVENT_FD_NOT_WRITEABLE(queue->fde);
211 }
212
213 /*
214   called when an incoming connection is readable or writeable
215 */
216 static void queue_io_handler(struct event_context *ev, struct fd_event *fde, 
217                              uint16_t flags, void *private_data)
218 {
219         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
220
221         if (flags & EVENT_FD_READ) {
222                 queue_io_read(queue);
223         } else {
224                 queue_io_write(queue);
225         }
226 }
227
228
229 /*
230   queue a packet for sending
231 */
232 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
233 {
234         struct ctdb_queue_pkt *pkt;
235         uint32_t length2, full_length;
236
237         if (queue->alignment) {
238                 /* enforce the length and alignment rules from the tcp packet allocator */
239                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
240                 *(uint32_t *)data = length2;
241         } else {
242                 length2 = length;
243         }
244
245         if (length2 != length) {
246                 memset(data+length, 0, length2-length);
247         }
248
249         full_length = length2;
250         
251         /* if the queue is empty then try an immediate write, avoiding
252            queue overhead. This relies on non-blocking sockets */
253         if (queue->out_queue == NULL && queue->fd != -1 &&
254             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
255                 ssize_t n = write(queue->fd, data, length2);
256                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
257                         talloc_free(queue->fde);
258                         queue->fde = NULL;
259                         queue->fd = -1;
260                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
261                                         queue_dead, queue);
262                         /* yes, we report success, as the dead node is 
263                            handled via a separate event */
264                         return 0;
265                 }
266                 if (n > 0) {
267                         data += n;
268                         length2 -= n;
269                 }
270                 if (length2 == 0) return 0;
271         }
272
273         pkt = talloc(queue, struct ctdb_queue_pkt);
274         CTDB_NO_MEMORY(queue->ctdb, pkt);
275
276         pkt->data = talloc_memdup(pkt, data, length2);
277         CTDB_NO_MEMORY(queue->ctdb, pkt->data);
278
279         pkt->length = length2;
280         pkt->full_length = full_length;
281
282         if (queue->out_queue == NULL && queue->fd != -1) {
283                 EVENT_FD_WRITEABLE(queue->fde);
284         }
285
286         DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *);
287         queue->out_queue_length++;
288
289         if (queue->ctdb->tunable.verbose_memory_names != 0) {
290                 struct ctdb_req_header *hdr = (struct ctdb_req_header *)pkt->data;
291                 switch (hdr->operation) {
292                 case CTDB_REQ_CONTROL: {
293                         struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
294                         talloc_set_name(pkt, "ctdb_queue_pkt: control opcode=%u srvid=%llu datalen=%u",
295                                         (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
296                         break;
297                 }
298                 case CTDB_REQ_MESSAGE: {
299                         struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
300                         talloc_set_name(pkt, "ctdb_queue_pkt: message srvid=%llu datalen=%u",
301                                         (unsigned long long)m->srvid, (unsigned)m->datalen);
302                         break;
303                 }
304                 default:
305                         talloc_set_name(pkt, "ctdb_queue_pkt: operation=%u length=%u src=%u dest=%u",
306                                         (unsigned)hdr->operation, (unsigned)hdr->length, 
307                                         (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
308                         break;
309                 }
310         }
311
312         return 0;
313 }
314
315
316 /*
317   setup the fd used by the queue
318  */
319 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
320 {
321         queue->fd = fd;
322         talloc_free(queue->fde);
323         queue->fde = NULL;
324
325         if (fd != -1) {
326                 queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
327                                           queue_io_handler, queue);
328                 if (queue->fde == NULL) {
329                         return -1;
330                 }
331
332                 if (queue->out_queue) {
333                         EVENT_FD_WRITEABLE(queue->fde);         
334                 }
335         }
336
337         return 0;
338 }
339
340
341
342 /*
343   setup a packet queue on a socket
344  */
345 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
346                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
347                                     
348                                     ctdb_queue_cb_fn_t callback,
349                                     void *private_data)
350 {
351         struct ctdb_queue *queue;
352
353         queue = talloc_zero(mem_ctx, struct ctdb_queue);
354         CTDB_NO_MEMORY_NULL(ctdb, queue);
355
356         queue->ctdb = ctdb;
357         queue->fd = fd;
358         queue->alignment = alignment;
359         queue->private_data = private_data;
360         queue->callback = callback;
361         if (fd != -1) {
362                 if (ctdb_queue_set_fd(queue, fd) != 0) {
363                         talloc_free(queue);
364                         return NULL;
365                 }
366         }
367
368         return queue;
369 }