354b0f64ceefb6cefc262f08cccbe6cfc85fd657
[vlendec/samba-autobuild/.git] / ctdb / common / ctdb.c
1 /* 
2    ctdb main protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 2 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 */
20
21 #include "includes.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "../include/ctdb_private.h"
28
29 /*
30   choose the transport we will use
31 */
32 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
33 {
34         ctdb->transport = talloc_strdup(ctdb, transport);
35         return 0;
36 }
37
38 /*
39   choose the recovery lock file
40 */
41 int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file)
42 {
43         ctdb->recovery_lock_file = talloc_strdup(ctdb, file);
44         return 0;
45 }
46
47 /*
48   choose the logfile location
49 */
50 int ctdb_set_logfile(struct ctdb_context *ctdb, const char *logfile)
51 {
52         ctdb->logfile = talloc_strdup(ctdb, logfile);
53         if (ctdb->logfile != NULL && strcmp(logfile, "-") != 0) {
54                 int fd;
55                 fd = open(ctdb->logfile, O_WRONLY|O_APPEND|O_CREAT, 0666);
56                 if (fd == -1) {
57                         printf("Failed to open logfile %s\n", ctdb->logfile);
58                         abort();
59                 }
60                 close(1);
61                 close(2);
62                 if (fd != 1) {
63                         dup2(fd, 1);
64                         close(fd);
65                 }
66                 /* also catch stderr of subcommands to the log file */
67                 dup2(1, 2);
68         }
69         return 0;
70 }
71
72
73 /*
74   set some ctdb flags
75 */
76 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
77 {
78         ctdb->flags |= flags;
79 }
80
81 /*
82   clear some ctdb flags
83 */
84 void ctdb_clear_flags(struct ctdb_context *ctdb, unsigned flags)
85 {
86         ctdb->flags &= ~flags;
87 }
88
89 /*
90   set the directory for the local databases
91 */
92 int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
93 {
94         ctdb->db_directory = talloc_strdup(ctdb, dir);
95         if (ctdb->db_directory == NULL) {
96                 return -1;
97         }
98         return 0;
99 }
100
101 /*
102   add a node to the list of active nodes
103 */
104 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
105 {
106         struct ctdb_node *node, **nodep;
107
108         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
109         CTDB_NO_MEMORY(ctdb, nodep);
110
111         ctdb->nodes = nodep;
112         nodep = &ctdb->nodes[ctdb->num_nodes];
113         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
114         CTDB_NO_MEMORY(ctdb, *nodep);
115         node = *nodep;
116
117         if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
118                 return -1;
119         }
120         node->ctdb = ctdb;
121         node->name = talloc_asprintf(node, "%s:%u", 
122                                      node->address.address, 
123                                      node->address.port);
124         /* this assumes that the nodes are kept in sorted order, and no gaps */
125         node->vnn = ctdb->num_nodes;
126
127         if (ctdb->address.address &&
128             ctdb_same_address(&ctdb->address, &node->address)) {
129                 ctdb->vnn = node->vnn;
130                 node->flags |= NODE_FLAGS_CONNECTED;
131         }
132
133         ctdb->num_nodes++;
134         node->dead_count = 0;
135
136         return 0;
137 }
138
139 /*
140   setup the node list from a file
141 */
142 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
143 {
144         char **lines;
145         int nlines;
146         int i;
147
148         talloc_free(ctdb->node_list_file);
149         ctdb->node_list_file = talloc_strdup(ctdb, nlist);
150
151         lines = file_lines_load(nlist, &nlines, ctdb);
152         if (lines == NULL) {
153                 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
154                 return -1;
155         }
156         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
157                 nlines--;
158         }
159
160         for (i=0;i<nlines;i++) {
161                 if (ctdb_add_node(ctdb, lines[i]) != 0) {
162                         talloc_free(lines);
163                         return -1;
164                 }
165         }
166
167         /* initialize the vnn mapping table now that we have num_nodes setup */
168 /*
169 XXX we currently initialize it to the maximum number of nodes to 
170 XXX make it behave the same way as previously.  
171 XXX Once we have recovery working we should initialize this always to 
172 XXX generation==0 (==invalid) and let the recovery tool populate this 
173 XXX table for the daemons. 
174 */
175         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
176         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
177
178         ctdb->vnn_map->generation = 1;
179         ctdb->vnn_map->size = ctdb->num_nodes;
180         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
181         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
182
183         for(i=0;i<ctdb->vnn_map->size;i++) {
184                 ctdb->vnn_map->map[i] = i;
185         }
186         
187         talloc_free(lines);
188         return 0;
189 }
190
191
192 /*
193   setup the local node address
194 */
195 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
196 {
197         if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
198                 return -1;
199         }
200         
201         ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
202                                      ctdb->address.address, 
203                                      ctdb->address.port);
204         return 0;
205 }
206
207
208 /*
209   setup the local socket name
210 */
211 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
212 {
213         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
214         return 0;
215 }
216 /*
217   return the vnn of this node
218 */
219 uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)
220 {
221         return ctdb->vnn;
222 }
223
224 /*
225   return the number of enabled nodes
226 */
227 uint32_t ctdb_get_num_enabled_nodes(struct ctdb_context *ctdb)
228 {
229         int i;
230         uint32_t count=0;
231         for (i=0;i<ctdb->vnn_map->size;i++) {
232                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
233                 if ((node->flags & NODE_FLAGS_CONNECTED) &&
234                     !(node->flags & NODE_FLAGS_DISABLED)) {
235                         count++;
236                 }
237         }
238         return count;
239 }
240
241
242 /*
243   called when we need to process a packet. This can be a requeued packet
244   after a lockwait, or a real packet from another node
245 */
246 void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
247 {
248         TALLOC_CTX *tmp_ctx;
249
250         /* place the packet as a child of the tmp_ctx. We then use
251            talloc_free() below to free it. If any of the calls want
252            to keep it, then they will steal it somewhere else, and the
253            talloc_free() will only free the tmp_ctx */
254         tmp_ctx = talloc_new(ctdb);
255         talloc_steal(tmp_ctx, hdr);
256
257         DEBUG(3,(__location__ " ctdb request %u of type %u length %u from "
258                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
259                  hdr->srcnode, hdr->destnode));
260
261         switch (hdr->operation) {
262         case CTDB_REQ_CALL:
263         case CTDB_REPLY_CALL:
264         case CTDB_REQ_DMASTER:
265         case CTDB_REPLY_DMASTER:
266                 /* for ctdb_call inter-node operations verify that the
267                    remote node that sent us the call is running in the
268                    same generation instance as this node
269                 */
270                 if (ctdb->vnn_map->generation != hdr->generation) {
271                         DEBUG(0,(__location__ " ctdb request %u"
272                                 " length %u from node %u to %u had an"
273                                 " invalid generation id:%u while our"
274                                 " generation id is:%u\n", 
275                                  hdr->reqid, hdr->length, 
276                                  hdr->srcnode, hdr->destnode, 
277                                  hdr->generation, ctdb->vnn_map->generation));
278                         goto done;
279                 }
280         }
281
282         switch (hdr->operation) {
283         case CTDB_REQ_CALL:
284                 ctdb->statistics.node.req_call++;
285                 ctdb_request_call(ctdb, hdr);
286                 break;
287
288         case CTDB_REPLY_CALL:
289                 ctdb->statistics.node.reply_call++;
290                 ctdb_reply_call(ctdb, hdr);
291                 break;
292
293         case CTDB_REPLY_ERROR:
294                 ctdb->statistics.node.reply_error++;
295                 ctdb_reply_error(ctdb, hdr);
296                 break;
297
298         case CTDB_REQ_DMASTER:
299                 ctdb->statistics.node.req_dmaster++;
300                 ctdb_request_dmaster(ctdb, hdr);
301                 break;
302
303         case CTDB_REPLY_DMASTER:
304                 ctdb->statistics.node.reply_dmaster++;
305                 ctdb_reply_dmaster(ctdb, hdr);
306                 break;
307
308         case CTDB_REQ_MESSAGE:
309                 ctdb->statistics.node.req_message++;
310                 ctdb_request_message(ctdb, hdr);
311                 break;
312
313         case CTDB_REQ_CONTROL:
314                 ctdb->statistics.node.req_control++;
315                 ctdb_request_control(ctdb, hdr);
316                 break;
317
318         case CTDB_REPLY_CONTROL:
319                 ctdb->statistics.node.reply_control++;
320                 ctdb_reply_control(ctdb, hdr);
321                 break;
322
323         case CTDB_REQ_KEEPALIVE:
324                 ctdb->statistics.keepalive_packets_recv++;
325                 break;
326
327         default:
328                 DEBUG(0,("%s: Packet with unknown operation %u\n", 
329                          __location__, hdr->operation));
330                 break;
331         }
332
333 done:
334         talloc_free(tmp_ctx);
335 }
336
337
338 /*
339   called by the transport layer when a packet comes in
340 */
341 static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
342 {
343         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
344
345         ctdb->statistics.node_packets_recv++;
346
347         /* up the counter for this source node, so we know its alive */
348         if (ctdb_validate_vnn(ctdb, hdr->srcnode)) {
349                 /* as a special case, redirected calls don't increment the rx_cnt */
350                 if (hdr->operation != CTDB_REQ_CALL ||
351                     ((struct ctdb_req_call *)hdr)->hopcount == 0) {
352                         ctdb->nodes[hdr->srcnode]->rx_cnt++;
353                 }
354         }
355
356         ctdb_input_pkt(ctdb, hdr);
357 }
358
359
360 /*
361   called by the transport layer when a node is dead
362 */
363 void ctdb_node_dead(struct ctdb_node *node)
364 {
365         if (!(node->flags & NODE_FLAGS_CONNECTED)) {
366                 DEBUG(1,("%s: node %s is already marked disconnected: %u connected\n", 
367                          node->ctdb->name, node->name, 
368                          node->ctdb->num_connected));
369                 return;
370         }
371         node->ctdb->num_connected--;
372         node->flags &= ~NODE_FLAGS_CONNECTED;
373         node->rx_cnt = 0;
374         node->dead_count = 0;
375         DEBUG(1,("%s: node %s is dead: %u connected\n", 
376                  node->ctdb->name, node->name, node->ctdb->num_connected));
377         ctdb_daemon_cancel_controls(node->ctdb, node);
378 }
379
380 /*
381   called by the transport layer when a node is connected
382 */
383 void ctdb_node_connected(struct ctdb_node *node)
384 {
385         if (node->flags & NODE_FLAGS_CONNECTED) {
386                 DEBUG(1,("%s: node %s is already marked connected: %u connected\n", 
387                          node->ctdb->name, node->name, 
388                          node->ctdb->num_connected));
389                 return;
390         }
391         node->ctdb->num_connected++;
392         node->dead_count = 0;
393         node->flags |= NODE_FLAGS_CONNECTED;
394         DEBUG(1,("%s: connected to %s - %u connected\n", 
395                  node->ctdb->name, node->name, node->ctdb->num_connected));
396 }
397
398 struct queue_next {
399         struct ctdb_context *ctdb;
400         struct ctdb_req_header *hdr;
401 };
402
403
404 /*
405   trigered when a deferred packet is due
406  */
407 static void queue_next_trigger(struct event_context *ev, struct timed_event *te, 
408                                struct timeval t, void *private_data)
409 {
410         struct queue_next *q = talloc_get_type(private_data, struct queue_next);
411         ctdb_input_pkt(q->ctdb, q->hdr);
412         talloc_free(q);
413 }       
414
415 /*
416   defer a packet, so it is processed on the next event loop
417   this is used for sending packets to ourselves
418  */
419 static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
420 {
421         struct queue_next *q;
422         q = talloc(ctdb, struct queue_next);
423         if (q == NULL) {
424                 DEBUG(0,(__location__ " Failed to allocate deferred packet\n"));
425                 return;
426         }
427         q->ctdb = ctdb;
428         q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
429         if (q->hdr == NULL) {
430                 DEBUG(0,("Error copying deferred packet to self\n"));
431                 return;
432         }
433 #if 0
434         /* use this to put packets directly into our recv function */
435         ctdb_input_pkt(q->ctdb, q->hdr);
436 #else
437         event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
438 #endif
439 }
440
441
442 /*
443   broadcast a packet to all nodes
444 */
445 static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
446 {
447         int i;
448         for (i=0;i<ctdb->num_nodes;i++) {
449                 hdr->destnode = ctdb->nodes[i]->vnn;
450                 ctdb_queue_packet(ctdb, hdr);
451         }
452 }
453
454 /*
455   broadcast a packet to all nodes in the current vnnmap
456 */
457 static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
458 {
459         int i;
460         for (i=0;i<ctdb->vnn_map->size;i++) {
461                 hdr->destnode = ctdb->vnn_map->map[i];
462                 ctdb_queue_packet(ctdb, hdr);
463         }
464 }
465
466 /*
467   queue a packet or die
468 */
469 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
470 {
471         struct ctdb_node *node;
472
473         switch (hdr->destnode) {
474         case CTDB_BROADCAST_ALL:
475                 ctdb_broadcast_packet_all(ctdb, hdr);
476                 return;
477         case CTDB_BROADCAST_VNNMAP:
478                 ctdb_broadcast_packet_vnnmap(ctdb, hdr);
479                 return;
480         }
481
482         ctdb->statistics.node_packets_sent++;
483
484         if (!ctdb_validate_vnn(ctdb, hdr->destnode)) {
485                 DEBUG(0,(__location__ " cant send to node %u that does not exist\n", 
486                          hdr->destnode));
487                 return;
488         }
489
490         node = ctdb->nodes[hdr->destnode];
491
492         if (hdr->destnode == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
493                 ctdb_defer_packet(ctdb, hdr);
494         } else {
495                 node->tx_cnt++;
496                 if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
497                         ctdb_fatal(ctdb, "Unable to queue packet\n");
498                 }
499         }
500 }
501
502
503 static const struct ctdb_upcalls ctdb_upcalls = {
504         .recv_pkt       = ctdb_recv_pkt,
505         .node_dead      = ctdb_node_dead,
506         .node_connected = ctdb_node_connected
507 };
508
509 /*
510   initialise the ctdb daemon. 
511
512   NOTE: In current code the daemon does not fork. This is for testing purposes only
513   and to simplify the code.
514 */
515 struct ctdb_context *ctdb_init(struct event_context *ev)
516 {
517         struct ctdb_context *ctdb;
518
519         ctdb = talloc_zero(ev, struct ctdb_context);
520         ctdb->ev               = ev;
521         ctdb->recovery_mode    = CTDB_RECOVERY_NORMAL;
522         ctdb->recovery_master  = (uint32_t)-1;
523         ctdb->upcalls          = &ctdb_upcalls;
524         ctdb->idr              = idr_init(ctdb);
525         ctdb->recovery_lock_fd = -1;
526         ctdb->monitoring_mode  = CTDB_MONITORING_ACTIVE;
527
528         ctdb_tunables_set_defaults(ctdb);
529
530         return ctdb;
531 }
532