4 Copyright (C) Andrew Tridgell 2006
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2 of the License, or (at your option) any later version.
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 #include "lib/events/events.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "ctdb_private.h"
28 initialise the ctdb daemon.
30 if the ctdb dispatcher daemon has already been started then this
31 does nothing. Otherwise it forks the ctdb dispatcher daemon and
32 starts the daemons connecting to each other
34 NOTE: In current code the daemon does not fork. This is for testing purposes only
35 and to simplify the code.
/*
  Allocate a zero-filled struct ctdb_context with talloc_zero(), using the
  event context `ev` as the talloc parent.
  NOTE(review): this listing omits some lines of the body (see the gaps in
  the embedded line numbers). The visible lines show no NULL check on the
  talloc_zero() result - confirm against the full file.
*/
38 struct ctdb_context *ctdb_init(struct event_context *ev)
40 struct ctdb_context *ctdb;
42 ctdb = talloc_zero(ev, struct ctdb_context);
/*
  Error-string accessor for a ctdb context.
  NOTE(review): only the signature is present in this listing; presumably
  it returns ctdb->err_msg as stored by ctdb_set_error() - confirm.
*/
48 const char *ctdb_errstr(struct ctdb_context *ctdb)
54 remember an error message
/*
  Store a printf-style formatted error message in ctdb->err_msg.
  The previous message is released first, so only the most recent error
  is kept.
*/
56 static void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...)
59 talloc_free(ctdb->err_msg);
/* the new string is allocated with ctdb as its talloc context; `ap` is
   set up on a line that is not included in this listing */
61 ctdb->err_msg = talloc_vasprintf(ctdb, fmt, ap);
66 called when socket becomes readable
/*
  fd-event callback registered for an established connection to a node.
  `private` is the struct ctdb_node the socket belongs to.
  The visible code does not read any data: it only reports that the
  socket is readable and then sets the flags of the fd event to 0.
*/
68 static void ctdb_node_read(struct event_context *ev, struct fd_event *fde,
69 uint16_t flags, void *private)
71 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
72 printf("connection to node %s:%u is readable\n",
73 node->address.address, node->address.port);
/* presumably this stops further read notifications for the fd, since
   nothing consumes the pending data - confirm against the events API */
74 event_set_fd_flags(fde, 0);
/* forward declaration: ctdb_node_connect_write() re-schedules
   ctdb_node_connect() before the latter is defined */
77 static void ctdb_node_connect(struct event_context *ev, struct timed_event *te,
78 struct timeval t, void *private);
81 called when socket becomes writeable on connect
/*
  fd-event callback for a socket with a non-blocking connect in progress.
  `private` is the struct ctdb_node being connected.
  The socket error state is queried with SO_ERROR. On the failure path
  another ctdb_node_connect() attempt is scheduled one second later;
  otherwise the connection is reported as established and
  ctdb_node_read() is registered for read events on the socket.
  NOTE(review): several lines of this body (the rest of the condition and
  the end of the failure branch) are not included in this listing.
*/
83 static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde,
84 uint16_t flags, void *private)
86 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
87 struct ctdb_context *ctdb = node->ctdb;
/* SO_ERROR carries the outcome of the asynchronous connect() */
91 if (getsockopt(node->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
/* retry the whole connect sequence in one second */
94 event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0),
95 ctdb_node_connect, node);
99 printf("Established connection to %s:%u\n",
100 node->address.address, node->address.port);
/* connected: from now on watch the socket for incoming data */
102 event_add_fd(node->ctdb->ev, node, node->fd, EVENT_FD_READ,
103 ctdb_node_read, node);
107 called when we should try and establish a tcp connection to a node
/*
  Timed-event callback: start a non-blocking TCP connect to one node.
  `private` is the struct ctdb_node to connect to; every place that
  schedules this callback passes the node as the private pointer.
  If connect() fails immediately (anything other than EINPROGRESS) the
  attempt is scheduled again one second later. Otherwise
  ctdb_node_connect_write() is registered for write events, which is how
  the completion of a non-blocking connect is signalled.
  NOTE(review): the results of socket(), fcntl() and inet_pton() are not
  checked, and sock_out is not visibly zeroed before use.
*/
109 static void ctdb_node_connect(struct event_context *ev, struct timed_event *te,
110 struct timeval t, void *private)
/* the node is handed over through the event's private pointer; `node`
   itself has no value yet at this point and must not be passed here */
112 struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
113 struct ctdb_context *ctdb = node->ctdb;
115 struct sockaddr_in sock_out;
117 node->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
/* make the socket non-blocking so that connect() returns at once */
119 v = fcntl(node->fd, F_GETFL, 0);
120 fcntl(node->fd, F_SETFL, v | O_NONBLOCK);
122 inet_pton(AF_INET, node->address.address, &sock_out.sin_addr);
123 sock_out.sin_port = htons(node->address.port);
/* sin_family holds an address family (AF_*), not a protocol family */
124 sock_out.sin_family = AF_INET;
/* connect() takes a struct sockaddr pointer, hence the cast */
126 if (connect(node->fd, (struct sockaddr *)&sock_out, sizeof(sock_out)) != 0 &&
127 errno != EINPROGRESS) {
128 /* try again once a second */
130 event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0),
131 ctdb_node_connect, node);
135 /* non-blocking connect - wait for write event */
136 event_add_fd(node->ctdb->ev, node, node->fd, EVENT_FD_WRITE,
137 ctdb_node_connect_write, node);
/*
  Split a "host:port" string at the first ':' and fill in `address`.
  The host part is copied with talloc_strndup() using mem_ctx as the
  talloc context; the text after the ':' is converted with strtoul().
  When no ':' is found an error message is recorded on ctdb.
  NOTE(review): the return statements are not included in this listing.
*/
143 static int ctdb_parse_address(struct ctdb_context *ctdb,
144 TALLOC_CTX *mem_ctx, const char *str,
145 struct ctdb_address *address)
148 p = strchr(str, ':');
150 ctdb_set_error(ctdb, "Badly formed node '%s'\n", str);
/* p - str is the length of the host part, excluding the ':' */
154 address->address = talloc_strndup(mem_ctx, str, p-str);
/* NOTE(review): base 0 also accepts hex and octal input, and with a NULL
   end pointer neither trailing garbage nor an empty or out-of-range port
   is detected here */
155 address->port = strtoul(p+1, NULL, 0);
161 add a node to the list of active nodes
/*
  Create a struct ctdb_node from the "host:port" string `nstr` and put it
  on the ctdb->nodes list. The node is a talloc child of ctdb, and its
  address string is allocated with the node as talloc context.
  NOTE(review): talloc() does not zero the structure, and no NULL check
  is visible. Fields such as node->ctdb and node->fd, which the connect
  code reads, are presumably set on lines missing from this listing -
  confirm.
*/
163 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
165 struct ctdb_node *node;
167 node = talloc(ctdb, struct ctdb_node);
168 if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
174 DLIST_ADD(ctdb->nodes, node);
179 set up the node list from a file
/*
  Load the node list file `nlist` and add one node per line via
  ctdb_add_node(). An error message is recorded on ctdb on the failure
  path of the load.
  NOTE(review): the failure test, the return statements and any cleanup
  of `lines` are not included in this listing.
*/
181 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
/* nlines receives the number of entries in the returned array */
187 lines = file_lines_load(nlist, &nlines, ctdb);
189 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
193 for (i=0;i<nlines;i++) {
194 if (ctdb_add_node(ctdb, lines[i]) != 0) {
205 set our own address (the address this node listens on)
/*
  Parse the "host:port" string `address` into ctdb->address, which is
  the address ctdb_listen() later binds to. The host string is allocated
  with ctdb as the talloc context. Returns the result of
  ctdb_parse_address().
*/
207 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
209 return ctdb_parse_address(ctdb, ctdb, address, &ctdb->address);
213 add a call function to the list of registered calls
/*
  Register a call: allocate a struct ctdb_registered_call as a talloc
  child of ctdb and put it on the ctdb->calls list.
  NOTE(review): `fn` and `id` are presumably stored in the new entry on
  lines missing from this listing. No NULL check on the talloc() result
  is visible - confirm.
*/
215 int ctdb_set_call(struct ctdb_context *ctdb, ctdb_fn_t fn, int id)
217 struct ctdb_registered_call *call;
219 call = talloc(ctdb, struct ctdb_registered_call);
223 DLIST_ADD(ctdb->calls, call);
228 attach to a specific database
/*
  Open the local tdb for `name` and keep the handle in ctdb->ltdb.
  An error message is recorded on ctdb when tdb_open() returns NULL.
  NOTE(review): tdb_flags, open_flags and mode are not used by the
  visible code. tdb_open() is always called with TDB_INTERNAL and zeros
  for the other arguments - presumably a placeholder, confirm before
  relying on the parameters.
*/
230 int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags,
231 int open_flags, mode_t mode)
233 ctdb->ltdb = tdb_open(name, 0, TDB_INTERNAL, 0, 0);
234 if (ctdb->ltdb == NULL) {
235 ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
242 called when an incoming connection is readable
/*
  fd-event callback for a connection accepted by ctdb_listen_event().
  `private` is the struct ctdb_incoming of that connection.
  The visible code only reports that data has arrived; `in` is not used
  in the lines shown and nothing is read from the socket.
*/
244 static void ctdb_incoming_read(struct event_context *ev, struct fd_event *fde,
245 uint16_t flags, void *private)
247 struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
248 printf("Incoming data\n");
253 called when we get contacted by another node
254 currently makes no attempt to check if the connection is really from a ctdb node
/*
  fd-event callback for the listening socket. `private` is the
  struct ctdb_context. It accepts one pending connection, allocates a
  struct ctdb_incoming for it as a talloc child of ctdb, and registers
  ctdb_incoming_read() for read events on the new socket. A failed
  accept() is silently ignored.
  NOTE(review): accept() needs `len` to hold the size of `addr` on entry.
  That initialisation, and the lines filling in the new ctdb_incoming
  (including in->fd), are not included in this listing. No NULL check on
  the talloc() result is visible - confirm.
*/
257 static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
258 uint16_t flags, void *private)
260 struct ctdb_context *ctdb;
261 struct sockaddr addr;
264 struct ctdb_incoming *in;
266 ctdb = talloc_get_type(private, struct ctdb_context);
267 fd = accept(ctdb->listen_fd, &addr, &len);
268 if (fd == -1) return;
270 in = talloc(ctdb, struct ctdb_incoming);
/* the fd event is a talloc child of `in` */
274 event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ,
275 ctdb_incoming_read, in);
280 listen on our own address
/*
  Create the listening TCP socket on our own address (ctdb->address) and
  register ctdb_listen_event() to be called for incoming connections.
  Each failure records a message with ctdb_set_error(). The socket is
  closed again and ctdb->listen_fd reset to -1 when bind() or listen()
  fails. ctdb_start() treats any non-zero return value as failure.
  NOTE(review): `sock` is not visibly zeroed before use and the result
  of setsockopt() is ignored.
*/
282 static int ctdb_listen(struct ctdb_context *ctdb)
284 struct sockaddr_in sock;
287 sock.sin_port = htons(ctdb->address.port);
/* sin_family holds an address family (AF_*), not a protocol family */
288 sock.sin_family = AF_INET;
/* inet_pton() returns 1 only when the string was parsed. Give up
   otherwise: sin_addr would be left unset and bind() would use whatever
   happened to be on the stack. No socket exists yet, so there is nothing
   to clean up. */
289 if (inet_pton(AF_INET, ctdb->address.address, &sock.sin_addr) != 1) {
	ctdb_set_error(ctdb, "invalid listen address '%s'\n", ctdb->address.address);
	return -1;
}
291 ctdb->listen_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
292 if (ctdb->listen_fd == -1) {
293 ctdb_set_error(ctdb, "socket failed\n");
/* allow the address to be re-bound right after a restart */
297 setsockopt(ctdb->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
299 if (bind(ctdb->listen_fd, (struct sockaddr * )&sock, sizeof(sock)) != 0) {
300 ctdb_set_error(ctdb, "bind failed\n");
301 close(ctdb->listen_fd);
302 ctdb->listen_fd = -1;
306 if (listen(ctdb->listen_fd, 10) == -1) {
307 ctdb_set_error(ctdb, "listen failed\n");
308 close(ctdb->listen_fd);
309 ctdb->listen_fd = -1;
313 event_add_fd(ctdb->ev, ctdb, ctdb->listen_fd, EVENT_FD_READ,
314 ctdb_listen_event, ctdb);
320 start the protocol going
/*
  Start the protocol: begin listening on our own address, then schedule
  one timed event per entry in ctdb->nodes that runs ctdb_node_connect()
  with that node as its private pointer. The events use timeval_zero()
  as their time, so the connects are started from the event loop rather
  than from this function.
  Returns -1 when ctdb_listen() fails.
  NOTE(review): the comment inside the loop setup is cut off in this
  listing (its closing line is missing), as is the final return.
*/
322 int ctdb_start(struct ctdb_context *ctdb)
324 struct ctdb_node *node;
326 /* listen on our own address */
327 if (ctdb_listen(ctdb) != 0) return -1;
329 /* startup connections to the other servers - will happen on
331 for (node=ctdb->nodes;node;node=node->next) {
332 event_add_timed(ctdb->ev, node, timeval_zero(),
333 ctdb_node_connect, node);
340 make a remote ctdb call
/*
  Make a remote ctdb call - placeholder.
  The visible body only prints a "not implemented" message; none of the
  parameters are used in the lines shown.
  NOTE(review): the return statement is not included in this listing.
*/
342 int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
343 TDB_DATA *call_data, TDB_DATA *reply_data)
345 printf("ctdb_call not implemented\n");