691d0b30628bb5be4d92e80a71ac88aeb0cc04c5
[rusty/ctdb.git] / common / ctdb.c
1 /* 
2    ctdb main protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "../include/ctdb_private.h"
28
29 /*
30   choose the transport we will use
31 */
32 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
33 {
34         int ctdb_tcp_init(struct ctdb_context *ctdb);
35 #ifdef USE_INFINIBAND
36         int ctdb_ibw_init(struct ctdb_context *ctdb);
37 #endif /* USE_INFINIBAND */
38
39         if (strcmp(transport, "tcp") == 0) {
40                 return ctdb_tcp_init(ctdb);
41         }
42 #ifdef USE_INFINIBAND
43         if (strcmp(transport, "ib") == 0) {
44                 return ctdb_ibw_init(ctdb);
45         }
46 #endif /* USE_INFINIBAND */
47
48         ctdb_set_error(ctdb, "Unknown transport '%s'\n", transport);
49         return -1;
50 }
51
52 /*
53   set some ctdb flags
54 */
55 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
56 {
57         ctdb->flags |= flags;
58 }
59
60 /*
61   clear some ctdb flags
62 */
63 void ctdb_clear_flags(struct ctdb_context *ctdb, unsigned flags)
64 {
65         ctdb->flags &= ~flags;
66 }
67
68 /*
69   set max acess count before a dmaster migration
70 */
71 void ctdb_set_max_lacount(struct ctdb_context *ctdb, unsigned count)
72 {
73         ctdb->max_lacount = count;
74 }
75
76 /*
77   set the directory for the local databases
78 */
79 int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
80 {
81         ctdb->db_directory = talloc_strdup(ctdb, dir);
82         if (ctdb->db_directory == NULL) {
83                 return -1;
84         }
85         return 0;
86 }
87
88 /*
89   add a node to the list of active nodes
90 */
91 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
92 {
93         struct ctdb_node *node, **nodep;
94
95         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
96         CTDB_NO_MEMORY(ctdb, nodep);
97
98         ctdb->nodes = nodep;
99         nodep = &ctdb->nodes[ctdb->num_nodes];
100         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
101         CTDB_NO_MEMORY(ctdb, *nodep);
102         node = *nodep;
103
104         if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
105                 return -1;
106         }
107         node->ctdb = ctdb;
108         node->name = talloc_asprintf(node, "%s:%u", 
109                                      node->address.address, 
110                                      node->address.port);
111         /* for now we just set the vnn to the line in the file - this
112            will change! */
113         node->vnn = ctdb->num_nodes;
114
115         if (ctdb->methods->add_node(node) != 0) {
116                 talloc_free(node);
117                 return -1;
118         }
119
120         if (ctdb_same_address(&ctdb->address, &node->address)) {
121                 ctdb->vnn = node->vnn;
122         }
123
124         ctdb->num_nodes++;
125
126         return 0;
127 }
128
129 /*
130   setup the node list from a file
131 */
132 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
133 {
134         char **lines;
135         int nlines;
136         int i;
137
138         lines = file_lines_load(nlist, &nlines, ctdb);
139         if (lines == NULL) {
140                 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
141                 return -1;
142         }
143
144         for (i=0;i<nlines;i++) {
145                 if (ctdb_add_node(ctdb, lines[i]) != 0) {
146                         talloc_free(lines);
147                         return -1;
148                 }
149         }
150         
151         talloc_free(lines);
152         return 0;
153 }
154
155 /*
156   setup the local node address
157 */
158 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
159 {
160         if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
161                 return -1;
162         }
163         
164         ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
165                                      ctdb->address.address, 
166                                      ctdb->address.port);
167         return 0;
168 }
169
170 /*
171   add a node to the list of active nodes
172 */
173 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, int id)
174 {
175         struct ctdb_registered_call *call;
176
177         call = talloc(ctdb_db, struct ctdb_registered_call);
178         call->fn = fn;
179         call->id = id;
180
181         DLIST_ADD(ctdb_db->calls, call);        
182         return 0;
183 }
184
185 /*
186   return the vnn of this node
187 */
188 uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)
189 {
190         return ctdb->vnn;
191 }
192
193 /*
194   return the number of nodes
195 */
196 uint32_t ctdb_get_num_nodes(struct ctdb_context *ctdb)
197 {
198         return ctdb->num_nodes;
199 }
200
201
202 /*
203   called by the transport layer when a packet comes in
204 */
205 void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
206 {
207         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
208         TALLOC_CTX *tmp_ctx;
209
210         /* place the packet as a child of the tmp_ctx. We then use
211            talloc_free() below to free it. If any of the calls want
212            to keep it, then they will steal it somewhere else, and the
213            talloc_free() will only free the tmp_ctx */
214         tmp_ctx = talloc_new(ctdb);
215         talloc_steal(tmp_ctx, hdr);
216
217         if (length < sizeof(*hdr)) {
218                 ctdb_set_error(ctdb, "Bad packet length %d\n", length);
219                 goto done;
220         }
221         if (length != hdr->length) {
222                 ctdb_set_error(ctdb, "Bad header length %d expected %d\n", 
223                                hdr->length, length);
224                 goto done;
225         }
226
227         if (hdr->ctdb_magic != CTDB_MAGIC) {
228                 ctdb_set_error(ctdb, "Non CTDB packet rejected\n");
229                 goto done;
230         }
231
232         if (hdr->ctdb_version != CTDB_VERSION) {
233                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
234                 goto done;
235         }
236
237         DEBUG(3,(__location__ " ctdb request %d of type %d length %d from "
238                  "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length,
239                  hdr->srcnode, hdr->destnode));
240
241         switch (hdr->operation) {
242         case CTDB_REQ_CALL:
243                 ctdb_request_call(ctdb, hdr);
244                 break;
245
246         case CTDB_REPLY_CALL:
247                 ctdb_reply_call(ctdb, hdr);
248                 break;
249
250         case CTDB_REPLY_ERROR:
251                 ctdb_reply_error(ctdb, hdr);
252                 break;
253
254         case CTDB_REPLY_REDIRECT:
255                 ctdb_reply_redirect(ctdb, hdr);
256                 break;
257
258         case CTDB_REQ_DMASTER:
259                 ctdb_request_dmaster(ctdb, hdr);
260                 break;
261
262         case CTDB_REPLY_DMASTER:
263                 ctdb_reply_dmaster(ctdb, hdr);
264                 break;
265
266         case CTDB_REQ_MESSAGE:
267                 ctdb_request_message(ctdb, hdr);
268                 break;
269
270         case CTDB_REQ_FINISHED:
271                 ctdb_request_finished(ctdb, hdr);
272                 break;
273
274         default:
275                 DEBUG(0,("%s: Packet with unknown operation %d\n", 
276                          __location__, hdr->operation));
277                 break;
278         }
279
280 done:
281         talloc_free(tmp_ctx);
282 }
283
284 /*
285   called by the transport layer when a node is dead
286 */
287 static void ctdb_node_dead(struct ctdb_node *node)
288 {
289         node->ctdb->num_connected--;
290         DEBUG(1,("%s: node %s is dead: %d connected\n", 
291                  node->ctdb->name, node->name, node->ctdb->num_connected));
292 }
293
294 /*
295   called by the transport layer when a node is connected
296 */
297 static void ctdb_node_connected(struct ctdb_node *node)
298 {
299         node->ctdb->num_connected++;
300         DEBUG(1,("%s: connected to %s - %d connected\n", 
301                  node->ctdb->name, node->name, node->ctdb->num_connected));
302 }
303
304 /*
305   wait for all nodes to be connected
306 */
307 void ctdb_daemon_connect_wait(struct ctdb_context *ctdb)
308 {
309         int expected = ctdb->num_nodes - 1;
310         if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
311                 expected++;
312         }
313         while (ctdb->num_connected != expected) {
314                 DEBUG(3,("ctdb_connect_wait: waiting for %d nodes (have %d)\n", 
315                          expected, ctdb->num_connected));
316                 event_loop_once(ctdb->ev);
317         }
318         DEBUG(3,("ctdb_connect_wait: got all %d nodes\n", expected));
319 }
320
321 /*
322   queue a packet or die
323 */
324 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
325 {
326         struct ctdb_node *node;
327         node = ctdb->nodes[hdr->destnode];
328         if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
329                 ctdb_fatal(ctdb, "Unable to queue packet\n");
330         }
331 }
332
333
334 static const struct ctdb_upcalls ctdb_upcalls = {
335         .recv_pkt       = ctdb_recv_pkt,
336         .node_dead      = ctdb_node_dead,
337         .node_connected = ctdb_node_connected
338 };
339
340 /*
341   initialise the ctdb daemon. 
342
343   NOTE: In current code the daemon does not fork. This is for testing purposes only
344   and to simplify the code.
345 */
346 struct ctdb_context *ctdb_init(struct event_context *ev)
347 {
348         struct ctdb_context *ctdb;
349
350         ctdb = talloc_zero(ev, struct ctdb_context);
351         ctdb->ev = ev;
352         ctdb->upcalls = &ctdb_upcalls;
353         ctdb->idr = idr_init(ctdb);
354         ctdb->max_lacount = CTDB_DEFAULT_MAX_LACOUNT;
355
356         return ctdb;
357 }
358