Merge commit 'ronnie/master'
[sahlberg/ctdb.git] / server / ctdb_server.c
1 /* 
2    ctdb main protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tdb/include/tdb.h"
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27
28 /*
29   choose the transport we will use
30 */
31 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
32 {
33         ctdb->transport = talloc_strdup(ctdb, transport);
34         return 0;
35 }
36
37 /*
38   Check whether an ip is a valid node ip
39   Returns the node id for this ip address or -1
40 */
41 int ctdb_ip_to_nodeid(struct ctdb_context *ctdb, const char *nodeip)
42 {
43         int nodeid;
44
45         for (nodeid=0;nodeid<ctdb->num_nodes;nodeid++) {
46                 if (!strcmp(ctdb->nodes[nodeid]->address.address, nodeip)) {
47                         return nodeid;
48                 }
49         }
50
51         return -1;
52 }
53
54 /*
55   choose the recovery lock file
56 */
57 int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file)
58 {
59         ctdb->recovery_lock_file = talloc_strdup(ctdb, file);
60         return 0;
61 }
62
63 /*
64   set the directory for the local databases
65 */
66 int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
67 {
68         ctdb->db_directory = talloc_strdup(ctdb, dir);
69         if (ctdb->db_directory == NULL) {
70                 return -1;
71         }
72         return 0;
73 }
74
75 /*
76   set the directory for the persistent databases
77 */
78 int ctdb_set_tdb_dir_persistent(struct ctdb_context *ctdb, const char *dir)
79 {
80         ctdb->db_directory_persistent = talloc_strdup(ctdb, dir);
81         if (ctdb->db_directory_persistent == NULL) {
82                 return -1;
83         }
84         return 0;
85 }
86
87 /*
88   add a node to the list of active nodes
89 */
90 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
91 {
92         struct ctdb_node *node, **nodep;
93
94         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
95         CTDB_NO_MEMORY(ctdb, nodep);
96
97         ctdb->nodes = nodep;
98         nodep = &ctdb->nodes[ctdb->num_nodes];
99         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
100         CTDB_NO_MEMORY(ctdb, *nodep);
101         node = *nodep;
102
103         if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
104                 return -1;
105         }
106         node->ctdb = ctdb;
107         node->name = talloc_asprintf(node, "%s:%u", 
108                                      node->address.address, 
109                                      node->address.port);
110         /* this assumes that the nodes are kept in sorted order, and no gaps */
111         node->pnn = ctdb->num_nodes;
112
113         /* nodes start out disconnected and unhealthy */
114         node->flags = (NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY);
115
116         if (ctdb->address.address &&
117             ctdb_same_address(&ctdb->address, &node->address)) {
118                 /* for automatic binding to interfaces, see tcp_connect.c */
119                 ctdb->pnn = node->pnn;
120                 node->flags &= ~NODE_FLAGS_DISCONNECTED;
121
122                 /* do we start out in DISABLED mode? */
123                 if (ctdb->start_as_disabled != 0) {
124                         DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
125                         node->flags |= NODE_FLAGS_DISABLED;
126                 }
127         }
128
129         ctdb->num_nodes++;
130         node->dead_count = 0;
131
132         return 0;
133 }
134
135 /*
136   setup the node list from a file
137 */
138 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
139 {
140         char **lines;
141         int nlines;
142         int i;
143
144         talloc_free(ctdb->nodes);
145         ctdb->nodes     = NULL;
146         ctdb->num_nodes = 0;
147
148         talloc_free(ctdb->node_list_file);
149         ctdb->node_list_file = talloc_strdup(ctdb, nlist);
150
151         lines = file_lines_load(nlist, &nlines, ctdb);
152         if (lines == NULL) {
153                 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
154                 return -1;
155         }
156         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
157                 nlines--;
158         }
159
160         for (i=0;i<nlines;i++) {
161                 char *node;
162
163                 node = lines[i];
164                 /* strip leading spaces */
165                 while((*node == ' ') || (*node == '\t')) {
166                         node++;
167                 }
168                 if (*node == '#') {
169                         continue;
170                 }
171                 if (strcmp(node, "") == 0) {
172                         continue;
173                 }
174                 if (ctdb_add_node(ctdb, node) != 0) {
175                         talloc_free(lines);
176                         return -1;
177                 }
178         }
179
180         /* initialize the vnn mapping table now that we have num_nodes setup */
181         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
182         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
183
184         ctdb->vnn_map->generation = INVALID_GENERATION;
185         ctdb->vnn_map->size = ctdb->num_nodes;
186         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
187         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
188
189         for(i=0;i<ctdb->vnn_map->size;i++) {
190                 ctdb->vnn_map->map[i] = i;
191         }
192         
193         talloc_free(lines);
194         return 0;
195 }
196
197
198 /*
199   setup the local node address
200 */
201 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
202 {
203         if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
204                 return -1;
205         }
206         
207         ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
208                                      ctdb->address.address, 
209                                      ctdb->address.port);
210         return 0;
211 }
212
213
214 /*
215   return the number of active nodes
216 */
217 uint32_t ctdb_get_num_active_nodes(struct ctdb_context *ctdb)
218 {
219         int i;
220         uint32_t count=0;
221         for (i=0;i<ctdb->vnn_map->size;i++) {
222                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
223                 if (!(node->flags & NODE_FLAGS_INACTIVE)) {
224                         count++;
225                 }
226         }
227         return count;
228 }
229
230
231 /*
232   called when we need to process a packet. This can be a requeued packet
233   after a lockwait, or a real packet from another node
234 */
235 void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
236 {
237         TALLOC_CTX *tmp_ctx;
238
239         /* place the packet as a child of the tmp_ctx. We then use
240            talloc_free() below to free it. If any of the calls want
241            to keep it, then they will steal it somewhere else, and the
242            talloc_free() will only free the tmp_ctx */
243         tmp_ctx = talloc_new(ctdb);
244         talloc_steal(tmp_ctx, hdr);
245
246         DEBUG(DEBUG_DEBUG,(__location__ " ctdb request %u of type %u length %u from "
247                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
248                  hdr->srcnode, hdr->destnode));
249
250         switch (hdr->operation) {
251         case CTDB_REQ_CALL:
252         case CTDB_REPLY_CALL:
253         case CTDB_REQ_DMASTER:
254         case CTDB_REPLY_DMASTER:
255                 /* we dont allow these calls when banned */
256                 if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_BANNED) {
257                         DEBUG(DEBUG_DEBUG,(__location__ " ctdb operation %u"
258                                 " request %u"
259                                 " length %u from node %u to %u while node"
260                                 " is banned\n",
261                                  hdr->operation, hdr->reqid,
262                                  hdr->length, 
263                                  hdr->srcnode, hdr->destnode));
264                         goto done;
265                 }
266
267                 /* for ctdb_call inter-node operations verify that the
268                    remote node that sent us the call is running in the
269                    same generation instance as this node
270                 */
271                 if (ctdb->vnn_map->generation != hdr->generation) {
272                         DEBUG(DEBUG_DEBUG,(__location__ " ctdb operation %u"
273                                 " request %u"
274                                 " length %u from node %u to %u had an"
275                                 " invalid generation id:%u while our"
276                                 " generation id is:%u\n", 
277                                  hdr->operation, hdr->reqid,
278                                  hdr->length, 
279                                  hdr->srcnode, hdr->destnode, 
280                                  hdr->generation, ctdb->vnn_map->generation));
281                         goto done;
282                 }
283         }
284
285         switch (hdr->operation) {
286         case CTDB_REQ_CALL:
287                 ctdb->statistics.node.req_call++;
288                 ctdb_request_call(ctdb, hdr);
289                 break;
290
291         case CTDB_REPLY_CALL:
292                 ctdb->statistics.node.reply_call++;
293                 ctdb_reply_call(ctdb, hdr);
294                 break;
295
296         case CTDB_REPLY_ERROR:
297                 ctdb->statistics.node.reply_error++;
298                 ctdb_reply_error(ctdb, hdr);
299                 break;
300
301         case CTDB_REQ_DMASTER:
302                 ctdb->statistics.node.req_dmaster++;
303                 ctdb_request_dmaster(ctdb, hdr);
304                 break;
305
306         case CTDB_REPLY_DMASTER:
307                 ctdb->statistics.node.reply_dmaster++;
308                 ctdb_reply_dmaster(ctdb, hdr);
309                 break;
310
311         case CTDB_REQ_MESSAGE:
312                 ctdb->statistics.node.req_message++;
313                 ctdb_request_message(ctdb, hdr);
314                 break;
315
316         case CTDB_REQ_CONTROL:
317                 ctdb->statistics.node.req_control++;
318                 ctdb_request_control(ctdb, hdr);
319                 break;
320
321         case CTDB_REPLY_CONTROL:
322                 ctdb->statistics.node.reply_control++;
323                 ctdb_reply_control(ctdb, hdr);
324                 break;
325
326         case CTDB_REQ_KEEPALIVE:
327                 ctdb->statistics.keepalive_packets_recv++;
328                 break;
329
330         default:
331                 DEBUG(DEBUG_CRIT,("%s: Packet with unknown operation %u\n", 
332                          __location__, hdr->operation));
333                 break;
334         }
335
336 done:
337         talloc_free(tmp_ctx);
338 }
339
340
341 /*
342   called by the transport layer when a node is dead
343 */
344 void ctdb_node_dead(struct ctdb_node *node)
345 {
346         if (node->flags & NODE_FLAGS_DISCONNECTED) {
347                 DEBUG(DEBUG_INFO,("%s: node %s is already marked disconnected: %u connected\n", 
348                          node->ctdb->name, node->name, 
349                          node->ctdb->num_connected));
350                 return;
351         }
352         node->ctdb->num_connected--;
353         node->flags |= NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY;
354         node->rx_cnt = 0;
355         node->dead_count = 0;
356
357         DEBUG(DEBUG_NOTICE,("%s: node %s is dead: %u connected\n", 
358                  node->ctdb->name, node->name, node->ctdb->num_connected));
359         ctdb_daemon_cancel_controls(node->ctdb, node);
360
361         if (node->ctdb->methods == NULL) {
362                 DEBUG(DEBUG_ALERT,(__location__ " Can not restart transport. ctdb->methods==NULL\n"));
363                 ctdb_fatal(node->ctdb, "can not restart transport.");
364         }
365
366         node->ctdb->methods->restart(node);
367 }
368
369 /*
370   called by the transport layer when a node is connected
371 */
372 void ctdb_node_connected(struct ctdb_node *node)
373 {
374         if (!(node->flags & NODE_FLAGS_DISCONNECTED)) {
375                 DEBUG(DEBUG_INFO,("%s: node %s is already marked connected: %u connected\n", 
376                          node->ctdb->name, node->name, 
377                          node->ctdb->num_connected));
378                 return;
379         }
380         node->ctdb->num_connected++;
381         node->dead_count = 0;
382         node->flags &= ~NODE_FLAGS_DISCONNECTED;
383         node->flags |= NODE_FLAGS_UNHEALTHY;
384         DEBUG(DEBUG_INFO,("%s: connected to %s - %u connected\n", 
385                  node->ctdb->name, node->name, node->ctdb->num_connected));
386 }
387
388 struct queue_next {
389         struct ctdb_context *ctdb;
390         struct ctdb_req_header *hdr;
391 };
392
393
394 /*
395   trigered when a deferred packet is due
396  */
397 static void queue_next_trigger(struct event_context *ev, struct timed_event *te, 
398                                struct timeval t, void *private_data)
399 {
400         struct queue_next *q = talloc_get_type(private_data, struct queue_next);
401         ctdb_input_pkt(q->ctdb, q->hdr);
402         talloc_free(q);
403 }       
404
405 /*
406   defer a packet, so it is processed on the next event loop
407   this is used for sending packets to ourselves
408  */
409 static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
410 {
411         struct queue_next *q;
412         q = talloc(ctdb, struct queue_next);
413         if (q == NULL) {
414                 DEBUG(DEBUG_ERR,(__location__ " Failed to allocate deferred packet\n"));
415                 return;
416         }
417         q->ctdb = ctdb;
418         q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
419         if (q->hdr == NULL) {
420                 DEBUG(DEBUG_ERR,("Error copying deferred packet to self\n"));
421                 return;
422         }
423 #if 0
424         /* use this to put packets directly into our recv function */
425         ctdb_input_pkt(q->ctdb, q->hdr);
426 #else
427         event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
428 #endif
429 }
430
431
432 /*
433   broadcast a packet to all nodes
434 */
435 static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb, 
436                                       struct ctdb_req_header *hdr)
437 {
438         int i;
439         for (i=0;i<ctdb->num_nodes;i++) {
440                 hdr->destnode = ctdb->nodes[i]->pnn;
441                 ctdb_queue_packet(ctdb, hdr);
442         }
443 }
444
445 /*
446   broadcast a packet to all nodes in the current vnnmap
447 */
448 static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb, 
449                                          struct ctdb_req_header *hdr)
450 {
451         int i;
452         for (i=0;i<ctdb->vnn_map->size;i++) {
453                 hdr->destnode = ctdb->vnn_map->map[i];
454                 ctdb_queue_packet(ctdb, hdr);
455         }
456 }
457
458 /*
459   broadcast a packet to all connected nodes
460 */
461 static void ctdb_broadcast_packet_connected(struct ctdb_context *ctdb, 
462                                             struct ctdb_req_header *hdr)
463 {
464         int i;
465         for (i=0;i<ctdb->num_nodes;i++) {
466                 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_DISCONNECTED)) {
467                         hdr->destnode = ctdb->nodes[i]->pnn;
468                         ctdb_queue_packet(ctdb, hdr);
469                 }
470         }
471 }
472
473 /*
474   queue a packet or die
475 */
476 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
477 {
478         struct ctdb_node *node;
479
480         switch (hdr->destnode) {
481         case CTDB_BROADCAST_ALL:
482                 ctdb_broadcast_packet_all(ctdb, hdr);
483                 return;
484         case CTDB_BROADCAST_VNNMAP:
485                 ctdb_broadcast_packet_vnnmap(ctdb, hdr);
486                 return;
487         case CTDB_BROADCAST_CONNECTED:
488                 ctdb_broadcast_packet_connected(ctdb, hdr);
489                 return;
490         }
491
492         ctdb->statistics.node_packets_sent++;
493
494         if (!ctdb_validate_pnn(ctdb, hdr->destnode)) {
495                 DEBUG(DEBUG_CRIT,(__location__ " cant send to node %u that does not exist\n", 
496                          hdr->destnode));
497                 return;
498         }
499
500         node = ctdb->nodes[hdr->destnode];
501
502         if (hdr->destnode == ctdb->pnn) {
503                 ctdb_defer_packet(ctdb, hdr);
504         } else {
505                 if (ctdb->methods == NULL) {
506                         DEBUG(DEBUG_ALERT, (__location__ " Can not queue packet. Transport is DOWN\n"));
507                         return;
508                 }
509
510                 node->tx_cnt++;
511                 if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
512                         ctdb_fatal(ctdb, "Unable to queue packet\n");
513                 }
514         }
515 }
516
517
518
519
520 /*
521   a valgrind hack to allow us to get opcode specific backtraces
522   very ugly, and relies on no compiler optimisation!
523 */
524 void ctdb_queue_packet_opcode(struct ctdb_context *ctdb, struct ctdb_req_header *hdr, unsigned opcode)
525 {
526         switch (opcode) {
527 #define DO_OP(x) case x: ctdb_queue_packet(ctdb, hdr); break
528                 DO_OP(1);
529                 DO_OP(2);
530                 DO_OP(3);
531                 DO_OP(4);
532                 DO_OP(5);
533                 DO_OP(6);
534                 DO_OP(7);
535                 DO_OP(8);
536                 DO_OP(9);
537                 DO_OP(10);
538                 DO_OP(11);
539                 DO_OP(12);
540                 DO_OP(13);
541                 DO_OP(14);
542                 DO_OP(15);
543                 DO_OP(16);
544                 DO_OP(17);
545                 DO_OP(18);
546                 DO_OP(19);
547                 DO_OP(20);
548                 DO_OP(21);
549                 DO_OP(22);
550                 DO_OP(23);
551                 DO_OP(24);
552                 DO_OP(25);
553                 DO_OP(26);
554                 DO_OP(27);
555                 DO_OP(28);
556                 DO_OP(29);
557                 DO_OP(30);
558                 DO_OP(31);
559                 DO_OP(32);
560                 DO_OP(33);
561                 DO_OP(34);
562                 DO_OP(35);
563                 DO_OP(36);
564                 DO_OP(37);
565                 DO_OP(38);
566                 DO_OP(39);
567                 DO_OP(40);
568                 DO_OP(41);
569                 DO_OP(42);
570                 DO_OP(43);
571                 DO_OP(44);
572                 DO_OP(45);
573                 DO_OP(46);
574                 DO_OP(47);
575                 DO_OP(48);
576                 DO_OP(49);
577                 DO_OP(50);
578                 DO_OP(51);
579                 DO_OP(52);
580                 DO_OP(53);
581                 DO_OP(54);
582                 DO_OP(55);
583                 DO_OP(56);
584                 DO_OP(57);
585                 DO_OP(58);
586                 DO_OP(59);
587                 DO_OP(60);
588                 DO_OP(61);
589                 DO_OP(62);
590                 DO_OP(63);
591                 DO_OP(64);
592                 DO_OP(65);
593                 DO_OP(66);
594                 DO_OP(67);
595                 DO_OP(68);
596                 DO_OP(69);
597                 DO_OP(70);
598                 DO_OP(71);
599                 DO_OP(72);
600                 DO_OP(73);
601                 DO_OP(74);
602                 DO_OP(75);
603                 DO_OP(76);
604                 DO_OP(77);
605                 DO_OP(78);
606                 DO_OP(79);
607                 DO_OP(80);
608                 DO_OP(81);
609                 DO_OP(82);
610                 DO_OP(83);
611                 DO_OP(84);
612                 DO_OP(85);
613                 DO_OP(86);
614                 DO_OP(87);
615                 DO_OP(88);
616                 DO_OP(89);
617                 DO_OP(90);
618                 DO_OP(91);
619                 DO_OP(92);
620                 DO_OP(93);
621                 DO_OP(94);
622                 DO_OP(95);
623                 DO_OP(96);
624                 DO_OP(97);
625                 DO_OP(98);
626                 DO_OP(99);
627                 DO_OP(100);
628         default: 
629                 ctdb_queue_packet(ctdb, hdr);
630                 break;
631         }
632 }