merged from ronnie
[sahlberg/ctdb.git] / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This library is free software; you can redistribute it and/or
10    modify it under the terms of the GNU Lesser General Public
11    License as published by the Free Software Foundation; either
12    version 2 of the License, or (at your option) any later version.
13
14    This library is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17    Lesser General Public License for more details.
18
19    You should have received a copy of the GNU Lesser General Public
20    License along with this library; if not, write to the Free Software
21    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22 */
23
24 #include "includes.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "lib/events/events.h"
27 #include "lib/util/dlinklist.h"
28 #include "system/network.h"
29 #include "system/filesys.h"
30 #include "../include/ctdb_private.h"
31 #include "../include/ctdb.h"
32
33 /* structures for packet queueing - see common/ctdb_io.c */
34 struct ctdb_partial {
35         uint8_t *data;
36         uint32_t length;
37 };
38
39 struct ctdb_queue_pkt {
40         struct ctdb_queue_pkt *next, *prev;
41         uint8_t *data;
42         uint32_t length;
43 };
44
45 struct ctdb_queue {
46         struct ctdb_context *ctdb;
47         struct ctdb_partial partial; /* partial input packet */
48         struct ctdb_queue_pkt *out_queue;
49         struct fd_event *fde;
50         int fd;
51         size_t alignment;
52         void *private_data;
53         ctdb_queue_cb_fn_t callback;
54 };
55
56
57
58 /*
59   called when an incoming connection is readable
60 */
61 static void queue_io_read(struct ctdb_queue *queue)
62 {
63         int num_ready = 0;
64         ssize_t nread;
65         uint8_t *data, *data_base;
66
67         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
68                 return;
69         }
70         if (num_ready == 0) {
71                 /* the descriptor has been closed */
72                 goto failed;
73         }
74
75
76         queue->partial.data = talloc_realloc_size(queue, queue->partial.data, 
77                                                   num_ready + queue->partial.length);
78
79         if (queue->partial.data == NULL) {
80                 DEBUG(0,("read error alloc failed for %u\n", 
81                          num_ready + queue->partial.length));
82                 goto failed;
83         }
84
85         nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
86         if (nread <= 0) {
87                 DEBUG(0,("read error nread=%d\n", nread));
88                 goto failed;
89         }
90
91
92         data = queue->partial.data;
93         nread += queue->partial.length;
94
95         queue->partial.data = NULL;
96         queue->partial.length = 0;
97
98         if (nread >= 4 && *(uint32_t *)data == nread) {
99                 /* it is the responsibility of the incoming packet
100                  function to free 'data' */
101                 queue->callback(data, nread, queue->private_data);
102                 return;
103         }
104
105         data_base = data;
106
107         while (nread >= 4 && *(uint32_t *)data <= nread) {
108                 /* we have at least one packet */
109                 uint8_t *d2;
110                 uint32_t len;
111                 len = *(uint32_t *)data;
112                 d2 = talloc_memdup(queue, data, len);
113                 if (d2 == NULL) {
114                         DEBUG(0,("read error memdup failed for %u\n", len));
115                         /* sigh */
116                         goto failed;
117                 }
118                 queue->callback(d2, len, queue->private_data);
119                 data += len;
120                 nread -= len;           
121         }
122
123         if (nread > 0) {
124                 /* we have only part of a packet */
125                 if (data_base == data) {
126                         queue->partial.data = data;
127                         queue->partial.length = nread;
128                 } else {
129                         queue->partial.data = talloc_memdup(queue, data, nread);
130                         if (queue->partial.data == NULL) {
131                                 DEBUG(0,("read error memdup partial failed for %u\n", 
132                                          nread));
133                                 goto failed;
134                         }
135                         queue->partial.length = nread;
136                         talloc_free(data_base);
137                 }
138                 return;
139         }
140
141         talloc_free(data_base);
142         return;
143
144 failed:
145         queue->callback(NULL, 0, queue->private_data);
146 }
147
148
149 /* used when an event triggers a dead queue */
150 static void queue_dead(struct event_context *ev, struct timed_event *te, 
151                        struct timeval t, void *private_data)
152 {
153         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
154         queue->callback(NULL, 0, queue->private_data);
155 }
156
157
158 /*
159   called when an incoming connection is writeable
160 */
161 static void queue_io_write(struct ctdb_queue *queue)
162 {
163         while (queue->out_queue) {
164                 struct ctdb_queue_pkt *pkt = queue->out_queue;
165                 ssize_t n;
166                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
167                         n = write(queue->fd, pkt->data, 1);
168                 } else {
169                         n = write(queue->fd, pkt->data, pkt->length);
170                 }
171
172                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
173                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
174                                         queue_dead, queue);
175                         EVENT_FD_NOT_WRITEABLE(queue->fde);
176                         return;
177                 }
178                 if (n <= 0) return;
179                 
180                 if (n != pkt->length) {
181                         pkt->length -= n;
182                         pkt->data += n;
183                         return;
184                 }
185
186                 DLIST_REMOVE(queue->out_queue, pkt);
187                 talloc_free(pkt);
188         }
189
190         EVENT_FD_NOT_WRITEABLE(queue->fde);
191 }
192
193 /*
194   called when an incoming connection is readable or writeable
195 */
196 static void queue_io_handler(struct event_context *ev, struct fd_event *fde, 
197                              uint16_t flags, void *private_data)
198 {
199         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
200
201         if (flags & EVENT_FD_READ) {
202                 queue_io_read(queue);
203         } else {
204                 queue_io_write(queue);
205         }
206 }
207
208
209 /*
210   queue a packet for sending
211 */
212 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
213 {
214         struct ctdb_queue_pkt *pkt;
215         uint32_t length2;
216
217         if (queue->alignment) {
218                 /* enforce the length and alignment rules from the tcp packet allocator */
219                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
220                 *(uint32_t *)data = length2;
221         } else {
222                 length2 = length;
223         }
224
225         if (length2 != length) {
226                 memset(data+length, 0, length2-length);
227         }
228         
229         /* if the queue is empty then try an immediate write, avoiding
230            queue overhead. This relies on non-blocking sockets */
231         if (queue->out_queue == NULL && queue->fd != -1 &&
232             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
233                 ssize_t n = write(queue->fd, data, length2);
234                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
235                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
236                                         queue_dead, queue);
237                         /* yes, we report success, as the dead node is 
238                            handled via a separate event */
239                         return 0;
240                 }
241                 if (n > 0) {
242                         data += n;
243                         length2 -= n;
244                 }
245                 if (length2 == 0) return 0;
246         }
247
248         pkt = talloc(queue, struct ctdb_queue_pkt);
249         CTDB_NO_MEMORY(queue->ctdb, pkt);
250
251         pkt->data = talloc_memdup(pkt, data, length2);
252         CTDB_NO_MEMORY(queue->ctdb, pkt->data);
253
254         pkt->length = length2;
255
256         if (queue->out_queue == NULL && queue->fd != -1) {
257                 EVENT_FD_WRITEABLE(queue->fde);
258         }
259
260         DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *);
261
262         return 0;
263 }
264
265
266 /*
267   setup the fd used by the queue
268  */
269 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
270 {
271         queue->fd = fd;
272         talloc_free(queue->fde);
273         queue->fde = NULL;
274
275         if (fd != -1) {
276                 queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ, 
277                                           queue_io_handler, queue);
278                 if (queue->fde == NULL) {
279                         return -1;
280                 }
281
282                 if (queue->out_queue) {
283                         EVENT_FD_WRITEABLE(queue->fde);         
284                 }
285         }
286
287         return 0;
288 }
289
290
291
292 /*
293   setup a packet queue on a socket
294  */
295 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
296                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
297                                     
298                                     ctdb_queue_cb_fn_t callback,
299                                     void *private_data)
300 {
301         struct ctdb_queue *queue;
302
303         queue = talloc_zero(mem_ctx, struct ctdb_queue);
304         CTDB_NO_MEMORY_NULL(ctdb, queue);
305
306         queue->ctdb = ctdb;
307         queue->fd = fd;
308         queue->alignment = alignment;
309         queue->private_data = private_data;
310         queue->callback = callback;
311         if (fd != -1) {
312                 if (ctdb_queue_set_fd(queue, fd) != 0) {
313                         talloc_free(queue);
314                         return NULL;
315                 }
316         }
317
318         return queue;
319 }