merged from samba4 ctdb
[vlendec/samba-autobuild/.git] / ctdb / tcp / tcp_io.c
1 /* 
2    ctdb over TCP
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "../include/ctdb_private.h"
28 #include "ctdb_tcp.h"
29
30
31 /*
32   called when we fail to send a message to a node
33 */
34 static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, 
35                                struct timeval t, void *private)
36 {
37         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
38         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
39                                                       struct ctdb_tcp_node);
40
41         /* start a new connect cycle to try to re-establish the
42            link */
43         talloc_free(tnode->fde);
44         close(tnode->fd);
45         tnode->fd = -1;
46         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
47                         ctdb_tcp_node_connect, node);
48 }
49
50 /*
51   called when socket becomes readable
52 */
53 void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, 
54                          uint16_t flags, void *private)
55 {
56         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
57         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
58                                                       struct ctdb_tcp_node);
59         if (flags & EVENT_FD_READ) {
60                 /* getting a read event on this fd in the current tcp model is
61                    always an error, as we have separate read and write
62                    sockets. In future we may combine them, but for now it must
63                    mean that the socket is dead, so we try to reconnect */
64                 node->ctdb->upcalls->node_dead(node);
65                 talloc_free(tnode->fde);
66                 close(tnode->fd);
67                 tnode->fd = -1;
68                 event_add_timed(node->ctdb->ev, node, timeval_zero(), 
69                                 ctdb_tcp_node_connect, node);
70                 return;
71         }
72
73         while (tnode->queue) {
74                 struct ctdb_tcp_packet *pkt = tnode->queue;
75                 ssize_t n;
76
77                 n = write(tnode->fd, pkt->data, pkt->length);
78
79                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
80                         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
81                                         ctdb_tcp_node_dead, node);
82                         EVENT_FD_NOT_WRITEABLE(tnode->fde);
83                         return;
84                 }
85                 if (n <= 0) return;
86                 
87                 if (n != pkt->length) {
88                         pkt->length -= n;
89                         pkt->data += n;
90                         return;
91                 }
92
93                 DLIST_REMOVE(tnode->queue, pkt);
94                 talloc_free(pkt);
95         }
96
97         EVENT_FD_NOT_WRITEABLE(tnode->fde);
98 }
99
100
101 /*
102   called when an incoming connection is readable
103 */
104 void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, 
105                             uint16_t flags, void *private)
106 {
107         struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
108         int num_ready = 0;
109         ssize_t nread;
110         uint8_t *data, *data_base;
111
112         if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
113             num_ready == 0) {
114                 /* we've lost the link from another node. We don't
115                    notify the upper layers, as we only want to trigger
116                    a full node reorganisation when a send fails - that
117                    allows nodes to restart without penalty as long as
118                    the network is idle */
119                 talloc_free(in);
120                 return;
121         }
122
123         in->partial.data = talloc_realloc_size(in, in->partial.data, 
124                                                num_ready + in->partial.length);
125         if (in->partial.data == NULL) {
126                 /* not much we can do except drop the socket */
127                 talloc_free(in);
128                 return;
129         }
130
131         nread = read(in->fd, in->partial.data+in->partial.length, num_ready);
132         if (nread <= 0) {
133                 /* the connection must be dead */
134                 talloc_free(in);
135                 return;
136         }
137
138         data = in->partial.data;
139         nread += in->partial.length;
140
141         in->partial.data = NULL;
142         in->partial.length = 0;
143
144         if (nread >= 4 && *(uint32_t *)data == nread) {
145                 /* most common case - we got a whole packet in one go
146                    tell the ctdb layer above that we have a packet */
147                 in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread);
148                 return;
149         }
150
151         data_base = data;
152
153         while (nread >= 4 && *(uint32_t *)data <= nread) {
154                 /* we have at least one packet */
155                 uint8_t *d2;
156                 uint32_t len;
157                 len = *(uint32_t *)data;
158                 d2 = talloc_memdup(in, data, len);
159                 if (d2 == NULL) {
160                         /* sigh */
161                         talloc_free(in);
162                         return;
163                 }
164                 in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len);
165                 data += len;
166                 nread -= len;           
167         }
168
169         if (nread > 0) {
170                 /* we have only part of a packet */
171                 if (data_base == data) {
172                         in->partial.data = data;
173                         in->partial.length = nread;
174                 } else {
175                         in->partial.data = talloc_memdup(in, data, nread);
176                         if (in->partial.data == NULL) {
177                                 talloc_free(in);
178                                 return;
179                         }
180                         in->partial.length = nread;
181                         talloc_free(data_base);
182                 }
183                 return;
184         }
185
186         talloc_free(data_base);
187 }
188
189 /*
190   queue a packet for sending
191 */
192 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
193 {
194         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
195                                                       struct ctdb_tcp_node);
196         struct ctdb_tcp_packet *pkt;
197         uint32_t length2;
198
199         /* enforce the length and alignment rules from the tcp packet allocator */
200         length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
201         *(uint32_t *)data = length2;
202
203         if (length2 != length) {
204                 memset(data+length, 0, length2-length);
205         }
206         
207         /* if the queue is empty then try an immediate write, avoiding
208            queue overhead. This relies on non-blocking sockets */
209         if (tnode->queue == NULL && tnode->fd != -1) {
210                 ssize_t n = write(tnode->fd, data, length2);
211                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
212                         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
213                                         ctdb_tcp_node_dead, node);
214                         /* yes, we report success, as the dead node is 
215                            handled via a separate event */
216                         return 0;
217                 }
218                 if (n > 0) {
219                         data += n;
220                         length2 -= n;
221                 }
222                 if (length2 == 0) return 0;
223         }
224
225         pkt = talloc(tnode, struct ctdb_tcp_packet);
226         CTDB_NO_MEMORY(node->ctdb, pkt);
227
228         pkt->data = talloc_memdup(pkt, data, length2);
229         CTDB_NO_MEMORY(node->ctdb, pkt->data);
230
231         pkt->length = length2;
232
233         if (tnode->queue == NULL && tnode->fd != -1) {
234                 EVENT_FD_WRITEABLE(tnode->fde);
235         }
236
237         DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);
238
239         return 0;
240 }