merge from ab
[vlendec/samba-autobuild/.git] / ctdb / tcp / tcp_io.c
1 /* 
2    ctdb over TCP
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "ctdb_private.h"
26 #include "ctdb_tcp.h"
27
28
29 /*
30   called when we fail to send a message to a node
31 */
32 static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, 
33                                struct timeval t, void *private)
34 {
35         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
36         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
37                                                       struct ctdb_tcp_node);
38
39         /* flush the queue */
40         while (tnode->queue) {
41                 struct ctdb_tcp_packet *pkt = tnode->queue;
42                 DLIST_REMOVE(tnode->queue, pkt);
43                 talloc_free(pkt);
44         }
45
46         /* start a new connect cycle to try to re-establish the
47            link */
48         talloc_free(tnode->fde);
49         close(tnode->fd);
50         tnode->fd = -1;
51         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
52                         ctdb_tcp_node_connect, node);
53 }
54
55 /*
56   called when socket becomes readable
57 */
58 void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, 
59                          uint16_t flags, void *private)
60 {
61         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
62         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
63                                                       struct ctdb_tcp_node);
64         if (flags & EVENT_FD_READ) {
65                 /* getting a read event on this fd in the current tcp model is
66                    always an error, as we have separate read and write
67                    sockets. In future we may combine them, but for now it must
68                    mean that the socket is dead, so we try to reconnect */
69                 talloc_free(tnode->fde);
70                 close(tnode->fd);
71                 tnode->fd = -1;
72                 event_add_timed(node->ctdb->ev, node, timeval_zero(), 
73                                 ctdb_tcp_node_connect, node);
74                 return;
75         }
76
77         while (tnode->queue) {
78                 struct ctdb_tcp_packet *pkt = tnode->queue;
79                 ssize_t n;
80
81                 n = write(tnode->fd, pkt->data, pkt->length);
82
83                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
84                         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
85                                         ctdb_tcp_node_dead, node);
86                         EVENT_FD_NOT_WRITEABLE(tnode->fde);
87                         return;
88                 }
89                 if (n <= 0) return;
90                 
91                 if (n != pkt->length) {
92                         pkt->length -= n;
93                         pkt->data += n;
94                         return;
95                 }
96
97                 DLIST_REMOVE(tnode->queue, pkt);
98                 talloc_free(pkt);
99         }
100
101         EVENT_FD_NOT_WRITEABLE(tnode->fde);
102 }
103
104
105 /*
106   called when an incoming connection is readable
107 */
108 void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, 
109                             uint16_t flags, void *private)
110 {
111         struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
112         int num_ready = 0;
113         ssize_t nread;
114         uint8_t *data, *data_base;
115
116         if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
117             num_ready == 0) {
118                 /* we've lost the link from another node. We don't
119                    notify the upper layers, as we only want to trigger
120                    a full node reorganisation when a send fails - that
121                    allows nodes to restart without penalty as long as
122                    the network is idle */
123                 talloc_free(in);
124                 return;
125         }
126
127         in->partial.data = talloc_realloc_size(in, in->partial.data, 
128                                                num_ready + in->partial.length);
129         if (in->partial.data == NULL) {
130                 /* not much we can do except drop the socket */
131                 talloc_free(in);
132                 return;
133         }
134
135         nread = read(in->fd, in->partial.data+in->partial.length, num_ready);
136         if (nread <= 0) {
137                 /* the connection must be dead */
138                 talloc_free(in);
139                 return;
140         }
141
142         data = in->partial.data;
143         nread += in->partial.length;
144
145         in->partial.data = NULL;
146         in->partial.length = 0;
147
148         if (nread >= 4 && *(uint32_t *)data == nread) {
149                 /* most common case - we got a whole packet in one go
150                    tell the ctdb layer above that we have a packet */
151                 in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread);
152                 return;
153         }
154
155         data_base = data;
156
157         while (nread >= 4 && *(uint32_t *)data <= nread) {
158                 /* we have at least one packet */
159                 uint8_t *d2;
160                 uint32_t len;
161                 len = *(uint32_t *)data;
162                 d2 = talloc_memdup(in, data, len);
163                 if (d2 == NULL) {
164                         /* sigh */
165                         talloc_free(in);
166                         return;
167                 }
168                 in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len);
169                 data += len;
170                 nread -= len;           
171                 return;
172         }
173
174         if (nread < 4 || *(uint32_t *)data > nread) {
175                 /* we have only part of a packet */
176                 if (data_base == data) {
177                         in->partial.data = data;
178                         in->partial.length = nread;
179                 } else {
180                         in->partial.data = talloc_memdup(in, data, nread);
181                         if (in->partial.data == NULL) {
182                                 talloc_free(in);
183                                 return;
184                         }
185                         in->partial.length = nread;
186                         talloc_free(data_base);
187                 }
188                 return;
189         }
190
191         talloc_free(data_base);
192 }
193
194 /*
195   queue a packet for sending
196 */
197 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
198 {
199         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
200                                                       struct ctdb_tcp_node);
201         struct ctdb_tcp_packet *pkt;
202
203         /* enforce the length and alignment rules from the tcp packet allocator */
204         length = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
205         *(uint32_t *)data = length;
206         
207         /* if the queue is empty then try an immediate write, avoiding
208            queue overhead. This relies on non-blocking sockets */
209         if (tnode->queue == NULL && tnode->fd != -1) {
210                 ssize_t n = write(tnode->fd, data, length);
211                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
212                         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
213                                         ctdb_tcp_node_dead, node);
214                         /* yes, we report success, as the dead node is 
215                            handled via a separate event */
216                         return 0;
217                 }
218                 if (n > 0) {
219                         data += n;
220                         length -= n;
221                 }
222                 if (length == 0) return 0;
223         }
224
225         pkt = talloc(tnode, struct ctdb_tcp_packet);
226         CTDB_NO_MEMORY(node->ctdb, pkt);
227
228         pkt->data = talloc_memdup(pkt, data, length);
229         CTDB_NO_MEMORY(node->ctdb, pkt->data);
230
231         pkt->length = length;
232
233         if (tnode->queue == NULL && tnode->fd != -1) {
234                 EVENT_FD_WRITEABLE(tnode->fde);
235         }
236
237         DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);
238
239         return 0;
240 }