r23795: more v2->v3 conversion
[samba.git] / source4 / cluster / ctdb / tcp / tcp_connect.c
1 /* 
2    ctdb over TCP
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 3 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27 #include "ctdb_tcp.h"
28
29 static void set_nonblocking(int fd)
30 {
31         unsigned v;
32         v = fcntl(fd, F_GETFL, 0);
33         fcntl(fd, F_SETFL, v | O_NONBLOCK);
34 }
35
36
37 /*
38   called when a complete packet has come in - should not happen on this socket
39  */
40 void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data)
41 {
42         struct ctdb_node *node = talloc_get_type(private_data, struct ctdb_node);
43         struct ctdb_tcp_node *tnode = talloc_get_type(
44                 node->private_data, struct ctdb_tcp_node);
45
46         if (data == NULL) {
47                 node->ctdb->upcalls->node_dead(node);
48         }
49
50         /* start a new connect cycle to try to re-establish the
51            link */
52         close(tnode->fd);
53         ctdb_queue_set_fd(tnode->queue, -1);
54         tnode->fd = -1;
55         event_add_timed(node->ctdb->ev, node, timeval_zero(), 
56                         ctdb_tcp_node_connect, node);
57 }
58
59 /*
60   called when socket becomes writeable on connect
61 */
62 static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde, 
63                                     uint16_t flags, void *private_data)
64 {
65         struct ctdb_node *node = talloc_get_type(private_data,
66                                                  struct ctdb_node);
67         struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data,
68                                                       struct ctdb_tcp_node);
69         struct ctdb_context *ctdb = node->ctdb;
70         int error = 0;
71         socklen_t len = sizeof(error);
72         int one = 1;
73
74         if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
75             error != 0) {
76                 talloc_free(fde);
77                 close(tnode->fd);
78                 tnode->fd = -1;
79                 event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0), 
80                                 ctdb_tcp_node_connect, node);
81                 return;
82         }
83
84         talloc_free(fde);
85         
86         setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one));
87
88         ctdb_queue_set_fd(tnode->queue, tnode->fd);
89
90         /* tell the ctdb layer we are connected */
91         node->ctdb->upcalls->node_connected(node);
92 }
93
94
95 static int ctdb_tcp_get_address(struct ctdb_context *ctdb,
96                                 const char *address, struct in_addr *addr)
97 {
98         if (inet_pton(AF_INET, address, addr) <= 0) {
99                 struct hostent *he = gethostbyname(address);
100                 if (he == NULL || he->h_length > sizeof(*addr)) {
101                         ctdb_set_error(ctdb, "invalid nework address '%s'\n", 
102                                        address);
103                         return -1;
104                 }
105                 memcpy(addr, he->h_addr, he->h_length);
106         }
107         return 0;
108 }
109
110 /*
111   called when we should try and establish a tcp connection to a node
112 */
113 void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, 
114                            struct timeval t, void *private_data)
115 {
116         struct ctdb_node *node = talloc_get_type(private_data,
117                                                  struct ctdb_node);
118         struct ctdb_tcp_node *tnode = talloc_get_type(node->private_data, 
119                                                       struct ctdb_tcp_node);
120         struct ctdb_context *ctdb = node->ctdb;
121         struct sockaddr_in sock_in;
122         struct sockaddr_in sock_out;
123
124         tnode->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
125
126         set_nonblocking(tnode->fd);
127
128         if (ctdb_tcp_get_address(ctdb, node->address.address, &sock_out.sin_addr) != 0) {
129                 return;
130         }
131         sock_out.sin_port = htons(node->address.port);
132         sock_out.sin_family = PF_INET;
133
134
135         /* Bind our side of the socketpair to the same address we use to listen
136          * on incoming CTDB traffic.
137          * We must specify this address to make sure that the address we expose to
138          * the remote side is actually routable in case CTDB traffic will run on
139          * a dedicated non-routeable network.
140          */
141         if (ctdb_tcp_get_address(ctdb, ctdb->address.address, &sock_in.sin_addr) != 0) {
142                 return;
143         }
144         sock_in.sin_port = htons(0); /* INPORT_ANY is not always available */
145         sock_in.sin_family = PF_INET;
146         bind(tnode->fd, (struct sockaddr *)&sock_in, sizeof(sock_in));
147
148         if (connect(tnode->fd, (struct sockaddr *)&sock_out, sizeof(sock_out)) != 0 &&
149             errno != EINPROGRESS) {
150                 /* try again once a second */
151                 close(tnode->fd);
152                 event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0), 
153                                 ctdb_tcp_node_connect, node);
154                 return;
155         }
156
157         /* non-blocking connect - wait for write event */
158         event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_WRITE|EVENT_FD_READ, 
159                      ctdb_node_connect_write, node);
160 }
161
162 /*
163   destroy a ctdb_incoming structure 
164 */
165 static int ctdb_incoming_destructor(struct ctdb_incoming *in)
166 {
167         close(in->fd);
168         in->fd = -1;
169         return 0;
170 }
171
172 /*
173   called when we get contacted by another node
174   currently makes no attempt to check if the connection is really from a ctdb
175   node in our cluster
176 */
177 static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, 
178                               uint16_t flags, void *private_data)
179 {
180         struct ctdb_context *ctdb;
181         struct ctdb_tcp *ctcp;
182         struct sockaddr_in addr;
183         socklen_t len;
184         int fd;
185         struct ctdb_incoming *in;
186
187         ctdb = talloc_get_type(private_data, struct ctdb_context);
188         ctcp = talloc_get_type(ctdb->private_data, struct ctdb_tcp);
189         memset(&addr, 0, sizeof(addr));
190         len = sizeof(addr);
191         fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
192         if (fd == -1) return;
193
194         in = talloc_zero(ctdb, struct ctdb_incoming);
195         in->fd = fd;
196         in->ctdb = ctdb;
197
198         set_nonblocking(in->fd);
199
200         in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT, 
201                                      ctdb_tcp_read_cb, in);
202
203         talloc_set_destructor(in, ctdb_incoming_destructor);
204 }
205
206
207 /*
208   listen on our own address
209 */
210 int ctdb_tcp_listen(struct ctdb_context *ctdb)
211 {
212         struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
213                                                 struct ctdb_tcp);
214         struct sockaddr_in sock;
215         int one = 1;
216
217         sock.sin_port = htons(ctdb->address.port);
218         sock.sin_family = PF_INET;
219         if (ctdb_tcp_get_address(ctdb, ctdb->address.address, &sock.sin_addr) != 0) {
220                 return -1;
221         }
222
223         ctcp->listen_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
224         if (ctcp->listen_fd == -1) {
225                 ctdb_set_error(ctdb, "socket failed\n");
226                 return -1;
227         }
228
229         setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
230
231         if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sizeof(sock)) != 0) {
232                 ctdb_set_error(ctdb, "bind failed\n");
233                 close(ctcp->listen_fd);
234                 ctcp->listen_fd = -1;
235                 return -1;
236         }
237
238         if (listen(ctcp->listen_fd, 10) == -1) {
239                 ctdb_set_error(ctdb, "listen failed\n");
240                 close(ctcp->listen_fd);
241                 ctcp->listen_fd = -1;
242                 return -1;
243         }
244
245         event_add_fd(ctdb->ev, ctdb, ctcp->listen_fd, EVENT_FD_READ, 
246                      ctdb_listen_event, ctdb);  
247
248         return 0;
249 }
250