merge from tridge
[martins/samba.git] / ctdb / common / ctdb.c
1 /* 
2    ctdb main protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "../include/ctdb_private.h"
28
29 /*
30   choose the transport we will use
31 */
32 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
33 {
34         int ctdb_tcp_init(struct ctdb_context *ctdb);
35 #ifdef USE_INFINIBAND
36         int ctdb_ibw_init(struct ctdb_context *ctdb);
37 #endif /* USE_INFINIBAND */
38
39         if (strcmp(transport, "tcp") == 0) {
40                 return ctdb_tcp_init(ctdb);
41         }
42 #ifdef USE_INFINIBAND
43         if (strcmp(transport, "ib") == 0) {
44                 return ctdb_ibw_init(ctdb);
45         }
46 #endif /* USE_INFINIBAND */
47
48         ctdb_set_error(ctdb, "Unknown transport '%s'\n", transport);
49         return -1;
50 }
51
52 /*
53   set some ctdb flags
54 */
55 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
56 {
57         ctdb->flags |= flags;
58 }
59
60 /*
61   clear some ctdb flags
62 */
63 void ctdb_clear_flags(struct ctdb_context *ctdb, unsigned flags)
64 {
65         ctdb->flags &= ~flags;
66 }
67
68 /*
69   set max acess count before a dmaster migration
70 */
71 void ctdb_set_max_lacount(struct ctdb_context *ctdb, unsigned count)
72 {
73         ctdb->max_lacount = count;
74 }
75
76 /*
77   add a node to the list of active nodes
78 */
79 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
80 {
81         struct ctdb_node *node, **nodep;
82
83         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
84         CTDB_NO_MEMORY(ctdb, nodep);
85
86         ctdb->nodes = nodep;
87         nodep = &ctdb->nodes[ctdb->num_nodes];
88         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
89         CTDB_NO_MEMORY(ctdb, *nodep);
90         node = *nodep;
91
92         if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
93                 return -1;
94         }
95         node->ctdb = ctdb;
96         node->name = talloc_asprintf(node, "%s:%u", 
97                                      node->address.address, 
98                                      node->address.port);
99         /* for now we just set the vnn to the line in the file - this
100            will change! */
101         node->vnn = ctdb->num_nodes;
102
103         if (ctdb->methods->add_node(node) != 0) {
104                 talloc_free(node);
105                 return -1;
106         }
107
108         if (ctdb_same_address(&ctdb->address, &node->address)) {
109                 ctdb->vnn = node->vnn;
110         }
111
112         ctdb->num_nodes++;
113
114         return 0;
115 }
116
117 /*
118   setup the node list from a file
119 */
120 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
121 {
122         char **lines;
123         int nlines;
124         int i;
125
126         lines = file_lines_load(nlist, &nlines, ctdb);
127         if (lines == NULL) {
128                 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
129                 return -1;
130         }
131
132         for (i=0;i<nlines;i++) {
133                 if (ctdb_add_node(ctdb, lines[i]) != 0) {
134                         talloc_free(lines);
135                         return -1;
136                 }
137         }
138         
139         talloc_free(lines);
140         return 0;
141 }
142
143 /*
144   setup the local node address
145 */
146 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
147 {
148         if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
149                 return -1;
150         }
151         
152         ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
153                                      ctdb->address.address, 
154                                      ctdb->address.port);
155         return 0;
156 }
157
158 /*
159   add a node to the list of active nodes
160 */
161 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, int id)
162 {
163         struct ctdb_registered_call *call;
164
165         call = talloc(ctdb_db, struct ctdb_registered_call);
166         call->fn = fn;
167         call->id = id;
168
169         DLIST_ADD(ctdb_db->calls, call);        
170         return 0;
171 }
172
173 /*
174   return the vnn of this node
175 */
176 uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)
177 {
178         return ctdb->vnn;
179 }
180
181 /*
182   return the number of nodes
183 */
184 uint32_t ctdb_get_num_nodes(struct ctdb_context *ctdb)
185 {
186         return ctdb->num_nodes;
187 }
188
189
190 /*
191   called by the transport layer when a packet comes in
192 */
193 void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
194 {
195         struct ctdb_req_header *hdr;
196
197         if (length < sizeof(*hdr)) {
198                 ctdb_set_error(ctdb, "Bad packet length %d\n", length);
199                 return;
200         }
201         hdr = (struct ctdb_req_header *)data;
202         if (length != hdr->length) {
203                 ctdb_set_error(ctdb, "Bad header length %d expected %d\n", 
204                                hdr->length, length);
205                 return;
206         }
207
208         if (hdr->ctdb_magic != CTDB_MAGIC) {
209                 ctdb_set_error(ctdb, "Non CTDB packet rejected\n");
210                 return;
211         }
212
213         if (hdr->ctdb_version != CTDB_VERSION) {
214                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
215                 return;
216         }
217
218         DEBUG(3,(__location__ " ctdb request of type %d length %d from node %d to %d\n",
219                  hdr->operation, hdr->length, hdr->srcnode, hdr->destnode));
220
221         switch (hdr->operation) {
222         case CTDB_REQ_CALL:
223                 ctdb_request_call(ctdb, hdr);
224                 break;
225
226         case CTDB_REPLY_CALL:
227                 ctdb_reply_call(ctdb, hdr);
228                 break;
229
230         case CTDB_REPLY_ERROR:
231                 ctdb_reply_error(ctdb, hdr);
232                 break;
233
234         case CTDB_REPLY_REDIRECT:
235                 ctdb_reply_redirect(ctdb, hdr);
236                 break;
237
238         case CTDB_REQ_DMASTER:
239                 ctdb_request_dmaster(ctdb, hdr);
240                 break;
241
242         case CTDB_REPLY_DMASTER:
243                 ctdb_reply_dmaster(ctdb, hdr);
244                 break;
245
246         case CTDB_REQ_MESSAGE:
247                 ctdb_request_message(ctdb, hdr);
248                 break;
249
250         default:
251                 DEBUG(0,("%s: Packet with unknown operation %d\n", 
252                          __location__, hdr->operation));
253                 break;
254         }
255         talloc_free(hdr);
256 }
257
258 /*
259   called by the transport layer when a node is dead
260 */
261 static void ctdb_node_dead(struct ctdb_node *node)
262 {
263         node->ctdb->num_connected--;
264         DEBUG(1,("%s: node %s is dead: %d connected\n", 
265                  node->ctdb->name, node->name, node->ctdb->num_connected));
266 }
267
268 /*
269   called by the transport layer when a node is connected
270 */
271 static void ctdb_node_connected(struct ctdb_node *node)
272 {
273         node->ctdb->num_connected++;
274         DEBUG(1,("%s: connected to %s - %d connected\n", 
275                  node->ctdb->name, node->name, node->ctdb->num_connected));
276 }
277
278 /*
279   wait for all nodes to be connected
280 */
281 void ctdb_daemon_connect_wait(struct ctdb_context *ctdb)
282 {
283         int expected = ctdb->num_nodes - 1;
284         if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
285                 expected++;
286         }
287         while (ctdb->num_connected != expected) {
288                 DEBUG(3,("ctdb_connect_wait: waiting for %d nodes (have %d)\n", 
289                          expected, ctdb->num_connected));
290                 event_loop_once(ctdb->ev);
291         }
292         DEBUG(3,("ctdb_connect_wait: got all %d nodes\n", expected));
293 }
294
295 /*
296   wait until we're the only node left
297 */
298 void ctdb_wait_loop(struct ctdb_context *ctdb)
299 {
300         int expected = 0;
301         if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
302                 expected++;
303         }
304         while (ctdb->num_connected > expected) {
305                 event_loop_once(ctdb->ev);
306         }
307 }
308
309
310 /*
311   queue a packet or die
312 */
313 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
314 {
315         struct ctdb_node *node;
316         node = ctdb->nodes[hdr->destnode];
317         if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
318                 ctdb_fatal(ctdb, "Unable to queue packet\n");
319         }
320 }
321
322
323 static const struct ctdb_upcalls ctdb_upcalls = {
324         .recv_pkt       = ctdb_recv_pkt,
325         .node_dead      = ctdb_node_dead,
326         .node_connected = ctdb_node_connected
327 };
328
329 /*
330   initialise the ctdb daemon. 
331
332   NOTE: In current code the daemon does not fork. This is for testing purposes only
333   and to simplify the code.
334 */
335 struct ctdb_context *ctdb_init(struct event_context *ev)
336 {
337         struct ctdb_context *ctdb;
338
339         ctdb = talloc_zero(ev, struct ctdb_context);
340         ctdb->ev = ev;
341         ctdb->upcalls = &ctdb_upcalls;
342         ctdb->idr = idr_init(ctdb);
343         ctdb->max_lacount = CTDB_DEFAULT_MAX_LACOUNT;
344
345         return ctdb;
346 }
347