tools/ctdb: display INACTIVE status in "ctdb status" and "ctdb status -Y"
[metze/ctdb/wip.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         int machinereadable;
44         int maxruntime;
45 } options;
46
47 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
48
49 #ifdef CTDB_VERS
50 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
51 {
52 #define STR(x) #x
53 #define XSTR(x) STR(x)
54         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
55         return 0;
56 }
57 #endif
58
59
60 /*
61   verify that a node exists and is reachable
62  */
63 static void verify_node(struct ctdb_context *ctdb)
64 {
65         int ret;
66         struct ctdb_node_map *nodemap=NULL;
67
68         if (options.pnn == CTDB_CURRENT_NODE) {
69                 return;
70         }
71         if (options.pnn == CTDB_BROADCAST_ALL) {
72                 return;
73         }
74
75         /* verify the node exists */
76         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
77                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
78                 exit(10);
79         }
80         if (options.pnn >= nodemap->num) {
81                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
82                 exit(ERR_NONODE);
83         }
84         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
85                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
86                 exit(ERR_DISNODE);
87         }
88         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
89                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
90                 exit(ERR_DISNODE);
91         }
92
93         /* verify we can access the node */
94         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
95         if (ret == -1) {
96                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
97                 exit(10);
98         }
99 }
100
101 /*
102  check if a database exists
103 */
104 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
105 {
106         int i, ret;
107         struct ctdb_dbid_map *dbmap=NULL;
108
109         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
110         if (ret != 0) {
111                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
112                 return -1;
113         }
114
115         for(i=0;i<dbmap->num;i++){
116                 const char *name;
117
118                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
119                 if (!strcmp(name, db_name)) {
120                         return 0;
121                 }
122         }
123
124         return -1;
125 }
126
127 /*
128   see if a process exists
129  */
130 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
131 {
132         uint32_t pnn, pid;
133         int ret;
134         if (argc < 1) {
135                 usage();
136         }
137
138         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
139                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
140                 return -1;
141         }
142
143         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
144         if (ret == 0) {
145                 printf("%u:%u exists\n", pnn, pid);
146         } else {
147                 printf("%u:%u does not exist\n", pnn, pid);
148         }
149         return ret;
150 }
151
152 /*
153   display statistics structure
154  */
155 static void show_statistics(struct ctdb_statistics *s)
156 {
157         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
158         int i;
159         const char *prefix=NULL;
160         int preflen=0;
161         const struct {
162                 const char *name;
163                 uint32_t offset;
164         } fields[] = {
165 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
166                 STATISTICS_FIELD(num_clients),
167                 STATISTICS_FIELD(frozen),
168                 STATISTICS_FIELD(recovering),
169                 STATISTICS_FIELD(client_packets_sent),
170                 STATISTICS_FIELD(client_packets_recv),
171                 STATISTICS_FIELD(node_packets_sent),
172                 STATISTICS_FIELD(node_packets_recv),
173                 STATISTICS_FIELD(keepalive_packets_sent),
174                 STATISTICS_FIELD(keepalive_packets_recv),
175                 STATISTICS_FIELD(node.req_call),
176                 STATISTICS_FIELD(node.reply_call),
177                 STATISTICS_FIELD(node.req_dmaster),
178                 STATISTICS_FIELD(node.reply_dmaster),
179                 STATISTICS_FIELD(node.reply_error),
180                 STATISTICS_FIELD(node.req_message),
181                 STATISTICS_FIELD(node.req_control),
182                 STATISTICS_FIELD(node.reply_control),
183                 STATISTICS_FIELD(client.req_call),
184                 STATISTICS_FIELD(client.req_message),
185                 STATISTICS_FIELD(client.req_control),
186                 STATISTICS_FIELD(timeouts.call),
187                 STATISTICS_FIELD(timeouts.control),
188                 STATISTICS_FIELD(timeouts.traverse),
189                 STATISTICS_FIELD(total_calls),
190                 STATISTICS_FIELD(pending_calls),
191                 STATISTICS_FIELD(lockwait_calls),
192                 STATISTICS_FIELD(pending_lockwait_calls),
193                 STATISTICS_FIELD(childwrite_calls),
194                 STATISTICS_FIELD(pending_childwrite_calls),
195                 STATISTICS_FIELD(memory_used),
196                 STATISTICS_FIELD(max_hop_count),
197         };
198         printf("CTDB version %u\n", CTDB_VERSION);
199         for (i=0;i<ARRAY_SIZE(fields);i++) {
200                 if (strchr(fields[i].name, '.')) {
201                         preflen = strcspn(fields[i].name, ".")+1;
202                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
203                                 prefix = fields[i].name;
204                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
205                         }
206                 } else {
207                         preflen = 0;
208                 }
209                 printf(" %*s%-22s%*s%10u\n", 
210                        preflen?4:0, "",
211                        fields[i].name+preflen, 
212                        preflen?0:4, "",
213                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
214         }
215         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
216         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
217
218         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
219         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
220         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
221         talloc_free(tmp_ctx);
222 }
223
224 /*
225   display remote ctdb statistics combined from all nodes
226  */
227 static int control_statistics_all(struct ctdb_context *ctdb)
228 {
229         int ret, i;
230         struct ctdb_statistics statistics;
231         uint32_t *nodes;
232         uint32_t num_nodes;
233
234         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
235         CTDB_NO_MEMORY(ctdb, nodes);
236         
237         ZERO_STRUCT(statistics);
238
239         for (i=0;i<num_nodes;i++) {
240                 struct ctdb_statistics s1;
241                 int j;
242                 uint32_t *v1 = (uint32_t *)&s1;
243                 uint32_t *v2 = (uint32_t *)&statistics;
244                 uint32_t num_ints = 
245                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
246                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
247                 if (ret != 0) {
248                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
249                         return ret;
250                 }
251                 for (j=0;j<num_ints;j++) {
252                         v2[j] += v1[j];
253                 }
254                 statistics.max_hop_count = 
255                         MAX(statistics.max_hop_count, s1.max_hop_count);
256                 statistics.max_call_latency = 
257                         MAX(statistics.max_call_latency, s1.max_call_latency);
258                 statistics.max_lockwait_latency = 
259                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
260         }
261         talloc_free(nodes);
262         printf("Gathered statistics for %u nodes\n", num_nodes);
263         show_statistics(&statistics);
264         return 0;
265 }
266
267 /*
268   display remote ctdb statistics
269  */
270 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
271 {
272         int ret;
273         struct ctdb_statistics statistics;
274
275         if (options.pnn == CTDB_BROADCAST_ALL) {
276                 return control_statistics_all(ctdb);
277         }
278
279         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
280         if (ret != 0) {
281                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
282                 return ret;
283         }
284         show_statistics(&statistics);
285         return 0;
286 }
287
288
289 /*
290   reset remote ctdb statistics
291  */
292 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
293 {
294         int ret;
295
296         ret = ctdb_statistics_reset(ctdb, options.pnn);
297         if (ret != 0) {
298                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
299                 return ret;
300         }
301         return 0;
302 }
303
304
305 /*
306   display uptime of remote node
307  */
308 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
309 {
310         int ret;
311         struct ctdb_uptime *uptime = NULL;
312         int tmp, days, hours, minutes, seconds;
313
314         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
315         if (ret != 0) {
316                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
317                 return ret;
318         }
319
320         if (options.machinereadable){
321                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
322                 printf(":%u:%u:%u:%lf\n",
323                         (unsigned int)uptime->current_time.tv_sec,
324                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
325                         (unsigned int)uptime->last_recovery_finished.tv_sec,
326                         timeval_delta(&uptime->last_recovery_finished,
327                                       &uptime->last_recovery_started)
328                 );
329                 return 0;
330         }
331
332         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
333
334         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
335         seconds = tmp%60;
336         tmp    /= 60;
337         minutes = tmp%60;
338         tmp    /= 60;
339         hours   = tmp%24;
340         tmp    /= 24;
341         days    = tmp;
342         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
343
344         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
345         seconds = tmp%60;
346         tmp    /= 60;
347         minutes = tmp%60;
348         tmp    /= 60;
349         hours   = tmp%24;
350         tmp    /= 24;
351         days    = tmp;
352         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
353         
354         printf("Duration of last recovery/failover: %lf seconds\n",
355                 timeval_delta(&uptime->last_recovery_finished,
356                               &uptime->last_recovery_started));
357
358         return 0;
359 }
360
361 /*
362   show the PNN of the current node
363  */
364 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
365 {
366         int mypnn;
367
368         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
369         if (mypnn == -1) {
370                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
371                 return -1;
372         }
373
374         printf("PNN:%d\n", mypnn);
375         return 0;
376 }
377
378
379 struct pnn_node {
380         struct pnn_node *next;
381         const char *addr;
382         int pnn;
383 };
384
385 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
386 {
387         const char *nodes_list;
388         int nlines;
389         char **lines;
390         int i, pnn;
391         struct pnn_node *pnn_nodes = NULL;
392         struct pnn_node *pnn_node;
393         struct pnn_node *tmp_node;
394
395         /* read the nodes file */
396         nodes_list = getenv("CTDB_NODES");
397         if (nodes_list == NULL) {
398                 nodes_list = "/etc/ctdb/nodes";
399         }
400         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
401         if (lines == NULL) {
402                 return NULL;
403         }
404         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
405                 nlines--;
406         }
407         for (i=0, pnn=0; i<nlines; i++) {
408                 char *node;
409
410                 node = lines[i];
411                 /* strip leading spaces */
412                 while((*node == ' ') || (*node == '\t')) {
413                         node++;
414                 }
415                 if (*node == '#') {
416                         pnn++;
417                         continue;
418                 }
419                 if (strcmp(node, "") == 0) {
420                         continue;
421                 }
422                 pnn_node = talloc(mem_ctx, struct pnn_node);
423                 pnn_node->pnn = pnn++;
424                 pnn_node->addr = talloc_strdup(pnn_node, node);
425                 pnn_node->next = pnn_nodes;
426                 pnn_nodes = pnn_node;
427         }
428
429         /* swap them around so we return them in incrementing order */
430         pnn_node = pnn_nodes;
431         pnn_nodes = NULL;
432         while (pnn_node) {
433                 tmp_node = pnn_node;
434                 pnn_node = pnn_node->next;
435
436                 tmp_node->next = pnn_nodes;
437                 pnn_nodes = tmp_node;
438         }
439
440         return pnn_nodes;
441 }
442
443 /*
444   show the PNN of the current node
445   discover the pnn by loading the nodes file and try to bind to all
446   addresses one at a time until the ip address is found.
447  */
448 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
449 {
450         TALLOC_CTX *mem_ctx = talloc_new(NULL);
451         struct pnn_node *pnn_nodes;
452         struct pnn_node *pnn_node;
453
454         pnn_nodes = read_nodes_file(mem_ctx);
455         if (pnn_nodes == NULL) {
456                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
457                 talloc_free(mem_ctx);
458                 return -1;
459         }
460
461         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
462                 ctdb_sock_addr addr;
463
464                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
465                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
466                         talloc_free(mem_ctx);
467                         return -1;
468                 }
469
470                 if (ctdb_sys_have_ip(&addr)) {
471                         printf("PNN:%d\n", pnn_node->pnn);
472                         talloc_free(mem_ctx);
473                         return 0;
474                 }
475         }
476
477         printf("Failed to detect which PNN this node is\n");
478         talloc_free(mem_ctx);
479         return -1;
480 }
481
482 /*
483   display remote ctdb status
484  */
485 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
486 {
487         int i, ret;
488         struct ctdb_vnn_map *vnnmap=NULL;
489         struct ctdb_node_map *nodemap=NULL;
490         uint32_t recmode, recmaster;
491         int mypnn;
492
493         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
494         if (mypnn == -1) {
495                 return -1;
496         }
497
498         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
499         if (ret != 0) {
500                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
501                 return ret;
502         }
503
504         if(options.machinereadable){
505                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:\n");
506                 for(i=0;i<nodemap->num;i++){
507                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
508                                 continue;
509                         }
510                         printf(":%d:%s:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
511                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
512                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
513                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
514                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
515                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
516                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
517                                !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE));
518                 }
519                 return 0;
520         }
521
522         printf("Number of nodes:%d\n", nodemap->num);
523         for(i=0;i<nodemap->num;i++){
524                 static const struct {
525                         uint32_t flag;
526                         const char *name;
527                 } flag_names[] = {
528                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
529                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
530                         { NODE_FLAGS_BANNED,                "BANNED" },
531                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
532                         { NODE_FLAGS_DELETED,               "DELETED" },
533                         { NODE_FLAGS_STOPPED,               "STOPPED" },
534                         { NODE_FLAGS_INACTIVE,              "INACTIVE" },
535                 };
536                 char *flags_str = NULL;
537                 int j;
538
539                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
540                         continue;
541                 }
542                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
543                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
544                                 if (flags_str == NULL) {
545                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
546                                 } else {
547                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
548                                                                            flag_names[j].name);
549                                 }
550                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
551                         }
552                 }
553                 if (flags_str == NULL) {
554                         flags_str = talloc_strdup(ctdb, "OK");
555                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
556                 }
557                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
558                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
559                        flags_str,
560                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
561                 talloc_free(flags_str);
562         }
563
564         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
565         if (ret != 0) {
566                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
567                 return ret;
568         }
569         if (vnnmap->generation == INVALID_GENERATION) {
570                 printf("Generation:INVALID\n");
571         } else {
572                 printf("Generation:%d\n",vnnmap->generation);
573         }
574         printf("Size:%d\n",vnnmap->size);
575         for(i=0;i<vnnmap->size;i++){
576                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
577         }
578
579         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
580         if (ret != 0) {
581                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
582                 return ret;
583         }
584         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
585
586         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
587         if (ret != 0) {
588                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
589                 return ret;
590         }
591         printf("Recovery master:%d\n",recmaster);
592
593         return 0;
594 }
595
596
597 struct natgw_node {
598         struct natgw_node *next;
599         const char *addr;
600 };
601
602 /*
603   display the list of nodes belonging to this natgw configuration
604  */
605 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
606 {
607         int i, ret;
608         const char *natgw_list;
609         int nlines;
610         char **lines;
611         struct natgw_node *natgw_nodes = NULL;
612         struct natgw_node *natgw_node;
613         struct ctdb_node_map *nodemap=NULL;
614
615
616         /* read the natgw nodes file into a linked list */
617         natgw_list = getenv("NATGW_NODES");
618         if (natgw_list == NULL) {
619                 natgw_list = "/etc/ctdb/natgw_nodes";
620         }
621         lines = file_lines_load(natgw_list, &nlines, ctdb);
622         if (lines == NULL) {
623                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
624                 return -1;
625         }
626         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
627                 nlines--;
628         }
629         for (i=0;i<nlines;i++) {
630                 char *node;
631
632                 node = lines[i];
633                 /* strip leading spaces */
634                 while((*node == ' ') || (*node == '\t')) {
635                         node++;
636                 }
637                 if (*node == '#') {
638                         continue;
639                 }
640                 if (strcmp(node, "") == 0) {
641                         continue;
642                 }
643                 natgw_node = talloc(ctdb, struct natgw_node);
644                 natgw_node->addr = talloc_strdup(natgw_node, node);
645                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
646                 natgw_node->next = natgw_nodes;
647                 natgw_nodes = natgw_node;
648         }
649
650         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
651         if (ret != 0) {
652                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
653                 return ret;
654         }
655
656         i=0;
657         while(i<nodemap->num) {
658                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
659                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
660                                 break;
661                         }
662                 }
663
664                 /* this node was not in the natgw so we just remove it from
665                  * the list
666                  */
667                 if ((natgw_node == NULL) 
668                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
669                         int j;
670
671                         for (j=i+1; j<nodemap->num; j++) {
672                                 nodemap->nodes[j-1] = nodemap->nodes[j];
673                         }
674                         nodemap->num--;
675                         continue;
676                 }
677
678                 i++;
679         }               
680
681         /* pick a node to be natgwmaster
682          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
683          */
684         for(i=0;i<nodemap->num;i++){
685                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
686                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
687                         break;
688                 }
689         }
690         /* we couldnt find any healthy node, try unhealthy ones */
691         if (i == nodemap->num) {
692                 for(i=0;i<nodemap->num;i++){
693                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
694                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
695                                 break;
696                         }
697                 }
698         }
699         /* unless all nodes are STOPPED, when we pick one anyway */
700         if (i == nodemap->num) {
701                 for(i=0;i<nodemap->num;i++){
702                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
703                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
704                                 break;
705                         }
706                 }
707                 /* or if we still can not find any */
708                 if (i == nodemap->num) {
709                         printf("-1 0.0.0.0\n");
710                 }
711         }
712
713         /* print the pruned list of nodes belonging to this natgw list */
714         for(i=0;i<nodemap->num;i++){
715                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
716                         continue;
717                 }
718                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
719                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
720                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
721                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
722                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
723                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
724                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
725         }
726
727         return 0;
728 }
729
730 /*
731   display the status of the scripts for monitoring (or other events)
732  */
733 static int control_one_scriptstatus(struct ctdb_context *ctdb,
734                                     enum ctdb_eventscript_call type)
735 {
736         struct ctdb_scripts_wire *script_status;
737         int ret, i;
738
739         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
740         if (ret != 0) {
741                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
742                 return ret;
743         }
744
745         if (script_status == NULL) {
746                 if (!options.machinereadable) {
747                         printf("%s cycle never run\n",
748                                ctdb_eventscript_call_names[type]);
749                 }
750                 return 0;
751         }
752
753         if (!options.machinereadable) {
754                 printf("%d scripts were executed last %s cycle\n",
755                        script_status->num_scripts,
756                        ctdb_eventscript_call_names[type]);
757         }
758         for (i=0; i<script_status->num_scripts; i++) {
759                 const char *status = NULL;
760
761                 switch (script_status->scripts[i].status) {
762                 case -ETIME:
763                         status = "TIMEDOUT";
764                         break;
765                 case -ENOEXEC:
766                         status = "DISABLED";
767                         break;
768                 case 0:
769                         status = "OK";
770                         break;
771                 default:
772                         if (script_status->scripts[i].status > 0)
773                                 status = "ERROR";
774                         break;
775                 }
776                 if (options.machinereadable) {
777                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
778                                ctdb_eventscript_call_names[type],
779                                script_status->scripts[i].name,
780                                script_status->scripts[i].status,
781                                status,
782                                (long)script_status->scripts[i].start.tv_sec,
783                                (long)script_status->scripts[i].start.tv_usec,
784                                (long)script_status->scripts[i].finished.tv_sec,
785                                (long)script_status->scripts[i].finished.tv_usec,
786                                script_status->scripts[i].output);
787                         continue;
788                 }
789                 if (status)
790                         printf("%-20s Status:%s    ",
791                                script_status->scripts[i].name, status);
792                 else
793                         /* Some other error, eg from stat. */
794                         printf("%-20s Status:CANNOT RUN (%s)",
795                                script_status->scripts[i].name,
796                                strerror(-script_status->scripts[i].status));
797
798                 if (script_status->scripts[i].status >= 0) {
799                         printf("Duration:%.3lf ",
800                         timeval_delta(&script_status->scripts[i].finished,
801                               &script_status->scripts[i].start));
802                 }
803                 if (script_status->scripts[i].status != -ENOEXEC) {
804                         printf("%s",
805                                ctime(&script_status->scripts[i].start.tv_sec));
806                         if (script_status->scripts[i].status != 0) {
807                                 printf("   OUTPUT:%s\n",
808                                        script_status->scripts[i].output);
809                         }
810                 } else {
811                         printf("\n");
812                 }
813         }
814         return 0;
815 }
816
817
818 static int control_scriptstatus(struct ctdb_context *ctdb,
819                                 int argc, const char **argv)
820 {
821         int ret;
822         enum ctdb_eventscript_call type, min, max;
823         const char *arg;
824
825         if (argc > 1) {
826                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
827                 return -1;
828         }
829
830         if (argc == 0)
831                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
832         else
833                 arg = argv[0];
834
835         for (type = 0; type < CTDB_EVENT_MAX; type++) {
836                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
837                         min = type;
838                         max = type+1;
839                         break;
840                 }
841         }
842         if (type == CTDB_EVENT_MAX) {
843                 if (strcmp(arg, "all") == 0) {
844                         min = 0;
845                         max = CTDB_EVENT_MAX;
846                 } else {
847                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
848                         return -1;
849                 }
850         }
851
852         if (options.machinereadable) {
853                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
854         }
855
856         for (type = min; type < max; type++) {
857                 ret = control_one_scriptstatus(ctdb, type);
858                 if (ret != 0) {
859                         return ret;
860                 }
861         }
862
863         return 0;
864 }
865
866 /*
867   enable an eventscript
868  */
869 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
870 {
871         int ret;
872
873         if (argc < 1) {
874                 usage();
875         }
876
877         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
878         if (ret != 0) {
879           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
880                 return ret;
881         }
882
883         return 0;
884 }
885
886 /*
887   disable an eventscript
888  */
889 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
890 {
891         int ret;
892
893         if (argc < 1) {
894                 usage();
895         }
896
897         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
898         if (ret != 0) {
899           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
900                 return ret;
901         }
902
903         return 0;
904 }
905
906 /*
907   display the pnn of the recovery master
908  */
909 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
910 {
911         int ret;
912         uint32_t recmaster;
913
914         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
915         if (ret != 0) {
916                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
917                 return ret;
918         }
919         printf("%d\n",recmaster);
920
921         return 0;
922 }
923
924 /*
925   get a list of all tickles for this pnn
926  */
927 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
928 {
929         struct ctdb_control_tcp_tickle_list *list;
930         ctdb_sock_addr addr;
931         int i, ret;
932
933         if (argc < 1) {
934                 usage();
935         }
936
937         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
938                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
939                 return -1;
940         }
941
942         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
943         if (ret == -1) {
944                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
945                 return -1;
946         }
947
948         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
949         printf("Num tickles:%u\n", list->tickles.num);
950         for (i=0;i<list->tickles.num;i++) {
951                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
952                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
953         }
954
955         talloc_free(list);
956         
957         return 0;
958 }
959
960
961
962 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
963 {
964         struct ctdb_all_public_ips *ips;
965         struct ctdb_public_ip ip;
966         int i, ret;
967         uint32_t *nodes;
968         uint32_t disable_time;
969         TDB_DATA data;
970         struct ctdb_node_map *nodemap=NULL;
971         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
972
973         disable_time = 30;
974         data.dptr  = (uint8_t*)&disable_time;
975         data.dsize = sizeof(disable_time);
976         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
977         if (ret != 0) {
978                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
979                 return -1;
980         }
981
982
983
984         /* read the public ip list from the node */
985         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
986         if (ret != 0) {
987                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
988                 talloc_free(tmp_ctx);
989                 return -1;
990         }
991
992         for (i=0;i<ips->num;i++) {
993                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
994                         break;
995                 }
996         }
997         if (i==ips->num) {
998                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
999                         pnn, ctdb_addr_to_str(addr)));
1000                 talloc_free(tmp_ctx);
1001                 return -1;
1002         }
1003
1004         ip.pnn  = pnn;
1005         ip.addr = *addr;
1006
1007         data.dptr  = (uint8_t *)&ip;
1008         data.dsize = sizeof(ip);
1009
1010         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1011         if (ret != 0) {
1012                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1013                 talloc_free(tmp_ctx);
1014                 return ret;
1015         }
1016
1017         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1018         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1019                                         nodes, 0,
1020                                         TIMELIMIT(),
1021                                         false, data,
1022                                         NULL, NULL,
1023                                         NULL);
1024         if (ret != 0) {
1025                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1026                 talloc_free(tmp_ctx);
1027                 return -1;
1028         }
1029
1030         ret = ctdb_ctrl_takeover_ip(ctdb, TIMELIMIT(), pnn, &ip);
1031         if (ret != 0) {
1032                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1033                 talloc_free(tmp_ctx);
1034                 return -1;
1035         }
1036
1037         talloc_free(tmp_ctx);
1038         return 0;
1039 }
1040
1041 /*
1042   move/failover an ip address to a specific node
1043  */
1044 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1045 {
1046         uint32_t pnn;
1047         ctdb_sock_addr addr;
1048
1049         if (argc < 2) {
1050                 usage();
1051                 return -1;
1052         }
1053
1054         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1055                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1056                 return -1;
1057         }
1058
1059
1060         if (sscanf(argv[1], "%u", &pnn) != 1) {
1061                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1062                 return -1;
1063         }
1064
1065         if (move_ip(ctdb, &addr, pnn) != 0) {
1066                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1067                 return -1;
1068         }
1069
1070         return 0;
1071 }
1072
1073 void getips_store_callback(void *param, void *data)
1074 {
1075         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1076         struct ctdb_all_public_ips *ips = param;
1077         int i;
1078
1079         i = ips->num++;
1080         ips->ips[i].pnn  = node_ip->pnn;
1081         ips->ips[i].addr = node_ip->addr;
1082 }
1083
1084 void getips_count_callback(void *param, void *data)
1085 {
1086         uint32_t *count = param;
1087
1088         (*count)++;
1089 }
1090
1091 #define IP_KEYLEN       4
1092 static uint32_t *ip_key(ctdb_sock_addr *ip)
1093 {
1094         static uint32_t key[IP_KEYLEN];
1095
1096         bzero(key, sizeof(key));
1097
1098         switch (ip->sa.sa_family) {
1099         case AF_INET:
1100                 key[0]  = ip->ip.sin_addr.s_addr;
1101                 break;
1102         case AF_INET6:
1103                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1104                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1105                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1106                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1107                 break;
1108         default:
1109                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1110                 return key;
1111         }
1112
1113         return key;
1114 }
1115
1116 static void *add_ip_callback(void *parm, void *data)
1117 {
1118         return parm;
1119 }
1120
1121 static int
1122 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1123 {
1124         struct ctdb_all_public_ips *tmp_ips;
1125         struct ctdb_node_map *nodemap=NULL;
1126         trbt_tree_t *ip_tree;
1127         int i, j, len, ret;
1128         uint32_t count;
1129
1130         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1131         if (ret != 0) {
1132                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1133                 return ret;
1134         }
1135
1136         ip_tree = trbt_create(tmp_ctx, 0);
1137
1138         for(i=0;i<nodemap->num;i++){
1139                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1140                         continue;
1141                 }
1142                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1143                         continue;
1144                 }
1145
1146                 /* read the public ip list from this node */
1147                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1148                 if (ret != 0) {
1149                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1150                         return -1;
1151                 }
1152         
1153                 for (j=0; j<tmp_ips->num;j++) {
1154                         struct ctdb_public_ip *node_ip;
1155
1156                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1157                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1158                         node_ip->addr = tmp_ips->ips[j].addr;
1159
1160                         trbt_insertarray32_callback(ip_tree,
1161                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1162                                 add_ip_callback,
1163                                 node_ip);
1164                 }
1165                 talloc_free(tmp_ips);
1166         }
1167
1168         /* traverse */
1169         count = 0;
1170         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1171
1172         len = offsetof(struct ctdb_all_public_ips, ips) + 
1173                 count*sizeof(struct ctdb_public_ip);
1174         tmp_ips = talloc_zero_size(tmp_ctx, len);
1175         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1176
1177         *ips = tmp_ips;
1178
1179         return 0;
1180 }
1181
1182
1183 /* 
1184  * scans all other nodes and returns a pnn for another node that can host this 
1185  * ip address or -1
1186  */
1187 static int
1188 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1189 {
1190         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1191         struct ctdb_all_public_ips *ips;
1192         struct ctdb_node_map *nodemap=NULL;
1193         int i, j, ret;
1194
1195         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1196         if (ret != 0) {
1197                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1198                 talloc_free(tmp_ctx);
1199                 return ret;
1200         }
1201
1202         for(i=0;i<nodemap->num;i++){
1203                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1204                         continue;
1205                 }
1206                 if (nodemap->nodes[i].pnn == options.pnn) {
1207                         continue;
1208                 }
1209
1210                 /* read the public ip list from this node */
1211                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1212                 if (ret != 0) {
1213                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1214                         return -1;
1215                 }
1216
1217                 for (j=0;j<ips->num;j++) {
1218                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1219                                 talloc_free(tmp_ctx);
1220                                 return nodemap->nodes[i].pnn;
1221                         }
1222                 }
1223                 talloc_free(ips);
1224         }
1225
1226         talloc_free(tmp_ctx);
1227         return -1;
1228 }
1229
1230 /*
1231   add a public ip address to a node
1232  */
1233 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1234 {
1235         int i, ret;
1236         int len;
1237         uint32_t pnn;
1238         unsigned mask;
1239         ctdb_sock_addr addr;
1240         struct ctdb_control_ip_iface *pub;
1241         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1242         struct ctdb_all_public_ips *ips;
1243
1244
1245         if (argc != 2) {
1246                 talloc_free(tmp_ctx);
1247                 usage();
1248         }
1249
1250         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1251                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1252                 talloc_free(tmp_ctx);
1253                 return -1;
1254         }
1255
1256         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1257         if (ret != 0) {
1258                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1259                 talloc_free(tmp_ctx);
1260                 return ret;
1261         }
1262
1263
1264         /* check if some other node is already serving this ip, if not,
1265          * we will claim it
1266          */
1267         for (i=0;i<ips->num;i++) {
1268                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1269                         break;
1270                 }
1271         }
1272
1273         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1274         pub = talloc_size(tmp_ctx, len); 
1275         CTDB_NO_MEMORY(ctdb, pub);
1276
1277         pub->addr  = addr;
1278         pub->mask  = mask;
1279         pub->len   = strlen(argv[1])+1;
1280         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1281
1282         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1283         if (ret != 0) {
1284                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1285                 talloc_free(tmp_ctx);
1286                 return ret;
1287         }
1288
1289         if (i == ips->num) {
1290                 /* no one has this ip so we claim it */
1291                 pnn  = options.pnn;
1292         } else {
1293                 pnn  = ips->ips[i].pnn;
1294         }
1295
1296         if (move_ip(ctdb, &addr, pnn) != 0) {
1297                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1298                 return -1;
1299         }
1300
1301         talloc_free(tmp_ctx);
1302         return 0;
1303 }
1304
1305 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1306
1307 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1308 {
1309         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1310         struct ctdb_node_map *nodemap=NULL;
1311         struct ctdb_all_public_ips *ips;
1312         int ret, i, j;
1313
1314         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1315         if (ret != 0) {
1316                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1317                 return ret;
1318         }
1319
1320         /* remove it from the nodes that are not hosting the ip currently */
1321         for(i=0;i<nodemap->num;i++){
1322                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1323                         continue;
1324                 }
1325                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1326                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1327                         continue;
1328                 }
1329
1330                 for (j=0;j<ips->num;j++) {
1331                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1332                                 break;
1333                         }
1334                 }
1335                 if (j==ips->num) {
1336                         continue;
1337                 }
1338
1339                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1340                         continue;
1341                 }
1342
1343                 options.pnn = nodemap->nodes[i].pnn;
1344                 control_delip(ctdb, argc, argv);
1345         }
1346
1347
1348         /* remove it from every node (also the one hosting it) */
1349         for(i=0;i<nodemap->num;i++){
1350                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1351                         continue;
1352                 }
1353                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1354                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1355                         continue;
1356                 }
1357
1358                 for (j=0;j<ips->num;j++) {
1359                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1360                                 break;
1361                         }
1362                 }
1363                 if (j==ips->num) {
1364                         continue;
1365                 }
1366
1367                 options.pnn = nodemap->nodes[i].pnn;
1368                 control_delip(ctdb, argc, argv);
1369         }
1370
1371         talloc_free(tmp_ctx);
1372         return 0;
1373 }
1374         
1375 /*
1376   delete a public ip address from a node
1377  */
1378 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1379 {
1380         int i, ret;
1381         ctdb_sock_addr addr;
1382         struct ctdb_control_ip_iface pub;
1383         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1384         struct ctdb_all_public_ips *ips;
1385
1386         if (argc != 1) {
1387                 talloc_free(tmp_ctx);
1388                 usage();
1389         }
1390
1391         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1392                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1393                 return -1;
1394         }
1395
1396         if (options.pnn == CTDB_BROADCAST_ALL) {
1397                 return control_delip_all(ctdb, argc, argv, &addr);
1398         }
1399
1400         pub.addr  = addr;
1401         pub.mask  = 0;
1402         pub.len   = 0;
1403
1404         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1405         if (ret != 0) {
1406                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1407                 talloc_free(tmp_ctx);
1408                 return ret;
1409         }
1410         
1411         for (i=0;i<ips->num;i++) {
1412                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1413                         break;
1414                 }
1415         }
1416
1417         if (i==ips->num) {
1418                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1419                         ctdb_addr_to_str(&addr)));
1420                 talloc_free(tmp_ctx);
1421                 return -1;
1422         }
1423
1424         if (ips->ips[i].pnn == options.pnn) {
1425                 ret = find_other_host_for_public_ip(ctdb, &addr);
1426                 if (ret != -1) {
1427                         if (move_ip(ctdb, &addr, ret) != 0) {
1428                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1429                                 return -1;
1430                         }
1431                 }
1432         }
1433
1434         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1435         if (ret != 0) {
1436                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1437                 talloc_free(tmp_ctx);
1438                 return ret;
1439         }
1440
1441         talloc_free(tmp_ctx);
1442         return 0;
1443 }
1444
1445 /*
1446   kill a tcp connection
1447  */
1448 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1449 {
1450         int ret;
1451         struct ctdb_control_killtcp killtcp;
1452
1453         if (argc < 2) {
1454                 usage();
1455         }
1456
1457         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1458                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1459                 return -1;
1460         }
1461
1462         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1463                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1464                 return -1;
1465         }
1466
1467         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1468         if (ret != 0) {
1469                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1470                 return ret;
1471         }
1472
1473         return 0;
1474 }
1475
1476
1477 /*
1478   send a gratious arp
1479  */
1480 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1481 {
1482         int ret;
1483         ctdb_sock_addr addr;
1484
1485         if (argc < 2) {
1486                 usage();
1487         }
1488
1489         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1490                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1491                 return -1;
1492         }
1493
1494         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1495         if (ret != 0) {
1496                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1497                 return ret;
1498         }
1499
1500         return 0;
1501 }
1502
1503 /*
1504   register a server id
1505  */
1506 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1507 {
1508         int ret;
1509         struct ctdb_server_id server_id;
1510
1511         if (argc < 3) {
1512                 usage();
1513         }
1514
1515         server_id.pnn       = strtoul(argv[0], NULL, 0);
1516         server_id.type      = strtoul(argv[1], NULL, 0);
1517         server_id.server_id = strtoul(argv[2], NULL, 0);
1518
1519         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1520         if (ret != 0) {
1521                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1522                 return ret;
1523         }
1524         return -1;
1525 }
1526
1527 /*
1528   unregister a server id
1529  */
1530 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1531 {
1532         int ret;
1533         struct ctdb_server_id server_id;
1534
1535         if (argc < 3) {
1536                 usage();
1537         }
1538
1539         server_id.pnn       = strtoul(argv[0], NULL, 0);
1540         server_id.type      = strtoul(argv[1], NULL, 0);
1541         server_id.server_id = strtoul(argv[2], NULL, 0);
1542
1543         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1544         if (ret != 0) {
1545                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1546                 return ret;
1547         }
1548         return -1;
1549 }
1550
1551 /*
1552   check if a server id exists
1553  */
1554 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1555 {
1556         uint32_t status;
1557         int ret;
1558         struct ctdb_server_id server_id;
1559
1560         if (argc < 3) {
1561                 usage();
1562         }
1563
1564         server_id.pnn       = strtoul(argv[0], NULL, 0);
1565         server_id.type      = strtoul(argv[1], NULL, 0);
1566         server_id.server_id = strtoul(argv[2], NULL, 0);
1567
1568         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1569         if (ret != 0) {
1570                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1571                 return ret;
1572         }
1573
1574         if (status) {
1575                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1576         } else {
1577                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1578         }
1579         return 0;
1580 }
1581
1582 /*
1583   get a list of all server ids that are registered on a node
1584  */
1585 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1586 {
1587         int i, ret;
1588         struct ctdb_server_id_list *server_ids;
1589
1590         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1591         if (ret != 0) {
1592                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1593                 return ret;
1594         }
1595
1596         for (i=0; i<server_ids->num; i++) {
1597                 printf("Server id %d:%d:%d\n", 
1598                         server_ids->server_ids[i].pnn, 
1599                         server_ids->server_ids[i].type, 
1600                         server_ids->server_ids[i].server_id); 
1601         }
1602
1603         return -1;
1604 }
1605
1606 /*
1607   send a tcp tickle ack
1608  */
1609 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1610 {
1611         int ret;
1612         ctdb_sock_addr  src, dst;
1613
1614         if (argc < 2) {
1615                 usage();
1616         }
1617
1618         if (!parse_ip_port(argv[0], &src)) {
1619                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1620                 return -1;
1621         }
1622
1623         if (!parse_ip_port(argv[1], &dst)) {
1624                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1625                 return -1;
1626         }
1627
1628         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1629         if (ret==0) {
1630                 return 0;
1631         }
1632         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1633
1634         return -1;
1635 }
1636
1637
1638 /*
1639   display public ip status
1640  */
1641 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1642 {
1643         int i, ret;
1644         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1645         struct ctdb_all_public_ips *ips;
1646
1647         if (options.pnn == CTDB_BROADCAST_ALL) {
1648                 /* read the list of public ips from all nodes */
1649                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1650         } else {
1651                 /* read the public ip list from this node */
1652                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1653         }
1654         if (ret != 0) {
1655                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1656                 talloc_free(tmp_ctx);
1657                 return ret;
1658         }
1659
1660         if (options.machinereadable){
1661                 printf(":Public IP:Node:\n");
1662         } else {
1663                 if (options.pnn == CTDB_BROADCAST_ALL) {
1664                         printf("Public IPs on ALL nodes\n");
1665                 } else {
1666                         printf("Public IPs on node %u\n", options.pnn);
1667                 }
1668         }
1669
1670         for (i=1;i<=ips->num;i++) {
1671                 if (options.machinereadable){
1672                         printf(":%s:%d:\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1673                 } else {
1674                         printf("%s %d\n", ctdb_addr_to_str(&ips->ips[ips->num-i].addr), ips->ips[ips->num-i].pnn);
1675                 }
1676         }
1677
1678         talloc_free(tmp_ctx);
1679         return 0;
1680 }
1681
1682 /*
1683   display pid of a ctdb daemon
1684  */
1685 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1686 {
1687         uint32_t pid;
1688         int ret;
1689
1690         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1691         if (ret != 0) {
1692                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1693                 return ret;
1694         }
1695         printf("Pid:%d\n", pid);
1696
1697         return 0;
1698 }
1699
1700 /*
1701   handler for receiving the response to ipreallocate
1702 */
1703 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1704                              TDB_DATA data, void *private_data)
1705 {
1706         exit(0);
1707 }
1708
1709 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1710 {
1711         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1712
1713         event_add_timed(ctdb->ev, ctdb, 
1714                                 timeval_current_ofs(1, 0),
1715                                 ctdb_every_second, ctdb);
1716 }
1717
1718 /*
1719   ask the recovery daemon on the recovery master to perform a ip reallocation
1720  */
1721 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1722 {
1723         int i, ret;
1724         TDB_DATA data;
1725         struct takeover_run_reply rd;
1726         uint32_t recmaster;
1727         struct ctdb_node_map *nodemap=NULL;
1728         int retries=0;
1729         struct timeval tv = timeval_current();
1730
1731         /* we need some events to trigger so we can timeout and restart
1732            the loop
1733         */
1734         event_add_timed(ctdb->ev, ctdb, 
1735                                 timeval_current_ofs(1, 0),
1736                                 ctdb_every_second, ctdb);
1737
1738         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1739         if (rd.pnn == -1) {
1740                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1741                 return -1;
1742         }
1743         rd.srvid = getpid();
1744
1745         /* register a message port for receiveing the reply so that we
1746            can receive the reply
1747         */
1748         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1749
1750         data.dptr = (uint8_t *)&rd;
1751         data.dsize = sizeof(rd);
1752
1753 again:
1754         if (retries>5) {
1755                 DEBUG(DEBUG_ERR,("Failed waiting for cluster convergense\n"));
1756                 exit(10);
1757         }
1758
1759         /* check that there are valid nodes available */
1760         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
1761                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1762                 exit(10);
1763         }
1764         for (i=0; i<nodemap->num;i++) {
1765                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
1766                         break;
1767                 }
1768         }
1769         if (i==nodemap->num) {
1770                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
1771                 return 0;
1772         }
1773
1774
1775         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1776         if (ret != 0) {
1777                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1778                 return ret;
1779         }
1780
1781         /* verify the node exists */
1782         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
1783                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1784                 exit(10);
1785         }
1786
1787
1788         /* check tha there are nodes available that can act as a recmaster */
1789         for (i=0; i<nodemap->num; i++) {
1790                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1791                         continue;
1792                 }
1793                 break;
1794         }
1795         if (i == nodemap->num) {
1796                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
1797                 return 0;
1798         }
1799
1800         /* verify the recovery master is not STOPPED, nor BANNED */
1801         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1802                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1803                 retries++;
1804                 sleep(1);
1805                 goto again;
1806         } 
1807
1808         
1809         /* verify the recovery master is not STOPPED, nor BANNED */
1810         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1811                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1812                 retries++;
1813                 sleep(1);
1814                 goto again;
1815         } 
1816
1817         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
1818         if (ret != 0) {
1819                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
1820                 return -1;
1821         }
1822
1823         tv = timeval_current();
1824         /* this loop will terminate when we have received the reply */
1825         while (timeval_elapsed(&tv) < 3.0) {    
1826                 event_loop_once(ctdb->ev);
1827         }
1828
1829         DEBUG(DEBUG_INFO,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
1830         retries++;
1831         sleep(1);
1832         goto again;
1833
1834         return 0;
1835 }
1836
1837
1838 /*
1839   disable a remote node
1840  */
1841 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
1842 {
1843         int ret;
1844         struct ctdb_node_map *nodemap=NULL;
1845
1846         do {
1847                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
1848                 if (ret != 0) {
1849                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
1850                         return ret;
1851                 }
1852
1853                 sleep(1);
1854
1855                 /* read the nodemap and verify the change took effect */
1856                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1857                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1858                         exit(10);
1859                 }
1860
1861         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
1862         ret = control_ipreallocate(ctdb, argc, argv);
1863         if (ret != 0) {
1864                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1865                 return ret;
1866         }
1867
1868         return 0;
1869 }
1870
1871 /*
1872   enable a disabled remote node
1873  */
1874 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
1875 {
1876         int ret;
1877
1878         struct ctdb_node_map *nodemap=NULL;
1879
1880         do {
1881                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
1882                 if (ret != 0) {
1883                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
1884                         return ret;
1885                 }
1886
1887                 sleep(1);
1888
1889                 /* read the nodemap and verify the change took effect */
1890                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1891                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1892                         exit(10);
1893                 }
1894
1895         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
1896         ret = control_ipreallocate(ctdb, argc, argv);
1897         if (ret != 0) {
1898                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1899                 return ret;
1900         }
1901
1902         return 0;
1903 }
1904
1905 /*
1906   stop a remote node
1907  */
1908 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
1909 {
1910         int ret;
1911         struct ctdb_node_map *nodemap=NULL;
1912
1913         do {
1914                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
1915                 if (ret != 0) {
1916                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
1917                 }
1918         
1919                 sleep(1);
1920
1921                 /* read the nodemap and verify the change took effect */
1922                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1923                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1924                         exit(10);
1925                 }
1926
1927         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
1928         ret = control_ipreallocate(ctdb, argc, argv);
1929         if (ret != 0) {
1930                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1931                 return ret;
1932         }
1933
1934         return 0;
1935 }
1936
1937 /*
1938   restart a stopped remote node
1939  */
1940 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
1941 {
1942         int ret;
1943
1944         struct ctdb_node_map *nodemap=NULL;
1945
1946         do {
1947                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
1948                 if (ret != 0) {
1949                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
1950                         return ret;
1951                 }
1952         
1953                 sleep(1);
1954
1955                 /* read the nodemap and verify the change took effect */
1956                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
1957                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1958                         exit(10);
1959                 }
1960
1961         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
1962         ret = control_ipreallocate(ctdb, argc, argv);
1963         if (ret != 0) {
1964                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
1965                 return ret;
1966         }
1967
1968         return 0;
1969 }
1970
1971 static uint32_t get_generation(struct ctdb_context *ctdb)
1972 {
1973         struct ctdb_vnn_map *vnnmap=NULL;
1974         int ret;
1975
1976         /* wait until the recmaster is not in recovery mode */
1977         while (1) {
1978                 uint32_t recmode, recmaster;
1979                 
1980                 if (vnnmap != NULL) {
1981                         talloc_free(vnnmap);
1982                         vnnmap = NULL;
1983                 }
1984
1985                 /* get the recmaster */
1986                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
1987                 if (ret != 0) {
1988                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1989                         exit(10);
1990                 }
1991
1992                 /* get recovery mode */
1993                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
1994                 if (ret != 0) {
1995                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1996                         exit(10);
1997                 }
1998
1999                 /* get the current generation number */
2000                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2001                 if (ret != 0) {
2002                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2003                         exit(10);
2004                 }
2005
2006                 if ((recmode == CTDB_RECOVERY_NORMAL)
2007                 &&  (vnnmap->generation != 1)){
2008                         return vnnmap->generation;
2009                 }
2010                 sleep(1);
2011         }
2012 }
2013
2014 /*
2015   ban a node from the cluster
2016  */
2017 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2018 {
2019         int ret;
2020         struct ctdb_node_map *nodemap=NULL;
2021         struct ctdb_ban_time bantime;
2022
2023         if (argc < 1) {
2024                 usage();
2025         }
2026         
2027         /* verify the node exists */
2028         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2029         if (ret != 0) {
2030                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2031                 return ret;
2032         }
2033
2034         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2035                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2036                 return -1;
2037         }
2038
2039         bantime.pnn  = options.pnn;
2040         bantime.time = strtoul(argv[0], NULL, 0);
2041
2042         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2043         if (ret != 0) {
2044                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2045                 return -1;
2046         }       
2047
2048         ret = control_ipreallocate(ctdb, argc, argv);
2049         if (ret != 0) {
2050                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2051                 return ret;
2052         }
2053
2054         return 0;
2055 }
2056
2057
2058 /*
2059   unban a node from the cluster
2060  */
2061 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2062 {
2063         int ret;
2064         struct ctdb_node_map *nodemap=NULL;
2065         struct ctdb_ban_time bantime;
2066
2067         /* verify the node exists */
2068         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2069         if (ret != 0) {
2070                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2071                 return ret;
2072         }
2073
2074         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2075                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2076                 return -1;
2077         }
2078
2079         bantime.pnn  = options.pnn;
2080         bantime.time = 0;
2081
2082         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2083         if (ret != 0) {
2084                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2085                 return -1;
2086         }       
2087
2088         ret = control_ipreallocate(ctdb, argc, argv);
2089         if (ret != 0) {
2090                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2091                 return ret;
2092         }
2093
2094         return 0;
2095 }
2096
2097
2098 /*
2099   show ban information for a node
2100  */
2101 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2102 {
2103         int ret;
2104         struct ctdb_node_map *nodemap=NULL;
2105         struct ctdb_ban_time *bantime;
2106
2107         /* verify the node exists */
2108         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2109         if (ret != 0) {
2110                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2111                 return ret;
2112         }
2113
2114         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2115         if (ret != 0) {
2116                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2117                 return -1;
2118         }       
2119
2120         if (bantime->time == 0) {
2121                 printf("Node %u is not banned\n", bantime->pnn);
2122         } else {
2123                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2124         }
2125
2126         return 0;
2127 }
2128
2129 /*
2130   shutdown a daemon
2131  */
2132 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2133 {
2134         int ret;
2135
2136         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2137         if (ret != 0) {
2138                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2139                 return ret;
2140         }
2141
2142         return 0;
2143 }
2144
2145 /*
2146   trigger a recovery
2147  */
2148 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2149 {
2150         int ret;
2151         uint32_t generation, next_generation;
2152
2153         /* record the current generation number */
2154         generation = get_generation(ctdb);
2155
2156         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2157         if (ret != 0) {
2158                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2159                 return ret;
2160         }
2161
2162         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2163         if (ret != 0) {
2164                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2165                 return ret;
2166         }
2167
2168         /* wait until we are in a new generation */
2169         while (1) {
2170                 next_generation = get_generation(ctdb);
2171                 if (next_generation != generation) {
2172                         return 0;
2173                 }
2174                 sleep(1);
2175         }
2176
2177         return 0;
2178 }
2179
2180
2181 /*
2182   display monitoring mode of a remote node
2183  */
2184 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2185 {
2186         uint32_t monmode;
2187         int ret;
2188
2189         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2190         if (ret != 0) {
2191                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2192                 return ret;
2193         }
2194         if (!options.machinereadable){
2195                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2196         } else {
2197                 printf(":mode:\n");
2198                 printf(":%d:\n",monmode);
2199         }
2200         return 0;
2201 }
2202
2203
2204 /*
2205   display capabilities of a remote node
2206  */
2207 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2208 {
2209         uint32_t capabilities;
2210         int ret;
2211
2212         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2213         if (ret != 0) {
2214                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2215                 return ret;
2216         }
2217         
2218         if (!options.machinereadable){
2219                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2220                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2221                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2222                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2223         } else {
2224                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2225                 printf(":%d:%d:%d:%d:\n",
2226                         !!(capabilities&CTDB_CAP_RECMASTER),
2227                         !!(capabilities&CTDB_CAP_LMASTER),
2228                         !!(capabilities&CTDB_CAP_LVS),
2229                         !!(capabilities&CTDB_CAP_NATGW));
2230         }
2231         return 0;
2232 }
2233
2234 /*
2235   display lvs configuration
2236  */
2237 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2238 {
2239         uint32_t *capabilities;
2240         struct ctdb_node_map *nodemap=NULL;
2241         int i, ret;
2242         int healthy_count = 0;
2243
2244         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2245         if (ret != 0) {
2246                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2247                 return ret;
2248         }
2249
2250         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2251         CTDB_NO_MEMORY(ctdb, capabilities);
2252         
2253         /* collect capabilities for all connected nodes */
2254         for (i=0; i<nodemap->num; i++) {
2255                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2256                         continue;
2257                 }
2258                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2259                         continue;
2260                 }
2261         
2262                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2263                 if (ret != 0) {
2264                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2265                         return ret;
2266                 }
2267
2268                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2269                         continue;
2270                 }
2271
2272                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2273                         healthy_count++;
2274                 }
2275         }
2276
2277         /* Print all LVS nodes */
2278         for (i=0; i<nodemap->num; i++) {
2279                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2280                         continue;
2281                 }
2282                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2283                         continue;
2284                 }
2285                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2286                         continue;
2287                 }
2288
2289                 if (healthy_count != 0) {
2290                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2291                                 continue;
2292                         }
2293                 }
2294
2295                 printf("%d:%s\n", i, 
2296                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2297         }
2298
2299         return 0;
2300 }
2301
2302 /*
2303   display who is the lvs master
2304  */
2305 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2306 {
2307         uint32_t *capabilities;
2308         struct ctdb_node_map *nodemap=NULL;
2309         int i, ret;
2310         int healthy_count = 0;
2311
2312         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2313         if (ret != 0) {
2314                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2315                 return ret;
2316         }
2317
2318         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2319         CTDB_NO_MEMORY(ctdb, capabilities);
2320         
2321         /* collect capabilities for all connected nodes */
2322         for (i=0; i<nodemap->num; i++) {
2323                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2324                         continue;
2325                 }
2326                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2327                         continue;
2328                 }
2329         
2330                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2331                 if (ret != 0) {
2332                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2333                         return ret;
2334                 }
2335
2336                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2337                         continue;
2338                 }
2339
2340                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2341                         healthy_count++;
2342                 }
2343         }
2344
2345         /* find and show the lvsmaster */
2346         for (i=0; i<nodemap->num; i++) {
2347                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2348                         continue;
2349                 }
2350                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2351                         continue;
2352                 }
2353                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2354                         continue;
2355                 }
2356
2357                 if (healthy_count != 0) {
2358                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2359                                 continue;
2360                         }
2361                 }
2362
2363                 if (options.machinereadable){
2364                         printf("%d\n", i);
2365                 } else {
2366                         printf("Node %d is LVS master\n", i);
2367                 }
2368                 return 0;
2369         }
2370
2371         printf("There is no LVS master\n");
2372         return -1;
2373 }
2374
2375 /*
2376   disable monitoring on a  node
2377  */
2378 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2379 {
2380         
2381         int ret;
2382
2383         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2384         if (ret != 0) {
2385                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2386                 return ret;
2387         }
2388         printf("Monitoring mode:%s\n","DISABLED");
2389
2390         return 0;
2391 }
2392
2393 /*
2394   enable monitoring on a  node
2395  */
2396 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2397 {
2398         
2399         int ret;
2400
2401         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2402         if (ret != 0) {
2403                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2404                 return ret;
2405         }
2406         printf("Monitoring mode:%s\n","ACTIVE");
2407
2408         return 0;
2409 }
2410
2411 /*
2412   display remote list of keys/data for a db
2413  */
2414 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2415 {
2416         const char *db_name;
2417         struct ctdb_db_context *ctdb_db;
2418         int ret;
2419
2420         if (argc < 1) {
2421                 usage();
2422         }
2423
2424         db_name = argv[0];
2425
2426
2427         if (db_exists(ctdb, db_name)) {
2428                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2429                 return -1;
2430         }
2431
2432         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2433
2434         if (ctdb_db == NULL) {
2435                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2436                 return -1;
2437         }
2438
2439         /* traverse and dump the cluster tdb */
2440         ret = ctdb_dump_db(ctdb_db, stdout);
2441         if (ret == -1) {
2442                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2443                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2444                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2445                                   db_name));
2446                 return -1;
2447         }
2448         talloc_free(ctdb_db);
2449
2450         printf("Dumped %d records\n", ret);
2451         return 0;
2452 }
2453
2454
2455 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2456                              TDB_DATA data, void *private_data)
2457 {
2458         DEBUG(DEBUG_ERR,("Log data received\n"));
2459         if (data.dsize > 0) {
2460                 printf("%s", data.dptr);
2461         }
2462
2463         exit(0);
2464 }
2465
2466 /*
2467   display a list of log messages from the in memory ringbuffer
2468  */
2469 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2470 {
2471         int ret;
2472         int32_t res;
2473         struct ctdb_get_log_addr log_addr;
2474         TDB_DATA data;
2475         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2476         char *errmsg;
2477         struct timeval tv;
2478
2479         if (argc != 1) {
2480                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2481                 talloc_free(tmp_ctx);
2482                 return -1;
2483         }
2484
2485         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2486         log_addr.srvid = getpid();
2487         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2488                 log_addr.level = get_debug_by_desc(argv[0]);
2489         } else {
2490                 log_addr.level = strtol(argv[0], NULL, 0);
2491         }
2492
2493
2494         data.dptr = (unsigned char *)&log_addr;
2495         data.dsize = sizeof(log_addr);
2496
2497         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2498
2499         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2500         sleep(1);
2501
2502         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2503
2504         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2505                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2506         if (ret != 0 || res != 0) {
2507                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2508                 talloc_free(tmp_ctx);
2509                 return -1;
2510         }
2511
2512
2513         tv = timeval_current();
2514         /* this loop will terminate when we have received the reply */
2515         while (timeval_elapsed(&tv) < 3.0) {    
2516                 event_loop_once(ctdb->ev);
2517         }
2518
2519         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2520
2521         talloc_free(tmp_ctx);
2522         return 0;
2523 }
2524
2525 /*
2526   clear the in memory log area
2527  */
2528 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2529 {
2530         int ret;
2531         int32_t res;
2532         char *errmsg;
2533         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2534
2535         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2536                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2537         if (ret != 0 || res != 0) {
2538                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2539                 talloc_free(tmp_ctx);
2540                 return -1;
2541         }
2542
2543         talloc_free(tmp_ctx);
2544         return 0;
2545 }
2546
2547
2548
2549 /*
2550   display a list of the databases on a remote ctdb
2551  */
2552 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2553 {
2554         int i, ret;
2555         struct ctdb_dbid_map *dbmap=NULL;
2556
2557         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2558         if (ret != 0) {
2559                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2560                 return ret;
2561         }
2562
2563         if(options.machinereadable){
2564                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
2565                 for(i=0;i<dbmap->num;i++){
2566                         const char *path;
2567                         const char *name;
2568                         const char *health;
2569                         bool persistent;
2570
2571                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
2572                                             dbmap->dbs[i].dbid, ctdb, &path);
2573                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
2574                                             dbmap->dbs[i].dbid, ctdb, &name);
2575                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
2576                                               dbmap->dbs[i].dbid, ctdb, &health);
2577                         persistent = dbmap->dbs[i].persistent;
2578                         printf(":0x%08X:%s:%s:%d:%d:\n",
2579                                dbmap->dbs[i].dbid, name, path,
2580                                !!(persistent), !!(health));
2581                 }
2582                 return 0;
2583         }
2584
2585         printf("Number of databases:%d\n", dbmap->num);
2586         for(i=0;i<dbmap->num;i++){
2587                 const char *path;
2588                 const char *name;
2589                 const char *health;
2590                 bool persistent;
2591
2592                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2593                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2594                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2595                 persistent = dbmap->dbs[i].persistent;
2596                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
2597                        dbmap->dbs[i].dbid, name, path,
2598                        persistent?" PERSISTENT":"",
2599                        health?" UNHEALTHY":"");
2600         }
2601
2602         return 0;
2603 }
2604
2605 /*
2606   display the status of a database on a remote ctdb
2607  */
2608 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
2609 {
2610         int i, ret;
2611         struct ctdb_dbid_map *dbmap=NULL;
2612         const char *db_name;
2613
2614         if (argc < 1) {
2615                 usage();
2616         }
2617
2618         db_name = argv[0];
2619
2620         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2621         if (ret != 0) {
2622                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2623                 return ret;
2624         }
2625
2626         for(i=0;i<dbmap->num;i++){
2627                 const char *path;
2628                 const char *name;
2629                 const char *health;
2630                 bool persistent;
2631
2632                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2633                 if (strcmp(name, db_name) != 0) {
2634                         continue;
2635                 }
2636
2637                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2638                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2639                 persistent = dbmap->dbs[i].persistent;
2640                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
2641                        dbmap->dbs[i].dbid, name, path,
2642                        persistent?"yes":"no",
2643                        health?health:"OK");
2644                 return 0;
2645         }
2646
2647         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
2648         return 0;
2649 }
2650
2651 /*
2652   check if the local node is recmaster or not
2653   it will return 1 if this node is the recmaster and 0 if it is not
2654   or if the local ctdb daemon could not be contacted
2655  */
2656 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2657 {
2658         uint32_t mypnn, recmaster;
2659         int ret;
2660
2661         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2662         if (mypnn == -1) {
2663                 printf("Failed to get pnn of node\n");
2664                 return 1;
2665         }
2666
2667         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2668         if (ret != 0) {
2669                 printf("Failed to get the recmaster\n");
2670                 return 1;
2671         }
2672
2673         if (recmaster != mypnn) {
2674                 printf("this node is not the recmaster\n");
2675                 return 1;
2676         }
2677
2678         printf("this node is the recmaster\n");
2679         return 0;
2680 }
2681
2682 /*
2683   ping a node
2684  */
2685 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2686 {
2687         int ret;
2688         struct timeval tv = timeval_current();
2689         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2690         if (ret == -1) {
2691                 printf("Unable to get ping response from node %u\n", options.pnn);
2692                 return -1;
2693         } else {
2694                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2695                        options.pnn, timeval_elapsed(&tv), ret);
2696         }
2697         return 0;
2698 }
2699
2700
2701 /*
2702   get a tunable
2703  */
2704 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2705 {
2706         const char *name;
2707         uint32_t value;
2708         int ret;
2709
2710         if (argc < 1) {
2711                 usage();
2712         }
2713
2714         name = argv[0];
2715         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2716         if (ret == -1) {
2717                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
2718                 return -1;
2719         }
2720
2721         printf("%-19s = %u\n", name, value);
2722         return 0;
2723 }
2724
2725 /*
2726   set a tunable
2727  */
2728 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
2729 {
2730         const char *name;
2731         uint32_t value;
2732         int ret;
2733
2734         if (argc < 2) {
2735                 usage();
2736         }
2737
2738         name = argv[0];
2739         value = strtoul(argv[1], NULL, 0);
2740
2741         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
2742         if (ret == -1) {
2743                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
2744                 return -1;
2745         }
2746         return 0;
2747 }
2748
2749 /*
2750   list all tunables
2751  */
2752 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
2753 {
2754         uint32_t count;
2755         const char **list;
2756         int ret, i;
2757
2758         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
2759         if (ret == -1) {
2760                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
2761                 return -1;
2762         }
2763
2764         for (i=0;i<count;i++) {
2765                 control_getvar(ctdb, 1, &list[i]);
2766         }
2767
2768         talloc_free(list);
2769         
2770         return 0;
2771 }
2772
2773 /*
2774   display debug level on a node
2775  */
2776 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2777 {
2778         int ret;
2779         int32_t level;
2780
2781         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
2782         if (ret != 0) {
2783                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
2784                 return ret;
2785         } else {
2786                 if (options.machinereadable){
2787                         printf(":Name:Level:\n");
2788                         printf(":%s:%d:\n",get_debug_by_level(level),level);
2789                 } else {
2790                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
2791                 }
2792         }
2793         return 0;
2794 }
2795
2796 /*
2797   display reclock file of a node
2798  */
2799 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2800 {
2801         int ret;
2802         const char *reclock;
2803
2804         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
2805         if (ret != 0) {
2806                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2807                 return ret;
2808         } else {
2809                 if (options.machinereadable){
2810                         if (reclock != NULL) {
2811                                 printf("%s", reclock);
2812                         }
2813                 } else {
2814                         if (reclock == NULL) {
2815                                 printf("No reclock file used.\n");
2816                         } else {
2817                                 printf("Reclock file:%s\n", reclock);
2818                         }
2819                 }
2820         }
2821         return 0;
2822 }
2823
2824 /*
2825   set the reclock file of a node
2826  */
2827 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
2828 {
2829         int ret;
2830         const char *reclock;
2831
2832         if (argc == 0) {
2833                 reclock = NULL;
2834         } else if (argc == 1) {
2835                 reclock = argv[0];
2836         } else {
2837                 usage();
2838         }
2839
2840         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
2841         if (ret != 0) {
2842                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
2843                 return ret;
2844         }
2845         return 0;
2846 }
2847
2848 /*
2849   set the natgw state on/off
2850  */
2851 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
2852 {
2853         int ret;
2854         uint32_t natgwstate;
2855
2856         if (argc == 0) {
2857                 usage();
2858         }
2859
2860         if (!strcmp(argv[0], "on")) {
2861                 natgwstate = 1;
2862         } else if (!strcmp(argv[0], "off")) {
2863                 natgwstate = 0;
2864         } else {
2865                 usage();
2866         }
2867
2868         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
2869         if (ret != 0) {
2870                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
2871                 return ret;
2872         }
2873
2874         return 0;
2875 }
2876
2877 /*
2878   set the lmaster role on/off
2879  */
2880 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2881 {
2882         int ret;
2883         uint32_t lmasterrole;
2884
2885         if (argc == 0) {
2886                 usage();
2887         }
2888
2889         if (!strcmp(argv[0], "on")) {
2890                 lmasterrole = 1;
2891         } else if (!strcmp(argv[0], "off")) {
2892                 lmasterrole = 0;
2893         } else {
2894                 usage();
2895         }
2896
2897         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
2898         if (ret != 0) {
2899                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
2900                 return ret;
2901         }
2902
2903         return 0;
2904 }
2905
2906 /*
2907   set the recmaster role on/off
2908  */
2909 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
2910 {
2911         int ret;
2912         uint32_t recmasterrole;
2913
2914         if (argc == 0) {
2915                 usage();
2916         }
2917
2918         if (!strcmp(argv[0], "on")) {
2919                 recmasterrole = 1;
2920         } else if (!strcmp(argv[0], "off")) {
2921                 recmasterrole = 0;
2922         } else {
2923                 usage();
2924         }
2925
2926         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
2927         if (ret != 0) {
2928                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
2929                 return ret;
2930         }
2931
2932         return 0;
2933 }
2934
2935 /*
2936   set debug level on a node or all nodes
2937  */
2938 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
2939 {
2940         int i, ret;
2941         int32_t level;
2942
2943         if (argc == 0) {
2944                 printf("You must specify the debug level. Valid levels are:\n");
2945                 for (i=0; debug_levels[i].description != NULL; i++) {
2946                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2947                 }
2948
2949                 return 0;
2950         }
2951
2952         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2953                 level = get_debug_by_desc(argv[0]);
2954         } else {
2955                 level = strtol(argv[0], NULL, 0);
2956         }
2957
2958         for (i=0; debug_levels[i].description != NULL; i++) {
2959                 if (level == debug_levels[i].level) {
2960                         break;
2961                 }
2962         }
2963         if (debug_levels[i].description == NULL) {
2964                 printf("Invalid debug level, must be one of\n");
2965                 for (i=0; debug_levels[i].description != NULL; i++) {
2966                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
2967                 }
2968                 return -1;
2969         }
2970
2971         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
2972         if (ret != 0) {
2973                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
2974         }
2975         return 0;
2976 }
2977
2978
2979 /*
2980   freeze a node
2981  */
2982 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
2983 {
2984         int ret;
2985         uint32_t priority;
2986         
2987         if (argc == 1) {
2988                 priority = strtol(argv[0], NULL, 0);
2989         } else {
2990                 priority = 0;
2991         }
2992         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
2993
2994         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
2995         if (ret != 0) {
2996                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
2997         }               
2998         return 0;
2999 }
3000
3001 /*
3002   thaw a node
3003  */
3004 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3005 {
3006         int ret;
3007         uint32_t priority;
3008         
3009         if (argc == 1) {
3010                 priority = strtol(argv[0], NULL, 0);
3011         } else {
3012                 priority = 0;
3013         }
3014         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3015
3016         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3017         if (ret != 0) {
3018                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3019         }               
3020         return 0;
3021 }
3022
3023
3024 /*
3025   attach to a database
3026  */
3027 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3028 {
3029         const char *db_name;
3030         struct ctdb_db_context *ctdb_db;
3031
3032         if (argc < 1) {
3033                 usage();
3034         }
3035         db_name = argv[0];
3036
3037         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3038         if (ctdb_db == NULL) {
3039                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3040                 return -1;
3041         }
3042
3043         return 0;
3044 }
3045
3046 /*
3047   set db priority
3048  */
3049 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3050 {
3051         struct ctdb_db_priority db_prio;
3052         int ret;
3053
3054         if (argc < 2) {
3055                 usage();
3056         }
3057
3058         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3059         db_prio.priority = strtoul(argv[1], NULL, 0);
3060
3061         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3062         if (ret != 0) {
3063                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3064                 return -1;
3065         }
3066
3067         return 0;
3068 }
3069
3070 /*
3071   get db priority
3072  */
3073 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3074 {
3075         uint32_t db_id, priority;
3076         int ret;
3077
3078         if (argc < 1) {
3079                 usage();
3080         }
3081
3082         db_id = strtoul(argv[0], NULL, 0);
3083
3084         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3085         if (ret != 0) {
3086                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3087                 return -1;
3088         }
3089
3090         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3091
3092         return 0;
3093 }
3094
3095 /*
3096   run an eventscript on a node
3097  */
3098 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3099 {
3100         TDB_DATA data;
3101         int ret;
3102         int32_t res;
3103         char *errmsg;
3104         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3105
3106         if (argc != 1) {
3107                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3108                 return -1;
3109         }
3110
3111         data.dptr = (unsigned char *)discard_const(argv[0]);
3112         data.dsize = strlen((char *)data.dptr) + 1;
3113
3114         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3115
3116         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3117                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3118         if (ret != 0 || res != 0) {
3119                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3120                 talloc_free(tmp_ctx);
3121                 return -1;
3122         }
3123         talloc_free(tmp_ctx);
3124         return 0;
3125 }
3126
3127 #define DB_VERSION 1
3128 #define MAX_DB_NAME 64
3129 struct db_file_header {
3130         unsigned long version;
3131         time_t timestamp;
3132         unsigned long persistent;
3133         unsigned long size;
3134         const char name[MAX_DB_NAME];
3135 };
3136
3137 struct backup_data {
3138         struct ctdb_marshall_buffer *records;
3139         uint32_t len;
3140         uint32_t total;
3141         bool traverse_error;
3142 };
3143
3144 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3145 {
3146         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3147         struct ctdb_rec_data *rec;
3148
3149         /* add the record */
3150         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3151         if (rec == NULL) {
3152                 bd->traverse_error = true;
3153                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3154                 return -1;
3155         }
3156         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3157         if (bd->records == NULL) {
3158                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3159                 bd->traverse_error = true;
3160                 return -1;
3161         }
3162         bd->records->count++;
3163         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3164         bd->len += rec->length;
3165         talloc_free(rec);
3166
3167         bd->total++;
3168         return 0;
3169 }
3170
3171 /*
3172  * backup a database to a file 
3173  */
3174 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3175 {
3176         int i, ret;
3177         struct ctdb_dbid_map *dbmap=NULL;
3178         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3179         struct db_file_header dbhdr;
3180         struct ctdb_db_context *ctdb_db;
3181         struct backup_data *bd;
3182         int fh = -1;
3183         int status = -1;
3184         const char *reason = NULL;
3185
3186         if (argc != 2) {
3187                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3188                 return -1;
3189         }
3190
3191         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3192         if (ret != 0) {
3193                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3194                 return ret;
3195         }
3196
3197         for(i=0;i<dbmap->num;i++){
3198                 const char *name;
3199
3200                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3201                 if(!strcmp(argv[0], name)){
3202                         talloc_free(discard_const(name));
3203                         break;
3204                 }
3205                 talloc_free(discard_const(name));
3206         }
3207         if (i == dbmap->num) {
3208                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3209                 talloc_free(tmp_ctx);
3210                 return -1;
3211         }
3212
3213         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3214                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3215         if (ret != 0) {
3216                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3217                                  argv[0]));
3218                 talloc_free(tmp_ctx);
3219                 return -1;
3220         }
3221         if (reason) {
3222                 uint32_t allow_unhealthy = 0;
3223
3224                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3225                                       "AllowUnhealthyDBRead",
3226                                       &allow_unhealthy);
3227
3228                 if (allow_unhealthy != 1) {
3229                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3230                                          argv[0], reason));
3231
3232                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3233                                          allow_unhealthy));
3234                         talloc_free(tmp_ctx);
3235                         return -1;
3236                 }
3237
3238                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3239                                      argv[0], argv[0]));
3240                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3241                                      "tunnable AllowUnhealthyDBRead = %u\n",
3242                                      allow_unhealthy));
3243         }
3244
3245         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3246         if (ctdb_db == NULL) {
3247                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3248                 talloc_free(tmp_ctx);
3249                 return -1;
3250         }
3251
3252
3253         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3254         if (ret == -1) {
3255                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3256                 talloc_free(tmp_ctx);
3257                 return -1;
3258         }
3259
3260
3261         bd = talloc_zero(tmp_ctx, struct backup_data);
3262         if (bd == NULL) {
3263                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3264                 talloc_free(tmp_ctx);
3265                 return -1;
3266         }
3267
3268         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3269         if (bd->records == NULL) {
3270                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3271                 talloc_free(tmp_ctx);
3272                 return -1;
3273         }
3274
3275         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3276         bd->records->db_id = ctdb_db->db_id;
3277         /* traverse the database collecting all records */
3278         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3279             bd->traverse_error) {
3280                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3281                 talloc_free(tmp_ctx);
3282                 return -1;              
3283         }
3284
3285         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3286
3287
3288         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3289         if (fh == -1) {
3290                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3291                 talloc_free(tmp_ctx);
3292                 return -1;
3293         }
3294
3295         dbhdr.version = DB_VERSION;
3296         dbhdr.timestamp = time(NULL);
3297         dbhdr.persistent = dbmap->dbs[i].persistent;
3298         dbhdr.size = bd->len;
3299         if (strlen(argv[0]) >= MAX_DB_NAME) {
3300                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3301                 goto done;
3302         }
3303         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3304         ret = write(fh, &dbhdr, sizeof(dbhdr));
3305         if (ret == -1) {
3306                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3307                 goto done;
3308         }
3309         ret = write(fh, bd->records, bd->len);
3310         if (ret == -1) {
3311                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3312                 goto done;
3313         }
3314
3315         status = 0;
3316 done:
3317         if (fh != -1) {
3318                 ret = close(fh);
3319                 if (ret == -1) {
3320                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3321                 }
3322         }
3323         talloc_free(tmp_ctx);
3324         return status;
3325 }
3326
3327 /*
3328  * restore a database from a file 
3329  */
3330 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3331 {
3332         int ret;
3333         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3334         TDB_DATA outdata;
3335         TDB_DATA data;
3336         struct db_file_header dbhdr;
3337         struct ctdb_db_context *ctdb_db;
3338         struct ctdb_node_map *nodemap=NULL;
3339         struct ctdb_vnn_map *vnnmap=NULL;
3340         int i, fh;
3341         struct ctdb_control_wipe_database w;
3342         uint32_t *nodes;
3343         uint32_t generation;
3344         struct tm *tm;
3345         char tbuf[100];
3346
3347         if (argc != 1) {
3348                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3349                 return -1;
3350         }
3351
3352         fh = open(argv[0], O_RDONLY);
3353         if (fh == -1) {
3354                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3355                 talloc_free(tmp_ctx);
3356                 return -1;
3357         }
3358
3359         read(fh, &dbhdr, sizeof(dbhdr));
3360         if (dbhdr.version != DB_VERSION) {
3361                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3362                 talloc_free(tmp_ctx);
3363                 return -1;
3364         }
3365
3366         outdata.dsize = dbhdr.size;
3367         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3368         if (outdata.dptr == NULL) {
3369                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3370                 close(fh);
3371                 talloc_free(tmp_ctx);
3372                 return -1;
3373         }               
3374         read(fh, outdata.dptr, outdata.dsize);
3375         close(fh);
3376
3377         tm = localtime(&dbhdr.timestamp);
3378         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3379         printf("Restoring database '%s' from backup @ %s\n",
3380                 dbhdr.name, tbuf);
3381
3382
3383         ctdb_db = ctdb_attach(ctdb, dbhdr.name, dbhdr.persistent, 0);
3384         if (ctdb_db == NULL) {
3385                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbhdr.name));
3386                 talloc_free(tmp_ctx);
3387                 return -1;
3388         }
3389
3390         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3391         if (ret != 0) {
3392                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3393                 talloc_free(tmp_ctx);
3394                 return ret;
3395         }
3396
3397
3398         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3399         if (ret != 0) {
3400                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3401                 talloc_free(tmp_ctx);
3402                 return ret;
3403         }
3404
3405         /* freeze all nodes */
3406         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3407         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3408                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3409                                         nodes, i,
3410                                         TIMELIMIT(),
3411                                         false, tdb_null,
3412                                         NULL, NULL,
3413                                         NULL) != 0) {
3414                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3415                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3416                         talloc_free(tmp_ctx);
3417                         return -1;
3418                 }
3419         }
3420
3421         generation = vnnmap->generation;
3422         data.dptr = (void *)&generation;
3423         data.dsize = sizeof(generation);
3424
3425         /* start a cluster wide transaction */
3426         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3427         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3428                                         nodes, 0,
3429                                         TIMELIMIT(), false, data,
3430                                         NULL, NULL,
3431                                         NULL) != 0) {
3432                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3433                 return -1;
3434         }
3435
3436
3437         w.db_id = ctdb_db->db_id;
3438         w.transaction_id = generation;
3439
3440         data.dptr = (void *)&w;
3441         data.dsize = sizeof(w);
3442
3443         /* wipe all the remote databases. */
3444         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3445         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3446                                         nodes, 0,
3447                                         TIMELIMIT(), false, data,
3448                                         NULL, NULL,
3449                                         NULL) != 0) {
3450                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3451                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3452                 talloc_free(tmp_ctx);
3453                 return -1;
3454         }
3455         
3456         /* push the database */
3457         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3458         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3459                                         nodes, 0,
3460                                         TIMELIMIT(), false, outdata,
3461                                         NULL, NULL,
3462                                         NULL) != 0) {
3463                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3464                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3465                 talloc_free(tmp_ctx);
3466                 return -1;
3467         }
3468
3469         data.dptr = (void *)&ctdb_db->db_id;
3470         data.dsize = sizeof(ctdb_db->db_id);
3471
3472         /* mark the database as healthy */
3473         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3474         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3475                                         nodes, 0,
3476                                         TIMELIMIT(), false, data,
3477                                         NULL, NULL,
3478                                         NULL) != 0) {
3479                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3480                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3481                 talloc_free(tmp_ctx);
3482                 return -1;
3483         }
3484
3485         data.dptr = (void *)&generation;
3486         data.dsize = sizeof(generation);
3487
3488         /* commit all the changes */
3489         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3490                                         nodes, 0,
3491                                         TIMELIMIT(), false, data,
3492                                         NULL, NULL,
3493                                         NULL) != 0) {
3494                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3495                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3496                 talloc_free(tmp_ctx);
3497                 return -1;
3498         }
3499
3500
3501         /* thaw all nodes */
3502         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3503         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3504                                         nodes, 0,
3505                                         TIMELIMIT(),
3506                                         false, tdb_null,
3507                                         NULL, NULL,
3508                                         NULL) != 0) {
3509                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3510                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3511                 talloc_free(tmp_ctx);
3512                 return -1;
3513         }
3514
3515
3516         talloc_free(tmp_ctx);
3517         return 0;
3518 }
3519
3520 /*
3521  * dump a database backup from a file
3522  */
3523 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
3524 {
3525         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3526         TDB_DATA outdata;
3527         struct db_file_header dbhdr;
3528         int i, fh;
3529         struct tm *tm;
3530         char tbuf[100];
3531         struct ctdb_rec_data *rec = NULL;
3532         struct ctdb_marshall_buffer *m;
3533
3534         if (argc != 1) {
3535                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3536                 return -1;
3537         }
3538
3539         fh = open(argv[0], O_RDONLY);
3540         if (fh == -1) {
3541                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3542                 talloc_free(tmp_ctx);
3543                 return -1;
3544         }
3545
3546         read(fh, &dbhdr, sizeof(dbhdr));
3547         if (dbhdr.version != DB_VERSION) {
3548                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3549                 talloc_free(tmp_ctx);
3550                 return -1;
3551         }
3552
3553         outdata.dsize = dbhdr.size;
3554         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3555         if (outdata.dptr == NULL) {
3556                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3557                 close(fh);
3558                 talloc_free(tmp_ctx);
3559                 return -1;
3560         }
3561         read(fh, outdata.dptr, outdata.dsize);
3562         close(fh);
3563         m = (struct ctdb_marshall_buffer *)outdata.dptr;
3564
3565         tm = localtime(&dbhdr.timestamp);
3566         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3567         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
3568                 dbhdr.name, m->db_id, tbuf);
3569
3570         for (i=0; i < m->count; i++) {
3571                 uint32_t reqid = 0;
3572                 TDB_DATA key, data;
3573
3574                 /* we do not want the header splitted, so we pass NULL*/
3575                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
3576                                               NULL, &key, &data);
3577
3578                 ctdb_dumpdb_record(ctdb, key, data, stdout);
3579         }
3580
3581         printf("Dumped %d records\n", i);
3582         talloc_free(tmp_ctx);
3583         return 0;
3584 }
3585
3586 /*
3587  * wipe a database from a file
3588  */
3589 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3590                           const char **argv)
3591 {
3592         int ret;
3593         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3594         TDB_DATA data;
3595         struct ctdb_db_context *ctdb_db;
3596         struct ctdb_node_map *nodemap = NULL;
3597         struct ctdb_vnn_map *vnnmap = NULL;
3598         int i;
3599         struct ctdb_control_wipe_database w;
3600         uint32_t *nodes;
3601         uint32_t generation;
3602         struct ctdb_dbid_map *dbmap = NULL;
3603
3604         if (argc != 1) {
3605                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3606                 return -1;
3607         }
3608
3609         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3610                                  &dbmap);
3611         if (ret != 0) {
3612                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3613                                   options.pnn));
3614                 return ret;
3615         }
3616
3617         for(i=0;i<dbmap->num;i++){
3618                 const char *name;
3619
3620                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3621                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3622                 if(!strcmp(argv[0], name)){
3623                         talloc_free(discard_const(name));
3624                         break;
3625                 }
3626                 talloc_free(discard_const(name));
3627         }
3628         if (i == dbmap->num) {
3629                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3630                                   argv[0]));
3631                 talloc_free(tmp_ctx);
3632                 return -1;
3633         }
3634
3635         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3636         if (ctdb_db == NULL) {
3637                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3638                                   argv[0]));
3639                 talloc_free(tmp_ctx);
3640                 return -1;
3641         }
3642
3643         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3644                                    &nodemap);
3645         if (ret != 0) {
3646                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3647                                   options.pnn));
3648                 talloc_free(tmp_ctx);
3649                 return ret;
3650         }
3651
3652         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3653                                   &vnnmap);
3654         if (ret != 0) {
3655                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3656                                   options.pnn));
3657                 talloc_free(tmp_ctx);
3658                 return ret;
3659         }
3660
3661         /* freeze all nodes */
3662         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3663         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3664                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3665                                                 nodes, i,
3666                                                 TIMELIMIT(),
3667                                                 false, tdb_null,
3668                                                 NULL, NULL,
3669                                                 NULL);
3670                 if (ret != 0) {
3671                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3672                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3673                                              CTDB_RECOVERY_ACTIVE);
3674                         talloc_free(tmp_ctx);
3675                         return -1;
3676                 }
3677         }
3678
3679         generation = vnnmap->generation;
3680         data.dptr = (void *)&generation;
3681         data.dsize = sizeof(generation);
3682
3683         /* start a cluster wide transaction */
3684         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3685         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3686                                         nodes, 0,
3687                                         TIMELIMIT(), false, data,
3688                                         NULL, NULL,
3689                                         NULL);
3690         if (ret!= 0) {
3691                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3692                                   "transactions.\n"));
3693                 return -1;
3694         }
3695
3696         w.db_id = ctdb_db->db_id;
3697         w.transaction_id = generation;
3698
3699         data.dptr = (void *)&w;
3700         data.dsize = sizeof(w);
3701
3702         /* wipe all the remote databases. */
3703         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3704         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3705                                         nodes, 0,
3706                                         TIMELIMIT(), false, data,
3707                                         NULL, NULL,
3708                                         NULL) != 0) {
3709                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3710                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3711                 talloc_free(tmp_ctx);
3712                 return -1;
3713         }
3714
3715         data.dptr = (void *)&ctdb_db->db_id;
3716         data.dsize = sizeof(ctdb_db->db_id);
3717
3718         /* mark the database as healthy */
3719         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3720         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3721                                         nodes, 0,
3722                                         TIMELIMIT(), false, data,
3723                                         NULL, NULL,
3724                                         NULL) != 0) {
3725                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3726                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3727                 talloc_free(tmp_ctx);
3728                 return -1;
3729         }
3730
3731         data.dptr = (void *)&generation;
3732         data.dsize = sizeof(generation);
3733
3734         /* commit all the changes */
3735         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3736                                         nodes, 0,
3737                                         TIMELIMIT(), false, data,
3738                                         NULL, NULL,
3739                                         NULL) != 0) {
3740                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3741                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3742                 talloc_free(tmp_ctx);
3743                 return -1;
3744         }
3745
3746         /* thaw all nodes */
3747         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3748         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3749                                         nodes, 0,
3750                                         TIMELIMIT(),
3751                                         false, tdb_null,
3752                                         NULL, NULL,
3753                                         NULL) != 0) {
3754                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3755                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3756                 talloc_free(tmp_ctx);
3757                 return -1;
3758         }
3759
3760         talloc_free(tmp_ctx);
3761         return 0;
3762 }
3763
3764 /*
3765  * set flags of a node in the nodemap
3766  */
3767 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
3768 {
3769         int ret;
3770         int32_t status;
3771         int node;
3772         int flags;
3773         TDB_DATA data;
3774         struct ctdb_node_flag_change c;
3775
3776         if (argc != 2) {
3777                 usage();
3778                 return -1;
3779         }
3780
3781         if (sscanf(argv[0], "%d", &node) != 1) {
3782                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
3783                 usage();
3784                 return -1;
3785         }
3786         if (sscanf(argv[1], "0x%x", &flags) != 1) {
3787                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
3788                 usage();
3789                 return -1;
3790         }
3791
3792         c.pnn       = node;
3793         c.old_flags = 0;
3794         c.new_flags = flags;
3795
3796         data.dsize = sizeof(c);
3797         data.dptr = (unsigned char *)&c;
3798
3799         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
3800                            data, NULL, NULL, &status, NULL, NULL);
3801         if (ret != 0 || status != 0) {
3802                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
3803                 return -1;
3804         }
3805         return 0;
3806 }
3807
3808 /*
3809   dump memory usage
3810  */
3811 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3812 {
3813         TDB_DATA data;
3814         int ret;
3815         int32_t res;
3816         char *errmsg;
3817         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3818         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
3819                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
3820         if (ret != 0 || res != 0) {
3821                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
3822                 talloc_free(tmp_ctx);
3823                 return -1;
3824         }
3825         write(1, data.dptr, data.dsize);
3826         talloc_free(tmp_ctx);
3827         return 0;
3828 }
3829
3830 /*
3831   handler for memory dumps
3832 */
3833 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
3834                              TDB_DATA data, void *private_data)
3835 {
3836         write(1, data.dptr, data.dsize);
3837         exit(0);
3838 }
3839
3840 /*
3841   dump memory usage on the recovery daemon
3842  */
3843 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
3844 {
3845         int ret;
3846         TDB_DATA data;
3847         struct rd_memdump_reply rd;
3848
3849         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3850         if (rd.pnn == -1) {
3851                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
3852                 return -1;
3853         }
3854         rd.srvid = getpid();
3855
3856         /* register a message port for receiveing the reply so that we
3857            can receive the reply
3858         */
3859         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
3860
3861
3862         data.dptr = (uint8_t *)&rd;
3863         data.dsize = sizeof(rd);
3864
3865         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
3866         if (ret != 0) {
3867                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
3868                 return -1;
3869         }
3870
3871         /* this loop will terminate when we have received the reply */
3872         while (1) {     
3873                 event_loop_once(ctdb->ev);
3874         }
3875
3876         return 0;
3877 }
3878
3879 /*
3880   list all nodes in the cluster
3881   if the daemon is running, we read the data from the daemon.
3882   if the daemon is not running we parse the nodes file directly
3883  */
3884 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
3885 {
3886         int i, ret;
3887         struct ctdb_node_map *nodemap=NULL;
3888
3889         if (ctdb != NULL) {
3890                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3891                 if (ret != 0) {
3892                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3893                         return ret;
3894                 }
3895
3896                 for(i=0;i<nodemap->num;i++){
3897                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
3898                                 continue;
3899                         }
3900                         if (options.machinereadable){
3901                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
3902                         } else {
3903                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
3904                         }
3905                 }
3906         } else {
3907                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
3908                 struct pnn_node *pnn_nodes;
3909                 struct pnn_node *pnn_node;
3910         
3911                 pnn_nodes = read_nodes_file(mem_ctx);
3912                 if (pnn_nodes == NULL) {
3913                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
3914                         talloc_free(mem_ctx);
3915                         return -1;
3916                 }
3917
3918                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
3919                         ctdb_sock_addr addr;
3920
3921                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
3922                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
3923                                 talloc_free(mem_ctx);
3924                                 return -1;
3925                         }
3926
3927                         if (options.machinereadable){
3928                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
3929                         } else {
3930                                 printf("%s\n", pnn_node->addr);
3931                         }
3932                 }
3933                 talloc_free(mem_ctx);
3934         }
3935
3936         return 0;
3937 }
3938
3939 /*
3940   reload the nodes file on the local node
3941  */
3942 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
3943 {
3944         int i, ret;
3945         int mypnn;
3946         struct ctdb_node_map *nodemap=NULL;
3947
3948         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3949         if (mypnn == -1) {
3950                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
3951                 return -1;
3952         }
3953
3954         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3955         if (ret != 0) {
3956                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3957                 return ret;
3958         }
3959
3960         /* reload the nodes file on all remote nodes */
3961         for (i=0;i<nodemap->num;i++) {
3962                 if (nodemap->nodes[i].pnn == mypnn) {
3963                         continue;
3964                 }
3965                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
3966                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
3967                         nodemap->nodes[i].pnn);
3968                 if (ret != 0) {
3969                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
3970                 }
3971         }
3972
3973         /* reload the nodes file on the local node */
3974         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
3975         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
3976         if (ret != 0) {
3977                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
3978         }
3979
3980         /* initiate a recovery */
3981         control_recover(ctdb, argc, argv);
3982
3983         return 0;
3984 }
3985
3986
3987 static const struct {
3988         const char *name;
3989         int (*fn)(struct ctdb_context *, int, const char **);
3990         bool auto_all;
3991         bool without_daemon; /* can be run without daemon running ? */
3992         const char *msg;
3993         const char *args;
3994 } ctdb_commands[] = {
3995 #ifdef CTDB_VERS
3996         { "version",         control_version,           true,   false,  "show version of ctdb" },
3997 #endif
3998         { "status",          control_status,            true,   false,  "show node status" },
3999         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4000         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4001         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4002         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4003         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4004         { "statistics",      control_statistics,        false,  false, "show statistics" },
4005         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4006         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4007         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4008         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4009         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4010         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4011         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4012         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4013         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4014         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4015         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4016         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4017         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4018         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4019         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4020         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4021         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4022         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
4023         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4024         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4025         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4026         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4027         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4028         { "stop",            control_stop,              true,   false,  "stop a node" },
4029         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4030         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4031         { "unban",           control_unban,             true,   false,  "unban a node" },
4032         { "showban",         control_showban,           true,   false,  "show ban information"},
4033         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4034         { "recover",         control_recover,           true,   false,  "force recovery" },
4035         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4036         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
4037         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4038         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4039         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4040         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4041         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4042         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
4043
4044         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4045         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4046         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4047         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4048         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4049         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4050         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4051         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4052         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4053         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4054         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4055         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4056         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4057         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file>"},
4058         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4059         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4060         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4061         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
4062         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4063         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4064         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4065         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4066         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4067         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4068         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4069         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4070         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4071         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4072         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4073         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4074 };
4075
4076 /*
4077   show usage message
4078  */
4079 static void usage(void)
4080 {
4081         int i;
4082         printf(
4083 "Usage: ctdb [options] <control>\n" \
4084 "Options:\n" \
4085 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4086 "   -Y                 generate machinereadable output\n"
4087 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4088         printf("Controls:\n");
4089         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4090                 printf("  %-15s %-27s  %s\n", 
4091                        ctdb_commands[i].name, 
4092                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4093                        ctdb_commands[i].msg);
4094         }
4095         exit(1);
4096 }
4097
4098
4099 static void ctdb_alarm(int sig)
4100 {
4101         printf("Maximum runtime exceeded - exiting\n");
4102         _exit(ERR_TIMEOUT);
4103 }
4104
4105 /*
4106   main program
4107 */
4108 int main(int argc, const char *argv[])
4109 {
4110         struct ctdb_context *ctdb;
4111         char *nodestring = NULL;
4112         struct poptOption popt_options[] = {
4113                 POPT_AUTOHELP
4114                 POPT_CTDB_CMDLINE
4115                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4116                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4117                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4118                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4119                 POPT_TABLEEND
4120         };
4121         int opt;
4122         const char **extra_argv;
4123         int extra_argc = 0;
4124         int ret=-1, i;
4125         poptContext pc;
4126         struct event_context *ev;
4127         const char *control;
4128
4129         setlinebuf(stdout);
4130         
4131         /* set some defaults */
4132         options.maxruntime = 0;
4133         options.timelimit = 3;
4134         options.pnn = CTDB_CURRENT_NODE;
4135
4136         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4137
4138         while ((opt = poptGetNextOpt(pc)) != -1) {
4139                 switch (opt) {
4140                 default:
4141                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4142                                 poptBadOption(pc, 0), poptStrerror(opt)));
4143                         exit(1);
4144                 }
4145         }
4146
4147         /* setup the remaining options for the main program to use */
4148         extra_argv = poptGetArgs(pc);
4149         if (extra_argv) {
4150                 extra_argv++;
4151                 while (extra_argv[extra_argc]) extra_argc++;
4152         }
4153
4154         if (extra_argc < 1) {
4155                 usage();
4156         }
4157
4158         if (options.maxruntime == 0) {
4159                 const char *ctdb_timeout;
4160                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4161                 if (ctdb_timeout != NULL) {
4162                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4163                 } else {
4164                         /* default timeout is 120 seconds */
4165                         options.maxruntime = 120;
4166                 }
4167         }
4168
4169         signal(SIGALRM, ctdb_alarm);
4170         alarm(options.maxruntime);
4171
4172         /* setup the node number to contact */
4173         if (nodestring != NULL) {
4174                 if (strcmp(nodestring, "all") == 0) {
4175                         options.pnn = CTDB_BROADCAST_ALL;
4176                 } else {
4177                         options.pnn = strtoul(nodestring, NULL, 0);
4178                 }
4179         }
4180
4181         control = extra_argv[0];
4182
4183         ev = event_context_init(NULL);
4184
4185         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4186                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4187                         int j;
4188
4189                         if (ctdb_commands[i].without_daemon == true) {
4190                                 close(2);
4191                         }
4192
4193                         /* initialise ctdb */
4194                         ctdb = ctdb_cmdline_client(ev);
4195
4196                         if (ctdb_commands[i].without_daemon == false) {
4197                                 if (ctdb == NULL) {
4198                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4199                                         exit(1);
4200                                 }
4201
4202                                 /* verify the node exists */
4203                                 verify_node(ctdb);
4204
4205                                 if (options.pnn == CTDB_CURRENT_NODE) {
4206                                         int pnn;
4207                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4208                                         if (pnn == -1) {
4209                                                 return -1;
4210                                         }
4211                                         options.pnn = pnn;
4212                                 }
4213                         }
4214
4215                         if (ctdb_commands[i].auto_all && 
4216                             options.pnn == CTDB_BROADCAST_ALL) {
4217                                 uint32_t *nodes;
4218                                 uint32_t num_nodes;
4219                                 ret = 0;
4220
4221                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4222                                 CTDB_NO_MEMORY(ctdb, nodes);
4223         
4224                                 for (j=0;j<num_nodes;j++) {
4225                                         options.pnn = nodes[j];
4226                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4227                                 }
4228                                 talloc_free(nodes);
4229                         } else {
4230                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4231                         }
4232                         break;
4233                 }
4234         }
4235
4236         if (i == ARRAY_SIZE(ctdb_commands)) {
4237                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4238                 exit(1);
4239         }
4240
4241         return ret;
4242 }