Merge branch 'master' of ssh://git.samba.org/data/git/samba into regsrv
[ab/samba.git/.git] / source4 / cluster / ctdb / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "includes.h"
24 #include "../tdb/include/tdb.h"
25 #include "lib/events/events.h"
26 #include "../lib/util/dlinklist.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb_private.h"
30 #include "../include/ctdb.h"
31
/*
  Packet queueing structures used by the queue code in this file.

  ctdb_partial holds the leading bytes of an input packet that has not
  been completely received yet.
*/
struct ctdb_partial {
        uint8_t *data;          /* bytes received so far */
        uint32_t length;        /* number of valid bytes in 'data' */
};
37
/*
  One outgoing packet waiting on a queue's out_queue list.
*/
struct ctdb_queue_pkt {
        /* list linkage, maintained with the DLIST_* macros */
        struct ctdb_queue_pkt *next;
        struct ctdb_queue_pkt *prev;
        uint8_t *data;          /* next byte still to be written */
        uint32_t length;        /* bytes still to be written */
        uint32_t full_length;   /* size of the complete packet as submitted */
};
44
45 struct ctdb_queue {
46         struct ctdb_context *ctdb;
47         struct ctdb_partial partial; /* partial input packet */
48         struct ctdb_queue_pkt *out_queue;
49         struct fd_event *fde;
50         int fd;
51         size_t alignment;
52         void *private_data;
53         ctdb_queue_cb_fn_t callback;
54 };
55
56
57
58 /*
59   called when an incoming connection is readable
60 */
61 static void queue_io_read(struct ctdb_queue *queue)
62 {
63         int num_ready = 0;
64         ssize_t nread;
65         uint8_t *data, *data_base;
66
67         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
68                 return;
69         }
70         if (num_ready == 0) {
71                 /* the descriptor has been closed */
72                 goto failed;
73         }
74
75
76         queue->partial.data = talloc_realloc(queue, queue->partial.data, 
77                                              uint8_t, 
78                                              num_ready + queue->partial.length);
79
80         if (queue->partial.data == NULL) {
81                 DEBUG(0,("read error alloc failed for %u\n", 
82                          num_ready + queue->partial.length));
83                 goto failed;
84         }
85
86         nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
87         if (nread <= 0) {
88                 DEBUG(0,("read error nread=%d\n", (int)nread));
89                 goto failed;
90         }
91
92
93         data = queue->partial.data;
94         nread += queue->partial.length;
95
96         queue->partial.data = NULL;
97         queue->partial.length = 0;
98
99         if (nread >= 4 && *(uint32_t *)data == nread) {
100                 /* it is the responsibility of the incoming packet
101                  function to free 'data' */
102                 queue->callback(data, nread, queue->private_data);
103                 return;
104         }
105
106         data_base = data;
107
108         while (nread >= 4 && *(uint32_t *)data <= nread) {
109                 /* we have at least one packet */
110                 uint8_t *d2;
111                 uint32_t len;
112                 len = *(uint32_t *)data;
113                 if (len == 0) {
114                         /* bad packet! treat as EOF */
115                         DEBUG(0,("Invalid packet of length 0\n"));
116                         goto failed;
117                 }
118                 d2 = (uint8_t *)talloc_memdup(queue, data, len);
119                 if (d2 == NULL) {
120                         DEBUG(0,("read error memdup failed for %u\n", len));
121                         /* sigh */
122                         goto failed;
123                 }
124                 queue->callback(d2, len, queue->private_data);
125                 data += len;
126                 nread -= len;           
127         }
128
129         if (nread > 0) {
130                 /* we have only part of a packet */
131                 if (data_base == data) {
132                         queue->partial.data = data;
133                         queue->partial.length = nread;
134                 } else {
135                         queue->partial.data = (uint8_t *)talloc_memdup(queue, data, nread);
136                         if (queue->partial.data == NULL) {
137                                 DEBUG(0,("read error memdup partial failed for %u\n", 
138                                          (unsigned)nread));
139                                 goto failed;
140                         }
141                         queue->partial.length = nread;
142                         talloc_free(data_base);
143                 }
144                 return;
145         }
146
147         talloc_free(data_base);
148         return;
149
150 failed:
151         queue->callback(NULL, 0, queue->private_data);
152 }
153
154
155 /* used when an event triggers a dead queue */
156 static void queue_dead(struct event_context *ev, struct timed_event *te, 
157                        struct timeval t, void *private_data)
158 {
159         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
160         queue->callback(NULL, 0, queue->private_data);
161 }
162
163
164 /*
165   called when an incoming connection is writeable
166 */
167 static void queue_io_write(struct ctdb_queue *queue)
168 {
169         while (queue->out_queue) {
170                 struct ctdb_queue_pkt *pkt = queue->out_queue;
171                 ssize_t n;
172                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
173                         n = write(queue->fd, pkt->data, 1);
174                 } else {
175                         n = write(queue->fd, pkt->data, pkt->length);
176                 }
177
178                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
179                         if (pkt->length != pkt->full_length) {
180                                 /* partial packet sent - we have to drop it */
181                                 DLIST_REMOVE(queue->out_queue, pkt);
182                                 talloc_free(pkt);
183                         }
184                         talloc_free(queue->fde);
185                         queue->fde = NULL;
186                         queue->fd = -1;
187                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
188                                         queue_dead, queue);
189                         return;
190                 }
191                 if (n <= 0) return;
192                 
193                 if (n != pkt->length) {
194                         pkt->length -= n;
195                         pkt->data += n;
196                         return;
197                 }
198
199                 DLIST_REMOVE(queue->out_queue, pkt);
200                 talloc_free(pkt);
201         }
202
203         EVENT_FD_NOT_WRITEABLE(queue->fde);
204 }
205
206 /*
207   called when an incoming connection is readable or writeable
208 */
209 static void queue_io_handler(struct event_context *ev, struct fd_event *fde, 
210                              uint16_t flags, void *private_data)
211 {
212         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
213
214         if (flags & EVENT_FD_READ) {
215                 queue_io_read(queue);
216         } else {
217                 queue_io_write(queue);
218         }
219 }
220
221
222 /*
223   queue a packet for sending
224 */
225 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
226 {
227         struct ctdb_queue_pkt *pkt;
228         uint32_t length2, full_length;
229
230         if (queue->alignment) {
231                 /* enforce the length and alignment rules from the tcp packet allocator */
232                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
233                 *(uint32_t *)data = length2;
234         } else {
235                 length2 = length;
236         }
237
238         if (length2 != length) {
239                 memset(data+length, 0, length2-length);
240         }
241
242         full_length = length2;
243         
244         /* if the queue is empty then try an immediate write, avoiding
245            queue overhead. This relies on non-blocking sockets */
246         if (queue->out_queue == NULL && queue->fd != -1 &&
247             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
248                 ssize_t n = write(queue->fd, data, length2);
249                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
250                         talloc_free(queue->fde);
251                         queue->fde = NULL;
252                         queue->fd = -1;
253                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
254                                         queue_dead, queue);
255                         /* yes, we report success, as the dead node is 
256                            handled via a separate event */
257                         return 0;
258                 }
259                 if (n > 0) {
260                         data += n;
261                         length2 -= n;
262                 }
263                 if (length2 == 0) return 0;
264         }
265
266         pkt = talloc(queue, struct ctdb_queue_pkt);
267         CTDB_NO_MEMORY(queue->ctdb, pkt);
268
269         pkt->data = (uint8_t *)talloc_memdup(pkt, data, length2);
270         CTDB_NO_MEMORY(queue->ctdb, pkt->data);
271
272         pkt->length = length2;
273         pkt->full_length = full_length;
274
275         if (queue->out_queue == NULL && queue->fd != -1) {
276                 EVENT_FD_WRITEABLE(queue->fde);
277         }
278
279         DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *);
280
281         return 0;
282 }
283
284
285 /*
286   setup the fd used by the queue
287  */
288 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
289 {
290         queue->fd = fd;
291         talloc_free(queue->fde);
292         queue->fde = NULL;
293
294         if (fd != -1) {
295                 queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
296                                           queue_io_handler, queue);
297                 if (queue->fde == NULL) {
298                         return -1;
299                 }
300
301                 if (queue->out_queue) {
302                         EVENT_FD_WRITEABLE(queue->fde);         
303                 }
304         }
305
306         return 0;
307 }
308
309
310
311 /*
312   setup a packet queue on a socket
313  */
314 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
315                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
316                                     
317                                     ctdb_queue_cb_fn_t callback,
318                                     void *private_data)
319 {
320         struct ctdb_queue *queue;
321
322         queue = talloc_zero(mem_ctx, struct ctdb_queue);
323         CTDB_NO_MEMORY_NULL(ctdb, queue);
324
325         queue->ctdb = ctdb;
326         queue->fd = fd;
327         queue->alignment = alignment;
328         queue->private_data = private_data;
329         queue->callback = callback;
330         if (fd != -1) {
331                 if (ctdb_queue_set_fd(queue, fd) != 0) {
332                         talloc_free(queue);
333                         return NULL;
334                 }
335         }
336
337         return queue;
338 }