merge tcp changes from ronnie
[sahlberg/ctdb.git] / tcp / tcp_io.c
1 /* 
2    ctdb over TCP
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "../include/ctdb_private.h"
28 #include "ctdb_tcp.h"
29
30
31 /*
32   called when we fail to send a message to a node
33 */
34 static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, 
35                                struct timeval t, void *private)
36 {
37         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
38         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
39                                                       struct ctdb_tcp_node);
40
41         /* start a new connect cycle to try to re-establish the
42            link */
43         talloc_free(tnode->fde);
44         close(tnode->fd);
45         tnode->fd = -1;
46         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
47                         ctdb_tcp_node_connect, node);
48 }
49
50 /*
51   called when socket becomes readable
52 */
53 void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, 
54                          uint16_t flags, void *private)
55 {
56         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
57         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
58                                                       struct ctdb_tcp_node);
59         if (flags & EVENT_FD_READ) {
60                 /* getting a read event on this fd in the current tcp model is
61                    always an error, as we have separate read and write
62                    sockets. In future we may combine them, but for now it must
63                    mean that the socket is dead, so we try to reconnect */
64                 node->ctdb->upcalls->node_dead(node);
65                 talloc_free(tnode->fde);
66                 close(tnode->fd);
67                 tnode->fd = -1;
68                 event_add_timed(node->ctdb->ev, node, timeval_zero(), 
69                                 ctdb_tcp_node_connect, node);
70                 return;
71         }
72
73         while (tnode->queue) {
74                 struct ctdb_tcp_packet *pkt = tnode->queue;
75                 ssize_t n;
76
77                 n = write(tnode->fd, pkt->data, pkt->length);
78
79                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
80                         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
81                                         ctdb_tcp_node_dead, node);
82                         EVENT_FD_NOT_WRITEABLE(tnode->fde);
83                         return;
84                 }
85                 if (n <= 0) return;
86                 
87                 if (n != pkt->length) {
88                         pkt->length -= n;
89                         pkt->data += n;
90                         return;
91                 }
92
93                 DLIST_REMOVE(tnode->queue, pkt);
94                 talloc_free(pkt);
95         }
96
97         EVENT_FD_NOT_WRITEABLE(tnode->fde);
98 }
99
100
101
102 static void tcp_read_cb(uint8_t *data, int cnt, void *args)
103 {
104         struct ctdb_incoming *in = talloc_get_type(args, struct ctdb_incoming);
105         struct ctdb_req_header *hdr;
106
107         if (cnt < sizeof(*hdr)) {
108                 ctdb_set_error(in->ctdb, "Bad packet length %d\n", cnt);
109                 return;
110         }
111         hdr = (struct ctdb_req_header *)data;
112         if (cnt != hdr->length) {
113                 ctdb_set_error(in->ctdb, "Bad header length %d expected %d\n", 
114                                hdr->length, cnt);
115                 return;
116         }
117
118         if (hdr->ctdb_magic != CTDB_MAGIC) {
119                 ctdb_set_error(in->ctdb, "Non CTDB packet rejected\n");
120                 return;
121         }
122
123         if (hdr->ctdb_version != CTDB_VERSION) {
124                 ctdb_set_error(in->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
125                 return;
126         }
127
128         /* most common case - we got a whole packet in one go
129            tell the ctdb layer above that we have a packet */
130         in->ctdb->upcalls->recv_pkt(in->ctdb, data, cnt);
131 }
132
133 /*
134   called when an incoming connection is readable
135 */
136 void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, 
137                             uint16_t flags, void *private)
138 {
139         struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
140
141         ctdb_read_pdu(in->fd, in, &in->partial, tcp_read_cb, in);
142 }
143
144 /*
145   queue a packet for sending
146 */
147 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
148 {
149         struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
150                                                       struct ctdb_tcp_node);
151         struct ctdb_tcp_packet *pkt;
152         uint32_t length2;
153
154         /* enforce the length and alignment rules from the tcp packet allocator */
155         length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
156         *(uint32_t *)data = length2;
157
158         if (length2 != length) {
159                 memset(data+length, 0, length2-length);
160         }
161         
162         /* if the queue is empty then try an immediate write, avoiding
163            queue overhead. This relies on non-blocking sockets */
164         if (tnode->queue == NULL && tnode->fd != -1) {
165                 ssize_t n = write(tnode->fd, data, length2);
166                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
167                         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
168                                         ctdb_tcp_node_dead, node);
169                         /* yes, we report success, as the dead node is 
170                            handled via a separate event */
171                         return 0;
172                 }
173                 if (n > 0) {
174                         data += n;
175                         length2 -= n;
176                 }
177                 if (length2 == 0) return 0;
178         }
179
180         pkt = talloc(tnode, struct ctdb_tcp_packet);
181         CTDB_NO_MEMORY(node->ctdb, pkt);
182
183         pkt->data = talloc_memdup(pkt, data, length2);
184         CTDB_NO_MEMORY(node->ctdb, pkt->data);
185
186         pkt->length = length2;
187
188         if (tnode->queue == NULL && tnode->fd != -1) {
189                 EVENT_FD_WRITEABLE(tnode->fde);
190         }
191
192         DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);
193
194         return 0;
195 }