don't talk to ourselves
[vlendec/samba-autobuild/.git] / ctdb / ctdb_tcp.c
1 /* 
2    ctdb over TCP
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "ctdb_private.h"
26
27 /*
28   initialise the ctdb daemon. 
29
30   if the ctdb dispatcher daemon has already been started then this
31   does nothing. Otherwise it forks the ctdb dispatcher daemon and
32   starts the daemons connecting to each other
33   
34   NOTE: In current code the daemon does not fork. This is for testing purposes only
35   and to simplify the code.
36 */
37
38 struct ctdb_context *ctdb_init(struct event_context *ev)
39 {
40         struct ctdb_context *ctdb;
41
42         ctdb = talloc_zero(ev, struct ctdb_context);
43         ctdb->ev = ev;
44
45         return ctdb;
46 }
47
48 const char *ctdb_errstr(struct ctdb_context *ctdb)
49 {
50         return ctdb->err_msg;
51 }
52
53 /*
54   remember an error message
55 */
56 static void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...)
57 {
58         va_list ap;
59         talloc_free(ctdb->err_msg);
60         va_start(ap, fmt);
61         ctdb->err_msg = talloc_vasprintf(ctdb, fmt, ap);
62         va_end(ap);
63 }
64
65 /*
66   called when socket becomes readable
67 */
68 static void ctdb_node_read(struct event_context *ev, struct fd_event *fde, 
69                            uint16_t flags, void *private)
70 {
71         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
72         printf("connection to node %s:%u is readable\n", 
73                node->address.address, node->address.port);
74         event_set_fd_flags(fde, 0);
75 }
76
77 static void ctdb_node_connect(struct event_context *ev, struct timed_event *te, 
78                               struct timeval t, void *private);
79
80 /*
81   called when socket becomes writeable on connect
82 */
83 static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde, 
84                                     uint16_t flags, void *private)
85 {
86         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
87         struct ctdb_context *ctdb = node->ctdb;
88         int error;
89         socklen_t len;
90
91         if (getsockopt(node->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
92             error != 0) {
93                 if (error == EINPROGRESS) {
94                         printf("connect in progress\n");
95                         return;
96                 }
97                 printf("getsockopt errno=%s\n", strerror(errno));
98                 talloc_free(fde);
99                 close(node->fd);
100                 node->fd = -1;
101                 event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0), 
102                                 ctdb_node_connect, node);
103                 return;
104         }
105
106         printf("Established connection to %s:%u\n", 
107                node->address.address, node->address.port);
108         talloc_free(fde);
109         event_add_fd(node->ctdb->ev, node, node->fd, EVENT_FD_READ, 
110                      ctdb_node_read, node);
111 }
112
113 /*
114   called when we should try and establish a tcp connection to a node
115 */
116 static void ctdb_node_connect(struct event_context *ev, struct timed_event *te, 
117                               struct timeval t, void *private)
118 {
119         struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
120         struct ctdb_context *ctdb = node->ctdb;
121         unsigned v;
122         struct sockaddr_in sock_out;
123
124         node->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
125
126         v = fcntl(node->fd, F_GETFL, 0);
127         fcntl(node->fd, F_SETFL, v | O_NONBLOCK);
128
129         inet_pton(AF_INET, node->address.address, &sock_out.sin_addr);
130         sock_out.sin_port = htons(node->address.port);
131         sock_out.sin_family = PF_INET;
132         
133         if (connect(node->fd, &sock_out, sizeof(sock_out)) != 0 &&
134             errno != EINPROGRESS) {
135                 /* try again once a second */
136                 close(node->fd);
137                 event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0), 
138                                 ctdb_node_connect, node);
139                 return;
140         }
141
142         /* non-blocking connect - wait for write event */
143         event_add_fd(node->ctdb->ev, node, node->fd, EVENT_FD_WRITE, 
144                      ctdb_node_connect_write, node);
145 }
146
147 /*
148   parse a IP:port pair
149 */
150 static int ctdb_parse_address(struct ctdb_context *ctdb,
151                               TALLOC_CTX *mem_ctx, const char *str,
152                               struct ctdb_address *address)
153 {
154         char *p;
155         p = strchr(str, ':');
156         if (p == NULL) {
157                 ctdb_set_error(ctdb, "Badly formed node '%s'\n", str);
158                 return -1;
159         }
160
161         address->address = talloc_strndup(mem_ctx, str, p-str);
162         address->port = strtoul(p+1, NULL, 0);
163         return 0;
164 }
165
166
167 /*
168   add a node to the list of active nodes
169 */
170 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
171 {
172         struct ctdb_node *node;
173
174         node = talloc(ctdb, struct ctdb_node);
175         if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
176                 return -1;
177         }
178         node->fd = -1;
179         node->ctdb = ctdb;
180
181         DLIST_ADD(ctdb->nodes, node);   
182         return 0;
183 }
184
185 /*
186   setup the node list from a file
187 */
188 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
189 {
190         char **lines;
191         int nlines;
192         int i;
193
194         lines = file_lines_load(nlist, &nlines, ctdb);
195         if (lines == NULL) {
196                 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
197                 return -1;
198         }
199
200         for (i=0;i<nlines;i++) {
201                 if (ctdb_add_node(ctdb, lines[i]) != 0) {
202                         talloc_free(lines);
203                         return -1;
204                 }
205         }
206         
207         talloc_free(lines);
208         return 0;
209 }
210
211 /*
212   setup the node list from a file
213 */
214 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
215 {
216         return ctdb_parse_address(ctdb, ctdb, address, &ctdb->address);
217 }
218
219 /*
220   add a node to the list of active nodes
221 */
222 int ctdb_set_call(struct ctdb_context *ctdb, ctdb_fn_t fn, int id)
223 {
224         struct ctdb_registered_call *call;
225
226         call = talloc(ctdb, struct ctdb_registered_call);
227         call->fn = fn;
228         call->id = id;
229
230         DLIST_ADD(ctdb->calls, call);   
231         return 0;
232 }
233
234 /*
235   attach to a specific database
236 */
237 int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags, 
238                 int open_flags, mode_t mode)
239 {
240         ctdb->ltdb = tdb_open(name, 0, TDB_INTERNAL, 0, 0);
241         if (ctdb->ltdb == NULL) {
242                 ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
243                 return -1;
244         }
245         return 0;
246 }
247
248 /*
249   called when an incoming connection is readable
250 */
251 static void ctdb_incoming_read(struct event_context *ev, struct fd_event *fde, 
252                                uint16_t flags, void *private)
253 {
254         struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
255         char c;
256         printf("Incoming data\n");
257         if (read(in->fd, &c, 1) <= 0) {
258                 /* socket is dead */
259                 close(in->fd);
260                 talloc_free(in);
261         }
262 }
263
264
265 /*
266   called when we get contacted by another node
267   currently makes no attempt to check if the connection is really from a ctdb
268   node in our cluster
269 */
270 static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, 
271                               uint16_t flags, void *private)
272 {
273         struct ctdb_context *ctdb;
274         struct sockaddr_in addr;
275         socklen_t len;
276         int fd;
277         struct ctdb_incoming *in;
278
279         ctdb = talloc_get_type(private, struct ctdb_context);
280         memset(&addr, 0, sizeof(addr));
281         len = sizeof(addr);
282         fd = accept(ctdb->listen_fd, (struct sockaddr *)&addr, &len);
283         if (fd == -1) return;
284
285         in = talloc(ctdb, struct ctdb_incoming);
286         in->fd = fd;
287         in->ctdb = ctdb;
288
289         event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ, 
290                      ctdb_incoming_read, in);   
291
292         printf("New incoming socket %d\n", in->fd);
293 }
294
295
296 /*
297   listen on our own address
298 */
299 static int ctdb_listen(struct ctdb_context *ctdb)
300 {
301         struct sockaddr_in sock;
302         int one = 1;
303
304         sock.sin_port = htons(ctdb->address.port);
305         sock.sin_family = PF_INET;
306         inet_pton(AF_INET, ctdb->address.address, &sock.sin_addr);
307
308         ctdb->listen_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
309         if (ctdb->listen_fd == -1) {
310                 ctdb_set_error(ctdb, "socket failed\n");
311                 return -1;
312         }
313
314         setsockopt(ctdb->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
315
316         if (bind(ctdb->listen_fd, (struct sockaddr * )&sock, sizeof(sock)) != 0) {
317                 ctdb_set_error(ctdb, "bind failed\n");
318                 close(ctdb->listen_fd);
319                 ctdb->listen_fd = -1;
320                 return -1;
321         }
322
323         if (listen(ctdb->listen_fd, 10) == -1) {
324                 ctdb_set_error(ctdb, "listen failed\n");
325                 close(ctdb->listen_fd);
326                 ctdb->listen_fd = -1;
327                 return -1;
328         }
329
330         event_add_fd(ctdb->ev, ctdb, ctdb->listen_fd, EVENT_FD_READ, 
331                      ctdb_listen_event, ctdb);  
332
333         return 0;
334 }
335
336 /*
337   check if two addresses are the same
338 */
339 static bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2)
340 {
341         return strcmp(a1->address, a2->address) == 0 && a1->port == a2->port;
342 }
343
344 /*
345   start the protocol going
346 */
347 int ctdb_start(struct ctdb_context *ctdb)
348 {
349         struct ctdb_node *node;
350
351         /* listen on our own address */
352         if (ctdb_listen(ctdb) != 0) return -1;
353
354         /* startup connections to the other servers - will happen on
355            next event loop */
356         for (node=ctdb->nodes;node;node=node->next) {
357                 if (ctdb_same_address(&ctdb->address, &node->address)) continue;
358                 event_add_timed(ctdb->ev, node, timeval_zero(), 
359                                 ctdb_node_connect, node);
360         }
361
362         return 0;
363 }
364
365 /*
366   make a remote ctdb call
367 */
368 int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id, 
369               TDB_DATA *call_data, TDB_DATA *reply_data)
370 {
371         printf("ctdb_call not implemented\n");
372         return -1;
373 }