Merged tridge's branch.
[vlendec/samba-autobuild/.git] / ctdb / tcp / tcp_io.c
1 /* 
2    ctdb over TCP
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "../include/ctdb_private.h"
28 #include "ctdb_tcp.h"
29
30
31 /*
32   called when we fail to send a message to a node
33 */
34 static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, 
35                                struct timeval t, void *private)
36 {
37         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
38         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
39                                                       struct ctdb_tcp_node);
40
41         /* flush the queue */
42         while (tnode->queue) {
43                 struct ctdb_tcp_packet *pkt = tnode->queue;
44                 DLIST_REMOVE(tnode->queue, pkt);
45                 talloc_free(pkt);
46         }
47
48         /* start a new connect cycle to try to re-establish the
49            link */
50         talloc_free(tnode->fde);
51         close(tnode->fd);
52         tnode->fd = -1;
53         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
54                         ctdb_tcp_node_connect, node);
55 }
56
57 /*
58   called when socket becomes readable
59 */
60 void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, 
61                          uint16_t flags, void *private)
62 {
63         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
64         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
65                                                       struct ctdb_tcp_node);
66         if (flags & EVENT_FD_READ) {
67                 /* getting a read event on this fd in the current tcp model is
68                    always an error, as we have separate read and write
69                    sockets. In future we may combine them, but for now it must
70                    mean that the socket is dead, so we try to reconnect */
71                 talloc_free(tnode->fde);
72                 close(tnode->fd);
73                 tnode->fd = -1;
74                 event_add_timed(node->ctdb->ev, node, timeval_zero(), 
75                                 ctdb_tcp_node_connect, node);
76                 return;
77         }
78
79         while (tnode->queue) {
80                 struct ctdb_tcp_packet *pkt = tnode->queue;
81                 ssize_t n;
82
83                 n = write(tnode->fd, pkt->data, pkt->length);
84
85                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
86                         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
87                                         ctdb_tcp_node_dead, node);
88                         EVENT_FD_NOT_WRITEABLE(tnode->fde);
89                         return;
90                 }
91                 if (n <= 0) return;
92                 
93                 if (n != pkt->length) {
94                         pkt->length -= n;
95                         pkt->data += n;
96                         return;
97                 }
98
99                 DLIST_REMOVE(tnode->queue, pkt);
100                 talloc_free(pkt);
101         }
102
103         EVENT_FD_NOT_WRITEABLE(tnode->fde);
104 }
105
106
107 /*
108   called when an incoming connection is readable
109 */
110 void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, 
111                             uint16_t flags, void *private)
112 {
113         struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
114         int num_ready = 0;
115         ssize_t nread;
116         uint8_t *data, *data_base;
117
118         if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
119             num_ready == 0) {
120                 /* we've lost the link from another node. We don't
121                    notify the upper layers, as we only want to trigger
122                    a full node reorganisation when a send fails - that
123                    allows nodes to restart without penalty as long as
124                    the network is idle */
125                 talloc_free(in);
126                 return;
127         }
128
129         in->partial.data = talloc_realloc_size(in, in->partial.data, 
130                                                num_ready + in->partial.length);
131         if (in->partial.data == NULL) {
132                 /* not much we can do except drop the socket */
133                 talloc_free(in);
134                 return;
135         }
136
137         nread = read(in->fd, in->partial.data+in->partial.length, num_ready);
138         if (nread <= 0) {
139                 /* the connection must be dead */
140                 talloc_free(in);
141                 return;
142         }
143
144         data = in->partial.data;
145         nread += in->partial.length;
146
147         in->partial.data = NULL;
148         in->partial.length = 0;
149
150         if (nread >= 4 && *(uint32_t *)data == nread) {
151                 /* most common case - we got a whole packet in one go
152                    tell the ctdb layer above that we have a packet */
153                 in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread);
154                 return;
155         }
156
157         data_base = data;
158
159         while (nread >= 4 && *(uint32_t *)data <= nread) {
160                 /* we have at least one packet */
161                 uint8_t *d2;
162                 uint32_t len;
163                 len = *(uint32_t *)data;
164                 d2 = talloc_memdup(in, data, len);
165                 if (d2 == NULL) {
166                         /* sigh */
167                         talloc_free(in);
168                         return;
169                 }
170                 in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len);
171                 data += len;
172                 nread -= len;           
173                 return;
174         }
175
176         if (nread < 4 || *(uint32_t *)data > nread) {
177                 /* we have only part of a packet */
178                 if (data_base == data) {
179                         in->partial.data = data;
180                         in->partial.length = nread;
181                 } else {
182                         in->partial.data = talloc_memdup(in, data, nread);
183                         if (in->partial.data == NULL) {
184                                 talloc_free(in);
185                                 return;
186                         }
187                         in->partial.length = nread;
188                         talloc_free(data_base);
189                 }
190                 return;
191         }
192
193         talloc_free(data_base);
194 }
195
196 /*
197   queue a packet for sending
198 */
199 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
200 {
201         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
202                                                       struct ctdb_tcp_node);
203         struct ctdb_tcp_packet *pkt;
204         uint32_t length2;
205
206         /* enforce the length and alignment rules from the tcp packet allocator */
207         length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
208         *(uint32_t *)data = length2;
209
210         if (length2 != length) {
211                 memset(data+length, 0, length2-length);
212         }
213         
214         /* if the queue is empty then try an immediate write, avoiding
215            queue overhead. This relies on non-blocking sockets */
216         if (tnode->queue == NULL && tnode->fd != -1) {
217                 ssize_t n = write(tnode->fd, data, length2);
218                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
219                         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
220                                         ctdb_tcp_node_dead, node);
221                         /* yes, we report success, as the dead node is 
222                            handled via a separate event */
223                         return 0;
224                 }
225                 if (n > 0) {
226                         data += n;
227                         length2 -= n;
228                 }
229                 if (length2 == 0) return 0;
230         }
231
232         pkt = talloc(tnode, struct ctdb_tcp_packet);
233         CTDB_NO_MEMORY(node->ctdb, pkt);
234
235         pkt->data = talloc_memdup(pkt, data, length2);
236         CTDB_NO_MEMORY(node->ctdb, pkt->data);
237
238         pkt->length = length2;
239
240         if (tnode->queue == NULL && tnode->fd != -1) {
241                 EVENT_FD_WRITEABLE(tnode->fde);
242         }
243
244         DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);
245
246         return 0;
247 }