When we say "current time of statistics" in the "ctdb statistics" output,
[metze/ctdb/wip.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_client.h"
31 #include "../include/ctdb_private.h"
32 #include "../common/rb_tree.h"
33 #include "db_wrap.h"
34
/* process exit codes for problems with the target node */
#define ERR_TIMEOUT     20      /* timed out trying to reach node */
#define ERR_NONODE      21      /* node does not exist */
#define ERR_DISNODE     22      /* node is disconnected */

/* connection handle handed to ctdb_getpnn() by control_pnn() */
struct ctdb_connection *ctdb_connection;

static void usage(void);

/* settings shared by all control_* command handlers */
static struct {
	/* seconds; base value for TIMELIMIT() and LONGTIMELIMIT() */
	int timelimit;
	/* node a command is aimed at; may also be CTDB_CURRENT_NODE
	   or CTDB_BROADCAST_ALL */
	uint32_t pnn;
	/* non-zero selects the colon separated output format */
	int machinereadable;
	int maxruntime;
} options;

/* timeout for a normal control, and a ten times longer variant */
#define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
#define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
52
#ifdef CTDB_VERS
/* two-level expansion so that CTDB_VERS is expanded before it is quoted */
#define STR(x) #x
#define XSTR(x) STR(x)

/*
  print the version this tool was built as (the CTDB_VERS define)
  always returns 0
 */
static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
{
	const char *built_version = XSTR(CTDB_VERS);

	printf("CTDB version: %s\n", built_version);
	return 0;
}
#endif
62
63
64 /*
65   verify that a node exists and is reachable
66  */
67 static void verify_node(struct ctdb_context *ctdb)
68 {
69         int ret;
70         struct ctdb_node_map *nodemap=NULL;
71
72         if (options.pnn == CTDB_CURRENT_NODE) {
73                 return;
74         }
75         if (options.pnn == CTDB_BROADCAST_ALL) {
76                 return;
77         }
78
79         /* verify the node exists */
80         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
81                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
82                 exit(10);
83         }
84         if (options.pnn >= nodemap->num) {
85                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
86                 exit(ERR_NONODE);
87         }
88         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
89                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
90                 exit(ERR_DISNODE);
91         }
92         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
93                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
94                 exit(ERR_DISNODE);
95         }
96
97         /* verify we can access the node */
98         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
99         if (ret == -1) {
100                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
101                 exit(10);
102         }
103 }
104
105 /*
106  check if a database exists
107 */
108 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
109 {
110         int i, ret;
111         struct ctdb_dbid_map *dbmap=NULL;
112
113         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
114         if (ret != 0) {
115                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
116                 return -1;
117         }
118
119         for(i=0;i<dbmap->num;i++){
120                 const char *name;
121
122                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
123                 if (!strcmp(name, db_name)) {
124                         return 0;
125                 }
126         }
127
128         return -1;
129 }
130
131 /*
132   see if a process exists
133  */
134 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
135 {
136         uint32_t pnn, pid;
137         int ret;
138         if (argc < 1) {
139                 usage();
140         }
141
142         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
143                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
144                 return -1;
145         }
146
147         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
148         if (ret == 0) {
149                 printf("%u:%u exists\n", pnn, pid);
150         } else {
151                 printf("%u:%u does not exist\n", pnn, pid);
152         }
153         return ret;
154 }
155
156 /*
157   display statistics structure
158  */
159 static void show_statistics(struct ctdb_statistics *s)
160 {
161         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
162         int i;
163         const char *prefix=NULL;
164         int preflen=0;
165         int tmp, days, hours, minutes, seconds;
166         const struct {
167                 const char *name;
168                 uint32_t offset;
169         } fields[] = {
170 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
171                 STATISTICS_FIELD(num_clients),
172                 STATISTICS_FIELD(frozen),
173                 STATISTICS_FIELD(recovering),
174                 STATISTICS_FIELD(num_recoveries),
175                 STATISTICS_FIELD(client_packets_sent),
176                 STATISTICS_FIELD(client_packets_recv),
177                 STATISTICS_FIELD(node_packets_sent),
178                 STATISTICS_FIELD(node_packets_recv),
179                 STATISTICS_FIELD(keepalive_packets_sent),
180                 STATISTICS_FIELD(keepalive_packets_recv),
181                 STATISTICS_FIELD(node.req_call),
182                 STATISTICS_FIELD(node.reply_call),
183                 STATISTICS_FIELD(node.req_dmaster),
184                 STATISTICS_FIELD(node.reply_dmaster),
185                 STATISTICS_FIELD(node.reply_error),
186                 STATISTICS_FIELD(node.req_message),
187                 STATISTICS_FIELD(node.req_control),
188                 STATISTICS_FIELD(node.reply_control),
189                 STATISTICS_FIELD(client.req_call),
190                 STATISTICS_FIELD(client.req_message),
191                 STATISTICS_FIELD(client.req_control),
192                 STATISTICS_FIELD(timeouts.call),
193                 STATISTICS_FIELD(timeouts.control),
194                 STATISTICS_FIELD(timeouts.traverse),
195                 STATISTICS_FIELD(total_calls),
196                 STATISTICS_FIELD(pending_calls),
197                 STATISTICS_FIELD(lockwait_calls),
198                 STATISTICS_FIELD(pending_lockwait_calls),
199                 STATISTICS_FIELD(childwrite_calls),
200                 STATISTICS_FIELD(pending_childwrite_calls),
201                 STATISTICS_FIELD(memory_used),
202                 STATISTICS_FIELD(max_hop_count),
203         };
204         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
205         seconds = tmp%60;
206         tmp    /= 60;
207         minutes = tmp%60;
208         tmp    /= 60;
209         hours   = tmp%24;
210         tmp    /= 24;
211         days    = tmp;
212
213         printf("CTDB version %u\n", CTDB_VERSION);
214         printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
215         printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
216
217         for (i=0;i<ARRAY_SIZE(fields);i++) {
218                 if (strchr(fields[i].name, '.')) {
219                         preflen = strcspn(fields[i].name, ".")+1;
220                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
221                                 prefix = fields[i].name;
222                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
223                         }
224                 } else {
225                         preflen = 0;
226                 }
227                 printf(" %*s%-22s%*s%10u\n", 
228                        preflen?4:0, "",
229                        fields[i].name+preflen, 
230                        preflen?0:4, "",
231                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
232         }
233         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
234         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
235
236         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
237         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
238         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
239         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
240
241         talloc_free(tmp_ctx);
242 }
243
244 /*
245   display remote ctdb statistics combined from all nodes
246  */
247 static int control_statistics_all(struct ctdb_context *ctdb)
248 {
249         int ret, i;
250         struct ctdb_statistics statistics;
251         uint32_t *nodes;
252         uint32_t num_nodes;
253
254         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
255         CTDB_NO_MEMORY(ctdb, nodes);
256         
257         ZERO_STRUCT(statistics);
258
259         for (i=0;i<num_nodes;i++) {
260                 struct ctdb_statistics s1;
261                 int j;
262                 uint32_t *v1 = (uint32_t *)&s1;
263                 uint32_t *v2 = (uint32_t *)&statistics;
264                 uint32_t num_ints = 
265                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
266                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
267                 if (ret != 0) {
268                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
269                         return ret;
270                 }
271                 for (j=0;j<num_ints;j++) {
272                         v2[j] += v1[j];
273                 }
274                 statistics.max_hop_count = 
275                         MAX(statistics.max_hop_count, s1.max_hop_count);
276                 statistics.max_call_latency = 
277                         MAX(statistics.max_call_latency, s1.max_call_latency);
278                 statistics.max_lockwait_latency = 
279                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
280         }
281         talloc_free(nodes);
282         printf("Gathered statistics for %u nodes\n", num_nodes);
283         show_statistics(&statistics);
284         return 0;
285 }
286
287 /*
288   display remote ctdb statistics
289  */
290 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
291 {
292         int ret;
293         struct ctdb_statistics statistics;
294
295         if (options.pnn == CTDB_BROADCAST_ALL) {
296                 return control_statistics_all(ctdb);
297         }
298
299         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
300         if (ret != 0) {
301                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
302                 return ret;
303         }
304         show_statistics(&statistics);
305         return 0;
306 }
307
308
309 /*
310   reset remote ctdb statistics
311  */
312 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
313 {
314         int ret;
315
316         ret = ctdb_statistics_reset(ctdb, options.pnn);
317         if (ret != 0) {
318                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
319                 return ret;
320         }
321         return 0;
322 }
323
324
325 /*
326   display uptime of remote node
327  */
328 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
329 {
330         int ret;
331         struct ctdb_uptime *uptime = NULL;
332         int tmp, days, hours, minutes, seconds;
333
334         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
335         if (ret != 0) {
336                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
337                 return ret;
338         }
339
340         if (options.machinereadable){
341                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
342                 printf(":%u:%u:%u:%lf\n",
343                         (unsigned int)uptime->current_time.tv_sec,
344                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
345                         (unsigned int)uptime->last_recovery_finished.tv_sec,
346                         timeval_delta(&uptime->last_recovery_finished,
347                                       &uptime->last_recovery_started)
348                 );
349                 return 0;
350         }
351
352         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
353
354         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
355         seconds = tmp%60;
356         tmp    /= 60;
357         minutes = tmp%60;
358         tmp    /= 60;
359         hours   = tmp%24;
360         tmp    /= 24;
361         days    = tmp;
362         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
363
364         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
365         seconds = tmp%60;
366         tmp    /= 60;
367         minutes = tmp%60;
368         tmp    /= 60;
369         hours   = tmp%24;
370         tmp    /= 24;
371         days    = tmp;
372         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
373         
374         printf("Duration of last recovery/failover: %lf seconds\n",
375                 timeval_delta(&uptime->last_recovery_finished,
376                               &uptime->last_recovery_started));
377
378         return 0;
379 }
380
381 /*
382   show the PNN of the current node
383  */
384 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
385 {
386         uint32_t mypnn;
387         int ret;
388
389         ret = ctdb_getpnn(ctdb_connection, options.pnn, &mypnn);
390         if (ret != 0) {
391                 DEBUG(DEBUG_ERR, ("Unable to get pnn from node."));
392                 return -1;
393         }
394
395         printf("PNN:%d\n", mypnn);
396         return 0;
397 }
398
399
/* one address line of the nodes file, as built by read_nodes_file() */
struct pnn_node {
	struct pnn_node *next;	/* following entry, NULL at the end */
	const char *addr;	/* the line with leading blanks stripped */
	int pnn;		/* node number assigned to this line */
};
405
406 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
407 {
408         const char *nodes_list;
409         int nlines;
410         char **lines;
411         int i, pnn;
412         struct pnn_node *pnn_nodes = NULL;
413         struct pnn_node *pnn_node;
414         struct pnn_node *tmp_node;
415
416         /* read the nodes file */
417         nodes_list = getenv("CTDB_NODES");
418         if (nodes_list == NULL) {
419                 nodes_list = "/etc/ctdb/nodes";
420         }
421         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
422         if (lines == NULL) {
423                 return NULL;
424         }
425         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
426                 nlines--;
427         }
428         for (i=0, pnn=0; i<nlines; i++) {
429                 char *node;
430
431                 node = lines[i];
432                 /* strip leading spaces */
433                 while((*node == ' ') || (*node == '\t')) {
434                         node++;
435                 }
436                 if (*node == '#') {
437                         pnn++;
438                         continue;
439                 }
440                 if (strcmp(node, "") == 0) {
441                         continue;
442                 }
443                 pnn_node = talloc(mem_ctx, struct pnn_node);
444                 pnn_node->pnn = pnn++;
445                 pnn_node->addr = talloc_strdup(pnn_node, node);
446                 pnn_node->next = pnn_nodes;
447                 pnn_nodes = pnn_node;
448         }
449
450         /* swap them around so we return them in incrementing order */
451         pnn_node = pnn_nodes;
452         pnn_nodes = NULL;
453         while (pnn_node) {
454                 tmp_node = pnn_node;
455                 pnn_node = pnn_node->next;
456
457                 tmp_node->next = pnn_nodes;
458                 pnn_nodes = tmp_node;
459         }
460
461         return pnn_nodes;
462 }
463
464 /*
465   show the PNN of the current node
466   discover the pnn by loading the nodes file and try to bind to all
467   addresses one at a time until the ip address is found.
468  */
469 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
470 {
471         TALLOC_CTX *mem_ctx = talloc_new(NULL);
472         struct pnn_node *pnn_nodes;
473         struct pnn_node *pnn_node;
474
475         pnn_nodes = read_nodes_file(mem_ctx);
476         if (pnn_nodes == NULL) {
477                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
478                 talloc_free(mem_ctx);
479                 return -1;
480         }
481
482         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
483                 ctdb_sock_addr addr;
484
485                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
486                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
487                         talloc_free(mem_ctx);
488                         return -1;
489                 }
490
491                 if (ctdb_sys_have_ip(&addr)) {
492                         printf("PNN:%d\n", pnn_node->pnn);
493                         talloc_free(mem_ctx);
494                         return 0;
495                 }
496         }
497
498         printf("Failed to detect which PNN this node is\n");
499         talloc_free(mem_ctx);
500         return -1;
501 }
502
503 /*
504   display remote ctdb status
505  */
506 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
507 {
508         int i, ret;
509         struct ctdb_vnn_map *vnnmap=NULL;
510         struct ctdb_node_map *nodemap=NULL;
511         uint32_t recmode, recmaster;
512         int mypnn;
513
514         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
515         if (mypnn == -1) {
516                 return -1;
517         }
518
519         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
520         if (ret != 0) {
521                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
522                 return ret;
523         }
524
525         if(options.machinereadable){
526                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:PartiallyOnline:\n");
527                 for(i=0;i<nodemap->num;i++){
528                         int partially_online = 0;
529                         int j;
530
531                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
532                                 continue;
533                         }
534                         if (nodemap->nodes[i].flags == 0) {
535                                 struct ctdb_control_get_ifaces *ifaces;
536
537                                 ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
538                                                            nodemap->nodes[i].pnn,
539                                                            ctdb, &ifaces);
540                                 if (ret == 0) {
541                                         for (j=0; j < ifaces->num; j++) {
542                                                 if (ifaces->ifaces[j].link_state != 0) {
543                                                         continue;
544                                                 }
545                                                 partially_online = 1;
546                                                 break;
547                                         }
548                                         talloc_free(ifaces);
549                                 }
550                         }
551                         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
552                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
553                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
554                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
555                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
556                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
557                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
558                                !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE),
559                                partially_online);
560                 }
561                 return 0;
562         }
563
564         printf("Number of nodes:%d\n", nodemap->num);
565         for(i=0;i<nodemap->num;i++){
566                 static const struct {
567                         uint32_t flag;
568                         const char *name;
569                 } flag_names[] = {
570                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
571                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
572                         { NODE_FLAGS_BANNED,                "BANNED" },
573                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
574                         { NODE_FLAGS_DELETED,               "DELETED" },
575                         { NODE_FLAGS_STOPPED,               "STOPPED" },
576                         { NODE_FLAGS_INACTIVE,              "INACTIVE" },
577                 };
578                 char *flags_str = NULL;
579                 int j;
580
581                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
582                         continue;
583                 }
584                 if (nodemap->nodes[i].flags == 0) {
585                         struct ctdb_control_get_ifaces *ifaces;
586
587                         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
588                                                    nodemap->nodes[i].pnn,
589                                                    ctdb, &ifaces);
590                         if (ret == 0) {
591                                 for (j=0; j < ifaces->num; j++) {
592                                         if (ifaces->ifaces[j].link_state != 0) {
593                                                 continue;
594                                         }
595                                         flags_str = talloc_strdup(ctdb, "PARTIALLYONLINE");
596                                         break;
597                                 }
598                                 talloc_free(ifaces);
599                         }
600                 }
601                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
602                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
603                                 if (flags_str == NULL) {
604                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
605                                 } else {
606                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
607                                                                            flag_names[j].name);
608                                 }
609                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
610                         }
611                 }
612                 if (flags_str == NULL) {
613                         flags_str = talloc_strdup(ctdb, "OK");
614                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
615                 }
616                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
617                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
618                        flags_str,
619                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
620                 talloc_free(flags_str);
621         }
622
623         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
624         if (ret != 0) {
625                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
626                 return ret;
627         }
628         if (vnnmap->generation == INVALID_GENERATION) {
629                 printf("Generation:INVALID\n");
630         } else {
631                 printf("Generation:%d\n",vnnmap->generation);
632         }
633         printf("Size:%d\n",vnnmap->size);
634         for(i=0;i<vnnmap->size;i++){
635                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
636         }
637
638         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
639         if (ret != 0) {
640                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
641                 return ret;
642         }
643         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
644
645         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
646         if (ret != 0) {
647                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
648                 return ret;
649         }
650         printf("Recovery master:%d\n",recmaster);
651
652         return 0;
653 }
654
655
/* one address line of the natgw nodes file, see control_natgwlist() */
struct natgw_node {
	struct natgw_node *next;	/* following entry, NULL at the end */
	const char *addr;		/* the line with leading blanks stripped */
};
660
661 /*
662   display the list of nodes belonging to this natgw configuration
663  */
664 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
665 {
666         int i, ret;
667         const char *natgw_list;
668         int nlines;
669         char **lines;
670         struct natgw_node *natgw_nodes = NULL;
671         struct natgw_node *natgw_node;
672         struct ctdb_node_map *nodemap=NULL;
673
674
675         /* read the natgw nodes file into a linked list */
676         natgw_list = getenv("NATGW_NODES");
677         if (natgw_list == NULL) {
678                 natgw_list = "/etc/ctdb/natgw_nodes";
679         }
680         lines = file_lines_load(natgw_list, &nlines, ctdb);
681         if (lines == NULL) {
682                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
683                 return -1;
684         }
685         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
686                 nlines--;
687         }
688         for (i=0;i<nlines;i++) {
689                 char *node;
690
691                 node = lines[i];
692                 /* strip leading spaces */
693                 while((*node == ' ') || (*node == '\t')) {
694                         node++;
695                 }
696                 if (*node == '#') {
697                         continue;
698                 }
699                 if (strcmp(node, "") == 0) {
700                         continue;
701                 }
702                 natgw_node = talloc(ctdb, struct natgw_node);
703                 natgw_node->addr = talloc_strdup(natgw_node, node);
704                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
705                 natgw_node->next = natgw_nodes;
706                 natgw_nodes = natgw_node;
707         }
708
709         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
710         if (ret != 0) {
711                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
712                 return ret;
713         }
714
715         i=0;
716         while(i<nodemap->num) {
717                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
718                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
719                                 break;
720                         }
721                 }
722
723                 /* this node was not in the natgw so we just remove it from
724                  * the list
725                  */
726                 if ((natgw_node == NULL) 
727                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
728                         int j;
729
730                         for (j=i+1; j<nodemap->num; j++) {
731                                 nodemap->nodes[j-1] = nodemap->nodes[j];
732                         }
733                         nodemap->num--;
734                         continue;
735                 }
736
737                 i++;
738         }               
739
740         /* pick a node to be natgwmaster
741          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
742          */
743         for(i=0;i<nodemap->num;i++){
744                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
745                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
746                         break;
747                 }
748         }
749         /* we couldnt find any healthy node, try unhealthy ones */
750         if (i == nodemap->num) {
751                 for(i=0;i<nodemap->num;i++){
752                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
753                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
754                                 break;
755                         }
756                 }
757         }
758         /* unless all nodes are STOPPED, when we pick one anyway */
759         if (i == nodemap->num) {
760                 for(i=0;i<nodemap->num;i++){
761                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
762                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
763                                 break;
764                         }
765                 }
766                 /* or if we still can not find any */
767                 if (i == nodemap->num) {
768                         printf("-1 0.0.0.0\n");
769                 }
770         }
771
772         /* print the pruned list of nodes belonging to this natgw list */
773         for(i=0;i<nodemap->num;i++){
774                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
775                         continue;
776                 }
777                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
778                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
779                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
780                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
781                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
782                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
783                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
784         }
785
786         return 0;
787 }
788
789 /*
790   display the status of the scripts for monitoring (or other events)
791  */
792 static int control_one_scriptstatus(struct ctdb_context *ctdb,
793                                     enum ctdb_eventscript_call type)
794 {
795         struct ctdb_scripts_wire *script_status;
796         int ret, i;
797
798         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
799         if (ret != 0) {
800                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
801                 return ret;
802         }
803
804         if (script_status == NULL) {
805                 if (!options.machinereadable) {
806                         printf("%s cycle never run\n",
807                                ctdb_eventscript_call_names[type]);
808                 }
809                 return 0;
810         }
811
812         if (!options.machinereadable) {
813                 printf("%d scripts were executed last %s cycle\n",
814                        script_status->num_scripts,
815                        ctdb_eventscript_call_names[type]);
816         }
817         for (i=0; i<script_status->num_scripts; i++) {
818                 const char *status = NULL;
819
820                 switch (script_status->scripts[i].status) {
821                 case -ETIME:
822                         status = "TIMEDOUT";
823                         break;
824                 case -ENOEXEC:
825                         status = "DISABLED";
826                         break;
827                 case 0:
828                         status = "OK";
829                         break;
830                 default:
831                         if (script_status->scripts[i].status > 0)
832                                 status = "ERROR";
833                         break;
834                 }
835                 if (options.machinereadable) {
836                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
837                                ctdb_eventscript_call_names[type],
838                                script_status->scripts[i].name,
839                                script_status->scripts[i].status,
840                                status,
841                                (long)script_status->scripts[i].start.tv_sec,
842                                (long)script_status->scripts[i].start.tv_usec,
843                                (long)script_status->scripts[i].finished.tv_sec,
844                                (long)script_status->scripts[i].finished.tv_usec,
845                                script_status->scripts[i].output);
846                         continue;
847                 }
848                 if (status)
849                         printf("%-20s Status:%s    ",
850                                script_status->scripts[i].name, status);
851                 else
852                         /* Some other error, eg from stat. */
853                         printf("%-20s Status:CANNOT RUN (%s)",
854                                script_status->scripts[i].name,
855                                strerror(-script_status->scripts[i].status));
856
857                 if (script_status->scripts[i].status >= 0) {
858                         printf("Duration:%.3lf ",
859                         timeval_delta(&script_status->scripts[i].finished,
860                               &script_status->scripts[i].start));
861                 }
862                 if (script_status->scripts[i].status != -ENOEXEC) {
863                         printf("%s",
864                                ctime(&script_status->scripts[i].start.tv_sec));
865                         if (script_status->scripts[i].status != 0) {
866                                 printf("   OUTPUT:%s\n",
867                                        script_status->scripts[i].output);
868                         }
869                 } else {
870                         printf("\n");
871                 }
872         }
873         return 0;
874 }
875
876
877 static int control_scriptstatus(struct ctdb_context *ctdb,
878                                 int argc, const char **argv)
879 {
880         int ret;
881         enum ctdb_eventscript_call type, min, max;
882         const char *arg;
883
884         if (argc > 1) {
885                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
886                 return -1;
887         }
888
889         if (argc == 0)
890                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
891         else
892                 arg = argv[0];
893
894         for (type = 0; type < CTDB_EVENT_MAX; type++) {
895                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
896                         min = type;
897                         max = type+1;
898                         break;
899                 }
900         }
901         if (type == CTDB_EVENT_MAX) {
902                 if (strcmp(arg, "all") == 0) {
903                         min = 0;
904                         max = CTDB_EVENT_MAX;
905                 } else {
906                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
907                         return -1;
908                 }
909         }
910
911         if (options.machinereadable) {
912                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
913         }
914
915         for (type = min; type < max; type++) {
916                 ret = control_one_scriptstatus(ctdb, type);
917                 if (ret != 0) {
918                         return ret;
919                 }
920         }
921
922         return 0;
923 }
924
925 /*
926   enable an eventscript
927  */
928 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
929 {
930         int ret;
931
932         if (argc < 1) {
933                 usage();
934         }
935
936         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
937         if (ret != 0) {
938           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
939                 return ret;
940         }
941
942         return 0;
943 }
944
945 /*
946   disable an eventscript
947  */
948 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
949 {
950         int ret;
951
952         if (argc < 1) {
953                 usage();
954         }
955
956         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
957         if (ret != 0) {
958           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
959                 return ret;
960         }
961
962         return 0;
963 }
964
965 /*
966   display the pnn of the recovery master
967  */
968 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
969 {
970         int ret;
971         uint32_t recmaster;
972
973         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
974         if (ret != 0) {
975                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
976                 return ret;
977         }
978         printf("%d\n",recmaster);
979
980         return 0;
981 }
982
983 /*
984   get a list of all tickles for this pnn
985  */
986 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
987 {
988         struct ctdb_control_tcp_tickle_list *list;
989         ctdb_sock_addr addr;
990         int i, ret;
991
992         if (argc < 1) {
993                 usage();
994         }
995
996         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
997                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
998                 return -1;
999         }
1000
1001         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1002         if (ret == -1) {
1003                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1004                 return -1;
1005         }
1006
1007         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1008         printf("Num tickles:%u\n", list->tickles.num);
1009         for (i=0;i<list->tickles.num;i++) {
1010                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1011                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1012         }
1013
1014         talloc_free(list);
1015         
1016         return 0;
1017 }
1018
1019
1020 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1021 {
1022         struct ctdb_all_public_ips *ips;
1023         struct ctdb_public_ip ip;
1024         int i, ret;
1025         uint32_t *nodes;
1026         uint32_t disable_time;
1027         TDB_DATA data;
1028         struct ctdb_node_map *nodemap=NULL;
1029         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1030
1031         disable_time = 30;
1032         data.dptr  = (uint8_t*)&disable_time;
1033         data.dsize = sizeof(disable_time);
1034         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1035         if (ret != 0) {
1036                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1037                 return -1;
1038         }
1039
1040
1041
1042         /* read the public ip list from the node */
1043         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1044         if (ret != 0) {
1045                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1046                 talloc_free(tmp_ctx);
1047                 return -1;
1048         }
1049
1050         for (i=0;i<ips->num;i++) {
1051                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1052                         break;
1053                 }
1054         }
1055         if (i==ips->num) {
1056                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1057                         pnn, ctdb_addr_to_str(addr)));
1058                 talloc_free(tmp_ctx);
1059                 return -1;
1060         }
1061
1062         ip.pnn  = pnn;
1063         ip.addr = *addr;
1064
1065         data.dptr  = (uint8_t *)&ip;
1066         data.dsize = sizeof(ip);
1067
1068         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1069         if (ret != 0) {
1070                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1071                 talloc_free(tmp_ctx);
1072                 return ret;
1073         }
1074
1075         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1076         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1077                                         nodes, 0,
1078                                         LONGTIMELIMIT(),
1079                                         false, data,
1080                                         NULL, NULL,
1081                                         NULL);
1082         if (ret != 0) {
1083                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1084                 talloc_free(tmp_ctx);
1085                 return -1;
1086         }
1087
1088         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1089         if (ret != 0) {
1090                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1091                 talloc_free(tmp_ctx);
1092                 return -1;
1093         }
1094
1095         /* update the recovery daemon so it now knows to expect the new
1096            node assignment for this ip.
1097         */
1098         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1099         if (ret != 0) {
1100                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1101                 return -1;
1102         }
1103
1104         talloc_free(tmp_ctx);
1105         return 0;
1106 }
1107
1108 /*
1109   move/failover an ip address to a specific node
1110  */
1111 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1112 {
1113         uint32_t pnn;
1114         ctdb_sock_addr addr;
1115
1116         if (argc < 2) {
1117                 usage();
1118                 return -1;
1119         }
1120
1121         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1122                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1123                 return -1;
1124         }
1125
1126
1127         if (sscanf(argv[1], "%u", &pnn) != 1) {
1128                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1129                 return -1;
1130         }
1131
1132         if (move_ip(ctdb, &addr, pnn) != 0) {
1133                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1134                 return -1;
1135         }
1136
1137         return 0;
1138 }
1139
1140 void getips_store_callback(void *param, void *data)
1141 {
1142         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1143         struct ctdb_all_public_ips *ips = param;
1144         int i;
1145
1146         i = ips->num++;
1147         ips->ips[i].pnn  = node_ip->pnn;
1148         ips->ips[i].addr = node_ip->addr;
1149 }
1150
/*
  tree traversal callback: add one to the uint32_t counter that
  'param' points at; 'data' is not looked at
 */
void getips_count_callback(void *param, void *data)
{
        uint32_t *counter = (uint32_t *)param;

        *counter += 1;
}
1157
1158 #define IP_KEYLEN       4
1159 static uint32_t *ip_key(ctdb_sock_addr *ip)
1160 {
1161         static uint32_t key[IP_KEYLEN];
1162
1163         bzero(key, sizeof(key));
1164
1165         switch (ip->sa.sa_family) {
1166         case AF_INET:
1167                 key[0]  = ip->ip.sin_addr.s_addr;
1168                 break;
1169         case AF_INET6:
1170                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1171                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1172                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1173                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1174                 break;
1175         default:
1176                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1177                 return key;
1178         }
1179
1180         return key;
1181 }
1182
/*
  insert callback for the ip tree: always hand back 'parm' (the new
  entry), whatever 'data' is
 */
static void *add_ip_callback(void *parm, void *data)
{
        void *chosen = parm;

        (void)data;
        return chosen;
}
1187
1188 static int
1189 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1190 {
1191         struct ctdb_all_public_ips *tmp_ips;
1192         struct ctdb_node_map *nodemap=NULL;
1193         trbt_tree_t *ip_tree;
1194         int i, j, len, ret;
1195         uint32_t count;
1196
1197         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1198         if (ret != 0) {
1199                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1200                 return ret;
1201         }
1202
1203         ip_tree = trbt_create(tmp_ctx, 0);
1204
1205         for(i=0;i<nodemap->num;i++){
1206                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1207                         continue;
1208                 }
1209                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1210                         continue;
1211                 }
1212
1213                 /* read the public ip list from this node */
1214                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1215                 if (ret != 0) {
1216                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1217                         return -1;
1218                 }
1219         
1220                 for (j=0; j<tmp_ips->num;j++) {
1221                         struct ctdb_public_ip *node_ip;
1222
1223                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1224                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1225                         node_ip->addr = tmp_ips->ips[j].addr;
1226
1227                         trbt_insertarray32_callback(ip_tree,
1228                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1229                                 add_ip_callback,
1230                                 node_ip);
1231                 }
1232                 talloc_free(tmp_ips);
1233         }
1234
1235         /* traverse */
1236         count = 0;
1237         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1238
1239         len = offsetof(struct ctdb_all_public_ips, ips) + 
1240                 count*sizeof(struct ctdb_public_ip);
1241         tmp_ips = talloc_zero_size(tmp_ctx, len);
1242         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1243
1244         *ips = tmp_ips;
1245
1246         return 0;
1247 }
1248
1249
1250 /* 
1251  * scans all other nodes and returns a pnn for another node that can host this 
1252  * ip address or -1
1253  */
1254 static int
1255 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1256 {
1257         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1258         struct ctdb_all_public_ips *ips;
1259         struct ctdb_node_map *nodemap=NULL;
1260         int i, j, ret;
1261
1262         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1263         if (ret != 0) {
1264                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1265                 talloc_free(tmp_ctx);
1266                 return ret;
1267         }
1268
1269         for(i=0;i<nodemap->num;i++){
1270                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1271                         continue;
1272                 }
1273                 if (nodemap->nodes[i].pnn == options.pnn) {
1274                         continue;
1275                 }
1276
1277                 /* read the public ip list from this node */
1278                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1279                 if (ret != 0) {
1280                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1281                         return -1;
1282                 }
1283
1284                 for (j=0;j<ips->num;j++) {
1285                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1286                                 talloc_free(tmp_ctx);
1287                                 return nodemap->nodes[i].pnn;
1288                         }
1289                 }
1290                 talloc_free(ips);
1291         }
1292
1293         talloc_free(tmp_ctx);
1294         return -1;
1295 }
1296
1297 /*
1298   add a public ip address to a node
1299  */
1300 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1301 {
1302         int i, ret;
1303         int len;
1304         uint32_t pnn;
1305         unsigned mask;
1306         ctdb_sock_addr addr;
1307         struct ctdb_control_ip_iface *pub;
1308         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1309         struct ctdb_all_public_ips *ips;
1310
1311
1312         if (argc != 2) {
1313                 talloc_free(tmp_ctx);
1314                 usage();
1315         }
1316
1317         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1318                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1319                 talloc_free(tmp_ctx);
1320                 return -1;
1321         }
1322
1323         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1324         if (ret != 0) {
1325                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1326                 talloc_free(tmp_ctx);
1327                 return ret;
1328         }
1329
1330
1331         /* check if some other node is already serving this ip, if not,
1332          * we will claim it
1333          */
1334         for (i=0;i<ips->num;i++) {
1335                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1336                         break;
1337                 }
1338         }
1339
1340         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1341         pub = talloc_size(tmp_ctx, len); 
1342         CTDB_NO_MEMORY(ctdb, pub);
1343
1344         pub->addr  = addr;
1345         pub->mask  = mask;
1346         pub->len   = strlen(argv[1])+1;
1347         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1348
1349         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1350         if (ret != 0) {
1351                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1352                 talloc_free(tmp_ctx);
1353                 return ret;
1354         }
1355
1356         if (i == ips->num) {
1357                 /* no one has this ip so we claim it */
1358                 pnn  = options.pnn;
1359         } else {
1360                 pnn  = ips->ips[i].pnn;
1361         }
1362
1363         if (move_ip(ctdb, &addr, pnn) != 0) {
1364                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1365                 return -1;
1366         }
1367
1368         talloc_free(tmp_ctx);
1369         return 0;
1370 }
1371
1372 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1373
1374 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1375 {
1376         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1377         struct ctdb_node_map *nodemap=NULL;
1378         struct ctdb_all_public_ips *ips;
1379         int ret, i, j;
1380
1381         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1382         if (ret != 0) {
1383                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1384                 return ret;
1385         }
1386
1387         /* remove it from the nodes that are not hosting the ip currently */
1388         for(i=0;i<nodemap->num;i++){
1389                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1390                         continue;
1391                 }
1392                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1393                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1394                         continue;
1395                 }
1396
1397                 for (j=0;j<ips->num;j++) {
1398                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1399                                 break;
1400                         }
1401                 }
1402                 if (j==ips->num) {
1403                         continue;
1404                 }
1405
1406                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1407                         continue;
1408                 }
1409
1410                 options.pnn = nodemap->nodes[i].pnn;
1411                 control_delip(ctdb, argc, argv);
1412         }
1413
1414
1415         /* remove it from every node (also the one hosting it) */
1416         for(i=0;i<nodemap->num;i++){
1417                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1418                         continue;
1419                 }
1420                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1421                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1422                         continue;
1423                 }
1424
1425                 for (j=0;j<ips->num;j++) {
1426                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1427                                 break;
1428                         }
1429                 }
1430                 if (j==ips->num) {
1431                         continue;
1432                 }
1433
1434                 options.pnn = nodemap->nodes[i].pnn;
1435                 control_delip(ctdb, argc, argv);
1436         }
1437
1438         talloc_free(tmp_ctx);
1439         return 0;
1440 }
1441         
1442 /*
1443   delete a public ip address from a node
1444  */
1445 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1446 {
1447         int i, ret;
1448         ctdb_sock_addr addr;
1449         struct ctdb_control_ip_iface pub;
1450         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1451         struct ctdb_all_public_ips *ips;
1452
1453         if (argc != 1) {
1454                 talloc_free(tmp_ctx);
1455                 usage();
1456         }
1457
1458         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1459                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1460                 return -1;
1461         }
1462
1463         if (options.pnn == CTDB_BROADCAST_ALL) {
1464                 return control_delip_all(ctdb, argc, argv, &addr);
1465         }
1466
1467         pub.addr  = addr;
1468         pub.mask  = 0;
1469         pub.len   = 0;
1470
1471         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1472         if (ret != 0) {
1473                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1474                 talloc_free(tmp_ctx);
1475                 return ret;
1476         }
1477         
1478         for (i=0;i<ips->num;i++) {
1479                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1480                         break;
1481                 }
1482         }
1483
1484         if (i==ips->num) {
1485                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1486                         ctdb_addr_to_str(&addr)));
1487                 talloc_free(tmp_ctx);
1488                 return -1;
1489         }
1490
1491         if (ips->ips[i].pnn == options.pnn) {
1492                 ret = find_other_host_for_public_ip(ctdb, &addr);
1493                 if (ret != -1) {
1494                         if (move_ip(ctdb, &addr, ret) != 0) {
1495                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1496                                 return -1;
1497                         }
1498                 }
1499         }
1500
1501         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1502         if (ret != 0) {
1503                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1504                 talloc_free(tmp_ctx);
1505                 return ret;
1506         }
1507
1508         talloc_free(tmp_ctx);
1509         return 0;
1510 }
1511
1512 /*
1513   kill a tcp connection
1514  */
1515 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1516 {
1517         int ret;
1518         struct ctdb_control_killtcp killtcp;
1519
1520         if (argc < 2) {
1521                 usage();
1522         }
1523
1524         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1525                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1526                 return -1;
1527         }
1528
1529         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1530                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1531                 return -1;
1532         }
1533
1534         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1535         if (ret != 0) {
1536                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1537                 return ret;
1538         }
1539
1540         return 0;
1541 }
1542
1543
1544 /*
1545   send a gratious arp
1546  */
1547 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1548 {
1549         int ret;
1550         ctdb_sock_addr addr;
1551
1552         if (argc < 2) {
1553                 usage();
1554         }
1555
1556         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1557                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1558                 return -1;
1559         }
1560
1561         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1562         if (ret != 0) {
1563                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1564                 return ret;
1565         }
1566
1567         return 0;
1568 }
1569
1570 /*
1571   register a server id
1572  */
1573 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1574 {
1575         int ret;
1576         struct ctdb_server_id server_id;
1577
1578         if (argc < 3) {
1579                 usage();
1580         }
1581
1582         server_id.pnn       = strtoul(argv[0], NULL, 0);
1583         server_id.type      = strtoul(argv[1], NULL, 0);
1584         server_id.server_id = strtoul(argv[2], NULL, 0);
1585
1586         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1587         if (ret != 0) {
1588                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1589                 return ret;
1590         }
1591         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
1592         sleep(999);
1593         return -1;
1594 }
1595
1596 /*
1597   unregister a server id
1598  */
1599 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1600 {
1601         int ret;
1602         struct ctdb_server_id server_id;
1603
1604         if (argc < 3) {
1605                 usage();
1606         }
1607
1608         server_id.pnn       = strtoul(argv[0], NULL, 0);
1609         server_id.type      = strtoul(argv[1], NULL, 0);
1610         server_id.server_id = strtoul(argv[2], NULL, 0);
1611
1612         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1613         if (ret != 0) {
1614                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1615                 return ret;
1616         }
1617         return -1;
1618 }
1619
1620 /*
1621   check if a server id exists
1622  */
1623 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1624 {
1625         uint32_t status;
1626         int ret;
1627         struct ctdb_server_id server_id;
1628
1629         if (argc < 3) {
1630                 usage();
1631         }
1632
1633         server_id.pnn       = strtoul(argv[0], NULL, 0);
1634         server_id.type      = strtoul(argv[1], NULL, 0);
1635         server_id.server_id = strtoul(argv[2], NULL, 0);
1636
1637         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1638         if (ret != 0) {
1639                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1640                 return ret;
1641         }
1642
1643         if (status) {
1644                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1645         } else {
1646                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1647         }
1648         return 0;
1649 }
1650
1651 /*
1652   get a list of all server ids that are registered on a node
1653  */
1654 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1655 {
1656         int i, ret;
1657         struct ctdb_server_id_list *server_ids;
1658
1659         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1660         if (ret != 0) {
1661                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1662                 return ret;
1663         }
1664
1665         for (i=0; i<server_ids->num; i++) {
1666                 printf("Server id %d:%d:%d\n", 
1667                         server_ids->server_ids[i].pnn, 
1668                         server_ids->server_ids[i].type, 
1669                         server_ids->server_ids[i].server_id); 
1670         }
1671
1672         return -1;
1673 }
1674
1675 /*
1676   send a tcp tickle ack
1677  */
1678 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1679 {
1680         int ret;
1681         ctdb_sock_addr  src, dst;
1682
1683         if (argc < 2) {
1684                 usage();
1685         }
1686
1687         if (!parse_ip_port(argv[0], &src)) {
1688                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1689                 return -1;
1690         }
1691
1692         if (!parse_ip_port(argv[1], &dst)) {
1693                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1694                 return -1;
1695         }
1696
1697         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1698         if (ret==0) {
1699                 return 0;
1700         }
1701         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1702
1703         return -1;
1704 }
1705
1706
1707 /*
1708   display public ip status
1709  */
1710 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1711 {
1712         int i, ret;
1713         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1714         struct ctdb_all_public_ips *ips;
1715
1716         if (options.pnn == CTDB_BROADCAST_ALL) {
1717                 /* read the list of public ips from all nodes */
1718                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1719         } else {
1720                 /* read the public ip list from this node */
1721                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1722         }
1723         if (ret != 0) {
1724                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1725                 talloc_free(tmp_ctx);
1726                 return ret;
1727         }
1728
1729         if (options.machinereadable){
1730                 printf(":Public IP:Node:ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:\n");
1731         } else {
1732                 if (options.pnn == CTDB_BROADCAST_ALL) {
1733                         printf("Public IPs on ALL nodes\n");
1734                 } else {
1735                         printf("Public IPs on node %u\n", options.pnn);
1736                 }
1737         }
1738
1739         for (i=1;i<=ips->num;i++) {
1740                 struct ctdb_control_public_ip_info *info = NULL;
1741                 int32_t pnn;
1742                 char *aciface = NULL;
1743                 char *avifaces = NULL;
1744                 char *cifaces = NULL;
1745
1746                 if (options.pnn == CTDB_BROADCAST_ALL) {
1747                         pnn = ips->ips[ips->num-i].pnn;
1748                 } else {
1749                         pnn = options.pnn;
1750                 }
1751
1752                 if (pnn != -1) {
1753                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
1754                                                    &ips->ips[ips->num-i].addr, &info);
1755                 } else {
1756                         ret = -1;
1757                 }
1758
1759                 if (ret == 0) {
1760                         int j;
1761                         for (j=0; j < info->num; j++) {
1762                                 if (cifaces == NULL) {
1763                                         cifaces = talloc_strdup(info,
1764                                                                 info->ifaces[j].name);
1765                                 } else {
1766                                         cifaces = talloc_asprintf_append(cifaces,
1767                                                                          ",%s",
1768                                                                          info->ifaces[j].name);
1769                                 }
1770
1771                                 if (info->active_idx == j) {
1772                                         aciface = info->ifaces[j].name;
1773                                 }
1774
1775                                 if (info->ifaces[j].link_state == 0) {
1776                                         continue;
1777                                 }
1778
1779                                 if (avifaces == NULL) {
1780                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
1781                                 } else {
1782                                         avifaces = talloc_asprintf_append(avifaces,
1783                                                                           ",%s",
1784                                                                           info->ifaces[j].name);
1785                                 }
1786                         }
1787                 }
1788
1789                 if (options.machinereadable){
1790                         printf(":%s:%d:%s:%s:%s:\n",
1791                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1792                                ips->ips[ips->num-i].pnn,
1793                                aciface?aciface:"",
1794                                avifaces?avifaces:"",
1795                                cifaces?cifaces:"");
1796                 } else {
1797                         printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
1798                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1799                                ips->ips[ips->num-i].pnn,
1800                                aciface?aciface:"",
1801                                avifaces?avifaces:"",
1802                                cifaces?cifaces:"");
1803                 }
1804                 talloc_free(info);
1805         }
1806
1807         talloc_free(tmp_ctx);
1808         return 0;
1809 }
1810
1811 /*
1812   public ip info
1813  */
1814 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
1815 {
1816         int i, ret;
1817         ctdb_sock_addr addr;
1818         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1819         struct ctdb_control_public_ip_info *info;
1820
1821         if (argc != 1) {
1822                 talloc_free(tmp_ctx);
1823                 usage();
1824         }
1825
1826         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1827                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1828                 return -1;
1829         }
1830
1831         /* read the public ip info from this node */
1832         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
1833                                            tmp_ctx, &addr, &info);
1834         if (ret != 0) {
1835                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
1836                                   argv[0], options.pnn));
1837                 talloc_free(tmp_ctx);
1838                 return ret;
1839         }
1840
1841         printf("Public IP[%s] info on node %u\n",
1842                ctdb_addr_to_str(&info->ip.addr),
1843                options.pnn);
1844
1845         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
1846                ctdb_addr_to_str(&info->ip.addr),
1847                info->ip.pnn, info->num);
1848
1849         for (i=0; i<info->num; i++) {
1850                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1851
1852                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1853                        i+1, info->ifaces[i].name,
1854                        info->ifaces[i].link_state?"up":"down",
1855                        (unsigned int)info->ifaces[i].references,
1856                        (i==info->active_idx)?" (active)":"");
1857         }
1858
1859         talloc_free(tmp_ctx);
1860         return 0;
1861 }
1862
1863 /*
1864   display interfaces status
1865  */
1866 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
1867 {
1868         int i, ret;
1869         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1870         struct ctdb_control_get_ifaces *ifaces;
1871
1872         /* read the public ip list from this node */
1873         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn,
1874                                    tmp_ctx, &ifaces);
1875         if (ret != 0) {
1876                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
1877                                   options.pnn));
1878                 talloc_free(tmp_ctx);
1879                 return ret;
1880         }
1881
1882         if (options.machinereadable){
1883                 printf(":Name:LinkStatus:References:\n");
1884         } else {
1885                 printf("Interfaces on node %u\n", options.pnn);
1886         }
1887
1888         for (i=0; i<ifaces->num; i++) {
1889                 if (options.machinereadable){
1890                         printf(":%s:%s:%u\n",
1891                                ifaces->ifaces[i].name,
1892                                ifaces->ifaces[i].link_state?"1":"0",
1893                                (unsigned int)ifaces->ifaces[i].references);
1894                 } else {
1895                         printf("name:%s link:%s references:%u\n",
1896                                ifaces->ifaces[i].name,
1897                                ifaces->ifaces[i].link_state?"up":"down",
1898                                (unsigned int)ifaces->ifaces[i].references);
1899                 }
1900         }
1901
1902         talloc_free(tmp_ctx);
1903         return 0;
1904 }
1905
1906
1907 /*
1908   set link status of an interface
1909  */
1910 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
1911 {
1912         int ret;
1913         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1914         struct ctdb_control_iface_info info;
1915
1916         ZERO_STRUCT(info);
1917
1918         if (argc != 2) {
1919                 usage();
1920         }
1921
1922         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1923                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
1924                                   argv[0]));
1925                 talloc_free(tmp_ctx);
1926                 return -1;
1927         }
1928         strcpy(info.name, argv[0]);
1929
1930         if (strcmp(argv[1], "up") == 0) {
1931                 info.link_state = 1;
1932         } else if (strcmp(argv[1], "down") == 0) {
1933                 info.link_state = 0;
1934         } else {
1935                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
1936                                   argv[1]));
1937                 talloc_free(tmp_ctx);
1938                 return -1;
1939         }
1940
1941         /* read the public ip list from this node */
1942         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
1943                                    tmp_ctx, &info);
1944         if (ret != 0) {
1945                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
1946                                   argv[0], options.pnn));
1947                 talloc_free(tmp_ctx);
1948                 return ret;
1949         }
1950
1951         talloc_free(tmp_ctx);
1952         return 0;
1953 }
1954
1955 /*
1956   display pid of a ctdb daemon
1957  */
1958 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1959 {
1960         uint32_t pid;
1961         int ret;
1962
1963         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1964         if (ret != 0) {
1965                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1966                 return ret;
1967         }
1968         printf("Pid:%d\n", pid);
1969
1970         return 0;
1971 }
1972
1973 static uint32_t ipreallocate_finished;
1974
1975 /*
1976   handler for receiving the response to ipreallocate
1977 */
1978 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1979                              TDB_DATA data, void *private_data)
1980 {
1981         ipreallocate_finished = 1;
1982 }
1983
1984 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1985 {
1986         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1987
1988         event_add_timed(ctdb->ev, ctdb, 
1989                                 timeval_current_ofs(1, 0),
1990                                 ctdb_every_second, ctdb);
1991 }
1992
1993 /*
1994   ask the recovery daemon on the recovery master to perform a ip reallocation
1995  */
1996 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1997 {
1998         int i, ret;
1999         TDB_DATA data;
2000         struct takeover_run_reply rd;
2001         uint32_t recmaster;
2002         struct ctdb_node_map *nodemap=NULL;
2003         int retries=0;
2004         struct timeval tv = timeval_current();
2005
2006         /* we need some events to trigger so we can timeout and restart
2007            the loop
2008         */
2009         event_add_timed(ctdb->ev, ctdb, 
2010                                 timeval_current_ofs(1, 0),
2011                                 ctdb_every_second, ctdb);
2012
2013         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2014         if (rd.pnn == -1) {
2015                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
2016                 return -1;
2017         }
2018         rd.srvid = getpid();
2019
2020         /* register a message port for receiveing the reply so that we
2021            can receive the reply
2022         */
2023         ctdb_client_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
2024
2025         data.dptr = (uint8_t *)&rd;
2026         data.dsize = sizeof(rd);
2027
2028 again:
2029         /* check that there are valid nodes available */
2030         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
2031                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2032                 return -1;
2033         }
2034         for (i=0; i<nodemap->num;i++) {
2035                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
2036                         break;
2037                 }
2038         }
2039         if (i==nodemap->num) {
2040                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
2041                 return 0;
2042         }
2043
2044
2045         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2046         if (ret != 0) {
2047                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2048                 return ret;
2049         }
2050
2051         /* verify the node exists */
2052         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
2053                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2054                 return -1;
2055         }
2056
2057
2058         /* check tha there are nodes available that can act as a recmaster */
2059         for (i=0; i<nodemap->num; i++) {
2060                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2061                         continue;
2062                 }
2063                 break;
2064         }
2065         if (i == nodemap->num) {
2066                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
2067                 return 0;
2068         }
2069
2070         /* verify the recovery master is not STOPPED, nor BANNED */
2071         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2072                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2073                 retries++;
2074                 sleep(1);
2075                 goto again;
2076         } 
2077
2078         
2079         /* verify the recovery master is not STOPPED, nor BANNED */
2080         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2081                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2082                 retries++;
2083                 sleep(1);
2084                 goto again;
2085         } 
2086
2087         ipreallocate_finished = 0;
2088         ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2089         if (ret != 0) {
2090                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
2091                 return -1;
2092         }
2093
2094         tv = timeval_current();
2095         /* this loop will terminate when we have received the reply */
2096         while (timeval_elapsed(&tv) < 3.0) {
2097                 event_loop_once(ctdb->ev);
2098         }
2099         if (ipreallocate_finished == 1) {
2100                 return 0;
2101         }
2102
2103         DEBUG(DEBUG_ERR,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
2104         retries++;
2105         sleep(1);
2106         goto again;
2107
2108         return 0;
2109 }
2110
2111
2112 /*
2113   disable a remote node
2114  */
2115 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2116 {
2117         int ret;
2118         struct ctdb_node_map *nodemap=NULL;
2119
2120         /* check if the node is already disabled */
2121         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2122                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2123                 exit(10);
2124         }
2125         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2126                 DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
2127                 return 0;
2128         }
2129
2130         do {
2131                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2132                 if (ret != 0) {
2133                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2134                         return ret;
2135                 }
2136
2137                 sleep(1);
2138
2139                 /* read the nodemap and verify the change took effect */
2140                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2141                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2142                         exit(10);
2143                 }
2144
2145         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2146         ret = control_ipreallocate(ctdb, argc, argv);
2147         if (ret != 0) {
2148                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2149                 return ret;
2150         }
2151
2152         return 0;
2153 }
2154
2155 /*
2156   enable a disabled remote node
2157  */
2158 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2159 {
2160         int ret;
2161
2162         struct ctdb_node_map *nodemap=NULL;
2163
2164
2165         /* check if the node is already enabled */
2166         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2167                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2168                 exit(10);
2169         }
2170         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
2171                 DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
2172                 return 0;
2173         }
2174
2175         do {
2176                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2177                 if (ret != 0) {
2178                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2179                         return ret;
2180                 }
2181
2182                 sleep(1);
2183
2184                 /* read the nodemap and verify the change took effect */
2185                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2186                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2187                         exit(10);
2188                 }
2189
2190         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2191
2192         ret = control_ipreallocate(ctdb, argc, argv);
2193         if (ret != 0) {
2194                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2195                 return ret;
2196         }
2197
2198         return 0;
2199 }
2200
2201 /*
2202   stop a remote node
2203  */
2204 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2205 {
2206         int ret;
2207         struct ctdb_node_map *nodemap=NULL;
2208
2209         do {
2210                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2211                 if (ret != 0) {
2212                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2213                 }
2214         
2215                 sleep(1);
2216
2217                 /* read the nodemap and verify the change took effect */
2218                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2219                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2220                         exit(10);
2221                 }
2222
2223         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2224         ret = control_ipreallocate(ctdb, argc, argv);
2225         if (ret != 0) {
2226                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2227                 return ret;
2228         }
2229
2230         return 0;
2231 }
2232
2233 /*
2234   restart a stopped remote node
2235  */
2236 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2237 {
2238         int ret;
2239
2240         struct ctdb_node_map *nodemap=NULL;
2241
2242         do {
2243                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2244                 if (ret != 0) {
2245                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2246                         return ret;
2247                 }
2248         
2249                 sleep(1);
2250
2251                 /* read the nodemap and verify the change took effect */
2252                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2253                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2254                         exit(10);
2255                 }
2256
2257         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2258         ret = control_ipreallocate(ctdb, argc, argv);
2259         if (ret != 0) {
2260                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2261                 return ret;
2262         }
2263
2264         return 0;
2265 }
2266
2267 static uint32_t get_generation(struct ctdb_context *ctdb)
2268 {
2269         struct ctdb_vnn_map *vnnmap=NULL;
2270         int ret;
2271
2272         /* wait until the recmaster is not in recovery mode */
2273         while (1) {
2274                 uint32_t recmode, recmaster;
2275                 
2276                 if (vnnmap != NULL) {
2277                         talloc_free(vnnmap);
2278                         vnnmap = NULL;
2279                 }
2280
2281                 /* get the recmaster */
2282                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
2283                 if (ret != 0) {
2284                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2285                         exit(10);
2286                 }
2287
2288                 /* get recovery mode */
2289                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
2290                 if (ret != 0) {
2291                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2292                         exit(10);
2293                 }
2294
2295                 /* get the current generation number */
2296                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2297                 if (ret != 0) {
2298                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2299                         exit(10);
2300                 }
2301
2302                 if ((recmode == CTDB_RECOVERY_NORMAL)
2303                 &&  (vnnmap->generation != 1)){
2304                         return vnnmap->generation;
2305                 }
2306                 sleep(1);
2307         }
2308 }
2309
2310 /*
2311   ban a node from the cluster
2312  */
2313 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2314 {
2315         int ret;
2316         struct ctdb_node_map *nodemap=NULL;
2317         struct ctdb_ban_time bantime;
2318
2319         if (argc < 1) {
2320                 usage();
2321         }
2322         
2323         /* verify the node exists */
2324         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2325         if (ret != 0) {
2326                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2327                 return ret;
2328         }
2329
2330         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2331                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2332                 return -1;
2333         }
2334
2335         bantime.pnn  = options.pnn;
2336         bantime.time = strtoul(argv[0], NULL, 0);
2337
2338         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2339         if (ret != 0) {
2340                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2341                 return -1;
2342         }       
2343
2344         ret = control_ipreallocate(ctdb, argc, argv);
2345         if (ret != 0) {
2346                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2347                 return ret;
2348         }
2349
2350         return 0;
2351 }
2352
2353
2354 /*
2355   unban a node from the cluster
2356  */
2357 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2358 {
2359         int ret;
2360         struct ctdb_node_map *nodemap=NULL;
2361         struct ctdb_ban_time bantime;
2362
2363         /* verify the node exists */
2364         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2365         if (ret != 0) {
2366                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2367                 return ret;
2368         }
2369
2370         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2371                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2372                 return -1;
2373         }
2374
2375         bantime.pnn  = options.pnn;
2376         bantime.time = 0;
2377
2378         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2379         if (ret != 0) {
2380                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2381                 return -1;
2382         }       
2383
2384         ret = control_ipreallocate(ctdb, argc, argv);
2385         if (ret != 0) {
2386                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2387                 return ret;
2388         }
2389
2390         return 0;
2391 }
2392
2393
2394 /*
2395   show ban information for a node
2396  */
2397 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2398 {
2399         int ret;
2400         struct ctdb_node_map *nodemap=NULL;
2401         struct ctdb_ban_time *bantime;
2402
2403         /* verify the node exists */
2404         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2405         if (ret != 0) {
2406                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2407                 return ret;
2408         }
2409
2410         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2411         if (ret != 0) {
2412                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2413                 return -1;
2414         }       
2415
2416         if (bantime->time == 0) {
2417                 printf("Node %u is not banned\n", bantime->pnn);
2418         } else {
2419                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2420         }
2421
2422         return 0;
2423 }
2424
2425 /*
2426   shutdown a daemon
2427  */
2428 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2429 {
2430         int ret;
2431
2432         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2433         if (ret != 0) {
2434                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2435                 return ret;
2436         }
2437
2438         return 0;
2439 }
2440
2441 /*
2442   trigger a recovery
2443  */
2444 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2445 {
2446         int ret;
2447         uint32_t generation, next_generation;
2448
2449         /* record the current generation number */
2450         generation = get_generation(ctdb);
2451
2452         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2453         if (ret != 0) {
2454                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2455                 return ret;
2456         }
2457
2458         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2459         if (ret != 0) {
2460                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2461                 return ret;
2462         }
2463
2464         /* wait until we are in a new generation */
2465         while (1) {
2466                 next_generation = get_generation(ctdb);
2467                 if (next_generation != generation) {
2468                         return 0;
2469                 }
2470                 sleep(1);
2471         }
2472
2473         return 0;
2474 }
2475
2476
2477 /*
2478   display monitoring mode of a remote node
2479  */
2480 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2481 {
2482         uint32_t monmode;
2483         int ret;
2484
2485         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2486         if (ret != 0) {
2487                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2488                 return ret;
2489         }
2490         if (!options.machinereadable){
2491                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2492         } else {
2493                 printf(":mode:\n");
2494                 printf(":%d:\n",monmode);
2495         }
2496         return 0;
2497 }
2498
2499
2500 /*
2501   display capabilities of a remote node
2502  */
2503 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2504 {
2505         uint32_t capabilities;
2506         int ret;
2507
2508         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2509         if (ret != 0) {
2510                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2511                 return ret;
2512         }
2513         
2514         if (!options.machinereadable){
2515                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2516                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2517                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2518                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2519         } else {
2520                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2521                 printf(":%d:%d:%d:%d:\n",
2522                         !!(capabilities&CTDB_CAP_RECMASTER),
2523                         !!(capabilities&CTDB_CAP_LMASTER),
2524                         !!(capabilities&CTDB_CAP_LVS),
2525                         !!(capabilities&CTDB_CAP_NATGW));
2526         }
2527         return 0;
2528 }
2529
2530 /*
2531   display lvs configuration
2532  */
2533 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2534 {
2535         uint32_t *capabilities;
2536         struct ctdb_node_map *nodemap=NULL;
2537         int i, ret;
2538         int healthy_count = 0;
2539
2540         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2541         if (ret != 0) {
2542                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2543                 return ret;
2544         }
2545
2546         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2547         CTDB_NO_MEMORY(ctdb, capabilities);
2548         
2549         /* collect capabilities for all connected nodes */
2550         for (i=0; i<nodemap->num; i++) {
2551                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2552                         continue;
2553                 }
2554                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2555                         continue;
2556                 }
2557         
2558                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2559                 if (ret != 0) {
2560                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2561                         return ret;
2562                 }
2563
2564                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2565                         continue;
2566                 }
2567
2568                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2569                         healthy_count++;
2570                 }
2571         }
2572
2573         /* Print all LVS nodes */
2574         for (i=0; i<nodemap->num; i++) {
2575                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2576                         continue;
2577                 }
2578                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2579                         continue;
2580                 }
2581                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2582                         continue;
2583                 }
2584
2585                 if (healthy_count != 0) {
2586                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2587                                 continue;
2588                         }
2589                 }
2590
2591                 printf("%d:%s\n", i, 
2592                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2593         }
2594
2595         return 0;
2596 }
2597
2598 /*
2599   display who is the lvs master
2600  */
2601 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2602 {
2603         uint32_t *capabilities;
2604         struct ctdb_node_map *nodemap=NULL;
2605         int i, ret;
2606         int healthy_count = 0;
2607
2608         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2609         if (ret != 0) {
2610                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2611                 return ret;
2612         }
2613
2614         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2615         CTDB_NO_MEMORY(ctdb, capabilities);
2616         
2617         /* collect capabilities for all connected nodes */
2618         for (i=0; i<nodemap->num; i++) {
2619                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2620                         continue;
2621                 }
2622                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2623                         continue;
2624                 }
2625         
2626                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2627                 if (ret != 0) {
2628                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2629                         return ret;
2630                 }
2631
2632                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2633                         continue;
2634                 }
2635
2636                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2637                         healthy_count++;
2638                 }
2639         }
2640
2641         /* find and show the lvsmaster */
2642         for (i=0; i<nodemap->num; i++) {
2643                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2644                         continue;
2645                 }
2646                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2647                         continue;
2648                 }
2649                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2650                         continue;
2651                 }
2652
2653                 if (healthy_count != 0) {
2654                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2655                                 continue;
2656                         }
2657                 }
2658
2659                 if (options.machinereadable){
2660                         printf("%d\n", i);
2661                 } else {
2662                         printf("Node %d is LVS master\n", i);
2663                 }
2664                 return 0;
2665         }
2666
2667         printf("There is no LVS master\n");
2668         return -1;
2669 }
2670
2671 /*
2672   disable monitoring on a  node
2673  */
2674 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2675 {
2676         
2677         int ret;
2678
2679         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2680         if (ret != 0) {
2681                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2682                 return ret;
2683         }
2684         printf("Monitoring mode:%s\n","DISABLED");
2685
2686         return 0;
2687 }
2688
2689 /*
2690   enable monitoring on a  node
2691  */
2692 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2693 {
2694         
2695         int ret;
2696
2697         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2698         if (ret != 0) {
2699                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2700                 return ret;
2701         }
2702         printf("Monitoring mode:%s\n","ACTIVE");
2703
2704         return 0;
2705 }
2706
2707 /*
2708   display remote list of keys/data for a db
2709  */
2710 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2711 {
2712         const char *db_name;
2713         struct ctdb_db_context *ctdb_db;
2714         int ret;
2715
2716         if (argc < 1) {
2717                 usage();
2718         }
2719
2720         db_name = argv[0];
2721
2722
2723         if (db_exists(ctdb, db_name)) {
2724                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2725                 return -1;
2726         }
2727
2728         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2729
2730         if (ctdb_db == NULL) {
2731                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2732                 return -1;
2733         }
2734
2735         /* traverse and dump the cluster tdb */
2736         ret = ctdb_dump_db(ctdb_db, stdout);
2737         if (ret == -1) {
2738                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2739                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2740                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2741                                   db_name));
2742                 return -1;
2743         }
2744         talloc_free(ctdb_db);
2745
2746         printf("Dumped %d records\n", ret);
2747         return 0;
2748 }
2749
2750
2751 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2752                              TDB_DATA data, void *private_data)
2753 {
2754         DEBUG(DEBUG_ERR,("Log data received\n"));
2755         if (data.dsize > 0) {
2756                 printf("%s", data.dptr);
2757         }
2758
2759         exit(0);
2760 }
2761
2762 /*
2763   display a list of log messages from the in memory ringbuffer
2764  */
2765 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2766 {
2767         int ret;
2768         int32_t res;
2769         struct ctdb_get_log_addr log_addr;
2770         TDB_DATA data;
2771         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2772         char *errmsg;
2773         struct timeval tv;
2774
2775         if (argc != 1) {
2776                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2777                 talloc_free(tmp_ctx);
2778                 return -1;
2779         }
2780
2781         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2782         log_addr.srvid = getpid();
2783         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2784                 log_addr.level = get_debug_by_desc(argv[0]);
2785         } else {
2786                 log_addr.level = strtol(argv[0], NULL, 0);
2787         }
2788
2789
2790         data.dptr = (unsigned char *)&log_addr;
2791         data.dsize = sizeof(log_addr);
2792
2793         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2794
2795         ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2796         sleep(1);
2797
2798         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2799
2800         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2801                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2802         if (ret != 0 || res != 0) {
2803                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2804                 talloc_free(tmp_ctx);
2805                 return -1;
2806         }
2807
2808
2809         tv = timeval_current();
2810         /* this loop will terminate when we have received the reply */
2811         while (timeval_elapsed(&tv) < 3.0) {    
2812                 event_loop_once(ctdb->ev);
2813         }
2814
2815         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2816
2817         talloc_free(tmp_ctx);
2818         return 0;
2819 }
2820
2821 /*
2822   clear the in memory log area
2823  */
2824 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2825 {
2826         int ret;
2827         int32_t res;
2828         char *errmsg;
2829         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2830
2831         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2832                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2833         if (ret != 0 || res != 0) {
2834                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2835                 talloc_free(tmp_ctx);
2836                 return -1;
2837         }
2838
2839         talloc_free(tmp_ctx);
2840         return 0;
2841 }
2842
2843
2844
2845 /*
2846   display a list of the databases on a remote ctdb
2847  */
2848 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2849 {
2850         int i, ret;
2851         struct ctdb_dbid_map *dbmap=NULL;
2852
2853         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2854         if (ret != 0) {
2855                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2856                 return ret;
2857         }
2858
2859         if(options.machinereadable){
2860                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
2861                 for(i=0;i<dbmap->num;i++){
2862                         const char *path;
2863                         const char *name;
2864                         const char *health;
2865                         bool persistent;
2866
2867                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
2868                                             dbmap->dbs[i].dbid, ctdb, &path);
2869                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
2870                                             dbmap->dbs[i].dbid, ctdb, &name);
2871                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
2872                                               dbmap->dbs[i].dbid, ctdb, &health);
2873                         persistent = dbmap->dbs[i].persistent;
2874                         printf(":0x%08X:%s:%s:%d:%d:\n",
2875                                dbmap->dbs[i].dbid, name, path,
2876                                !!(persistent), !!(health));
2877                 }
2878                 return 0;
2879         }
2880
2881         printf("Number of databases:%d\n", dbmap->num);
2882         for(i=0;i<dbmap->num;i++){
2883                 const char *path;
2884                 const char *name;
2885                 const char *health;
2886                 bool persistent;
2887
2888                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2889                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2890                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2891                 persistent = dbmap->dbs[i].persistent;
2892                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
2893                        dbmap->dbs[i].dbid, name, path,
2894                        persistent?" PERSISTENT":"",
2895                        health?" UNHEALTHY":"");
2896         }
2897
2898         return 0;
2899 }
2900
2901 /*
2902   display the status of a database on a remote ctdb
2903  */
2904 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
2905 {
2906         int i, ret;
2907         struct ctdb_dbid_map *dbmap=NULL;
2908         const char *db_name;
2909
2910         if (argc < 1) {
2911                 usage();
2912         }
2913
2914         db_name = argv[0];
2915
2916         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2917         if (ret != 0) {
2918                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2919                 return ret;
2920         }
2921
2922         for(i=0;i<dbmap->num;i++){
2923                 const char *path;
2924                 const char *name;
2925                 const char *health;
2926                 bool persistent;
2927
2928                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2929                 if (strcmp(name, db_name) != 0) {
2930                         continue;
2931                 }
2932
2933                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2934                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2935                 persistent = dbmap->dbs[i].persistent;
2936                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
2937                        dbmap->dbs[i].dbid, name, path,
2938                        persistent?"yes":"no",
2939                        health?health:"OK");
2940                 return 0;
2941         }
2942
2943         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
2944         return 0;
2945 }
2946
2947 /*
2948   check if the local node is recmaster or not
2949   it will return 1 if this node is the recmaster and 0 if it is not
2950   or if the local ctdb daemon could not be contacted
2951  */
2952 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2953 {
2954         uint32_t mypnn, recmaster;
2955         int ret;
2956
2957         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2958         if (mypnn == -1) {
2959                 printf("Failed to get pnn of node\n");
2960                 return 1;
2961         }
2962
2963         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2964         if (ret != 0) {
2965                 printf("Failed to get the recmaster\n");
2966                 return 1;
2967         }
2968
2969         if (recmaster != mypnn) {
2970                 printf("this node is not the recmaster\n");
2971                 return 1;
2972         }
2973
2974         printf("this node is the recmaster\n");
2975         return 0;
2976 }
2977
2978 /*
2979   ping a node
2980  */
2981 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2982 {
2983         int ret;
2984         struct timeval tv = timeval_current();
2985         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2986         if (ret == -1) {
2987                 printf("Unable to get ping response from node %u\n", options.pnn);
2988                 return -1;
2989         } else {
2990                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2991                        options.pnn, timeval_elapsed(&tv), ret);
2992         }
2993         return 0;
2994 }
2995
2996
2997 /*
2998   get a tunable
2999  */
3000 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
3001 {
3002         const char *name;
3003         uint32_t value;
3004         int ret;
3005
3006         if (argc < 1) {
3007                 usage();
3008         }
3009
3010         name = argv[0];
3011         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
3012         if (ret == -1) {
3013                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
3014                 return -1;
3015         }
3016
3017         printf("%-19s = %u\n", name, value);
3018         return 0;
3019 }
3020
3021 /*
3022   set a tunable
3023  */
3024 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
3025 {
3026         const char *name;
3027         uint32_t value;
3028         int ret;
3029
3030         if (argc < 2) {
3031                 usage();
3032         }
3033
3034         name = argv[0];
3035         value = strtoul(argv[1], NULL, 0);
3036
3037         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
3038         if (ret == -1) {
3039                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
3040                 return -1;
3041         }
3042         return 0;
3043 }
3044
3045 /*
3046   list all tunables
3047  */
3048 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
3049 {
3050         uint32_t count;
3051         const char **list;
3052         int ret, i;
3053
3054         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
3055         if (ret == -1) {
3056                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
3057                 return -1;
3058         }
3059
3060         for (i=0;i<count;i++) {
3061                 control_getvar(ctdb, 1, &list[i]);
3062         }
3063
3064         talloc_free(list);
3065         
3066         return 0;
3067 }
3068
3069 /*
3070   display debug level on a node
3071  */
3072 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3073 {
3074         int ret;
3075         int32_t level;
3076
3077         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
3078         if (ret != 0) {
3079                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
3080                 return ret;
3081         } else {
3082                 if (options.machinereadable){
3083                         printf(":Name:Level:\n");
3084                         printf(":%s:%d:\n",get_debug_by_level(level),level);
3085                 } else {
3086                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
3087                 }
3088         }
3089         return 0;
3090 }
3091
3092 /*
3093   display reclock file of a node
3094  */
3095 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3096 {
3097         int ret;
3098         const char *reclock;
3099
3100         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
3101         if (ret != 0) {
3102                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3103                 return ret;
3104         } else {
3105                 if (options.machinereadable){
3106                         if (reclock != NULL) {
3107                                 printf("%s", reclock);
3108                         }
3109                 } else {
3110                         if (reclock == NULL) {
3111                                 printf("No reclock file used.\n");
3112                         } else {
3113                                 printf("Reclock file:%s\n", reclock);
3114                         }
3115                 }
3116         }
3117         return 0;
3118 }
3119
3120 /*
3121   set the reclock file of a node
3122  */
3123 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3124 {
3125         int ret;
3126         const char *reclock;
3127
3128         if (argc == 0) {
3129                 reclock = NULL;
3130         } else if (argc == 1) {
3131                 reclock = argv[0];
3132         } else {
3133                 usage();
3134         }
3135
3136         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
3137         if (ret != 0) {
3138                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3139                 return ret;
3140         }
3141         return 0;
3142 }
3143
3144 /*
3145   set the natgw state on/off
3146  */
3147 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
3148 {
3149         int ret;
3150         uint32_t natgwstate;
3151
3152         if (argc == 0) {
3153                 usage();
3154         }
3155
3156         if (!strcmp(argv[0], "on")) {
3157                 natgwstate = 1;
3158         } else if (!strcmp(argv[0], "off")) {
3159                 natgwstate = 0;
3160         } else {
3161                 usage();
3162         }
3163
3164         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
3165         if (ret != 0) {
3166                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
3167                 return ret;
3168         }
3169
3170         return 0;
3171 }
3172
3173 /*
3174   set the lmaster role on/off
3175  */
3176 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3177 {
3178         int ret;
3179         uint32_t lmasterrole;
3180
3181         if (argc == 0) {
3182                 usage();
3183         }
3184
3185         if (!strcmp(argv[0], "on")) {
3186                 lmasterrole = 1;
3187         } else if (!strcmp(argv[0], "off")) {
3188                 lmasterrole = 0;
3189         } else {
3190                 usage();
3191         }
3192
3193         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
3194         if (ret != 0) {
3195                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
3196                 return ret;
3197         }
3198
3199         return 0;
3200 }
3201
3202 /*
3203   set the recmaster role on/off
3204  */
3205 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3206 {
3207         int ret;
3208         uint32_t recmasterrole;
3209
3210         if (argc == 0) {
3211                 usage();
3212         }
3213
3214         if (!strcmp(argv[0], "on")) {
3215                 recmasterrole = 1;
3216         } else if (!strcmp(argv[0], "off")) {
3217                 recmasterrole = 0;
3218         } else {
3219                 usage();
3220         }
3221
3222         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
3223         if (ret != 0) {
3224                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
3225                 return ret;
3226         }
3227
3228         return 0;
3229 }
3230
3231 /*
3232   set debug level on a node or all nodes
3233  */
3234 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3235 {
3236         int i, ret;
3237         int32_t level;
3238
3239         if (argc == 0) {
3240                 printf("You must specify the debug level. Valid levels are:\n");
3241                 for (i=0; debug_levels[i].description != NULL; i++) {
3242                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3243                 }
3244
3245                 return 0;
3246         }
3247
3248         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3249                 level = get_debug_by_desc(argv[0]);
3250         } else {
3251                 level = strtol(argv[0], NULL, 0);
3252         }
3253
3254         for (i=0; debug_levels[i].description != NULL; i++) {
3255                 if (level == debug_levels[i].level) {
3256                         break;
3257                 }
3258         }
3259         if (debug_levels[i].description == NULL) {
3260                 printf("Invalid debug level, must be one of\n");
3261                 for (i=0; debug_levels[i].description != NULL; i++) {
3262                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3263                 }
3264                 return -1;
3265         }
3266
3267         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
3268         if (ret != 0) {
3269                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
3270         }
3271         return 0;
3272 }
3273
3274
3275 /*
3276   freeze a node
3277  */
3278 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
3279 {
3280         int ret;
3281         uint32_t priority;
3282         
3283         if (argc == 1) {
3284                 priority = strtol(argv[0], NULL, 0);
3285         } else {
3286                 priority = 0;
3287         }
3288         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
3289
3290         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3291         if (ret != 0) {
3292                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
3293         }               
3294         return 0;
3295 }
3296
3297 /*
3298   thaw a node
3299  */
3300 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3301 {
3302         int ret;
3303         uint32_t priority;
3304         
3305         if (argc == 1) {
3306                 priority = strtol(argv[0], NULL, 0);
3307         } else {
3308                 priority = 0;
3309         }
3310         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3311
3312         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3313         if (ret != 0) {
3314                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3315         }               
3316         return 0;
3317 }
3318
3319
3320 /*
3321   attach to a database
3322  */
3323 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3324 {
3325         const char *db_name;
3326         struct ctdb_db_context *ctdb_db;
3327
3328         if (argc < 1) {
3329                 usage();
3330         }
3331         db_name = argv[0];
3332
3333         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3334         if (ctdb_db == NULL) {
3335                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3336                 return -1;
3337         }
3338
3339         return 0;
3340 }
3341
3342 /*
3343   set db priority
3344  */
3345 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3346 {
3347         struct ctdb_db_priority db_prio;
3348         int ret;
3349
3350         if (argc < 2) {
3351                 usage();
3352         }
3353
3354         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3355         db_prio.priority = strtoul(argv[1], NULL, 0);
3356
3357         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3358         if (ret != 0) {
3359                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3360                 return -1;
3361         }
3362
3363         return 0;
3364 }
3365
3366 /*
3367   get db priority
3368  */
3369 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3370 {
3371         uint32_t db_id, priority;
3372         int ret;
3373
3374         if (argc < 1) {
3375                 usage();
3376         }
3377
3378         db_id = strtoul(argv[0], NULL, 0);
3379
3380         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3381         if (ret != 0) {
3382                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3383                 return -1;
3384         }
3385
3386         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3387
3388         return 0;
3389 }
3390
3391 /*
3392   run an eventscript on a node
3393  */
3394 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3395 {
3396         TDB_DATA data;
3397         int ret;
3398         int32_t res;
3399         char *errmsg;
3400         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3401
3402         if (argc != 1) {
3403                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3404                 return -1;
3405         }
3406
3407         data.dptr = (unsigned char *)discard_const(argv[0]);
3408         data.dsize = strlen((char *)data.dptr) + 1;
3409
3410         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3411
3412         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3413                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3414         if (ret != 0 || res != 0) {
3415                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3416                 talloc_free(tmp_ctx);
3417                 return -1;
3418         }
3419         talloc_free(tmp_ctx);
3420         return 0;
3421 }
3422
#define DB_VERSION 1
#define MAX_DB_NAME 64

/*
  header record for a database file; control_backupdb() declares one
  as its "dbhdr" local.

  The member order and types define the record's layout and must not
  be changed.  NOTE(review): the exact meaning of each field is set by
  the backup/restore code - confirm there before relying on the field
  notes below.
 */
struct db_file_header {
        unsigned long version;          /* presumably DB_VERSION */
        time_t timestamp;
        unsigned long persistent;
        unsigned long size;
        const char name[MAX_DB_NAME];   /* fixed size name buffer */
};
3432
3433 struct backup_data {
3434         struct ctdb_marshall_buffer *records;
3435         uint32_t len;
3436         uint32_t total;
3437         bool traverse_error;
3438 };
3439
3440 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3441 {
3442         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3443         struct ctdb_rec_data *rec;
3444
3445         /* add the record */
3446         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3447         if (rec == NULL) {
3448                 bd->traverse_error = true;
3449                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3450                 return -1;
3451         }
3452         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3453         if (bd->records == NULL) {
3454                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3455                 bd->traverse_error = true;
3456                 return -1;
3457         }
3458         bd->records->count++;
3459         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3460         bd->len += rec->length;
3461         talloc_free(rec);
3462
3463         bd->total++;
3464         return 0;
3465 }
3466
3467 /*
3468  * backup a database to a file 
3469  */
3470 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3471 {
3472         int i, ret;
3473         struct ctdb_dbid_map *dbmap=NULL;
3474         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3475         struct db_file_header dbhdr;
3476         struct ctdb_db_context *ctdb_db;
3477         struct backup_data *bd;
3478         int fh = -1;
3479         int status = -1;
3480         const char *reason = NULL;
3481
3482         if (argc != 2) {
3483                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3484                 return -1;
3485         }
3486
3487         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3488         if (ret != 0) {
3489                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3490                 return ret;
3491         }
3492
3493         for(i=0;i<dbmap->num;i++){
3494                 const char *name;
3495
3496                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3497                 if(!strcmp(argv[0], name)){
3498                         talloc_free(discard_const(name));
3499                         break;
3500                 }
3501                 talloc_free(discard_const(name));
3502         }
3503         if (i == dbmap->num) {
3504                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3505                 talloc_free(tmp_ctx);
3506                 return -1;
3507         }
3508
3509         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3510                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3511         if (ret != 0) {
3512                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3513                                  argv[0]));
3514                 talloc_free(tmp_ctx);
3515                 return -1;
3516         }
3517         if (reason) {
3518                 uint32_t allow_unhealthy = 0;
3519
3520                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3521                                       "AllowUnhealthyDBRead",
3522                                       &allow_unhealthy);
3523
3524                 if (allow_unhealthy != 1) {
3525                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3526                                          argv[0], reason));
3527
3528                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3529                                          allow_unhealthy));
3530                         talloc_free(tmp_ctx);
3531                         return -1;
3532                 }
3533
3534                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3535                                      argv[0], argv[0]));
3536                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3537                                      "tunnable AllowUnhealthyDBRead = %u\n",
3538                                      allow_unhealthy));
3539         }
3540
3541         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3542         if (ctdb_db == NULL) {
3543                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3544                 talloc_free(tmp_ctx);
3545                 return -1;
3546         }
3547
3548
3549         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3550         if (ret == -1) {
3551                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3552                 talloc_free(tmp_ctx);
3553                 return -1;
3554         }
3555
3556
3557         bd = talloc_zero(tmp_ctx, struct backup_data);
3558         if (bd == NULL) {
3559                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3560                 talloc_free(tmp_ctx);
3561                 return -1;
3562         }
3563
3564         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3565         if (bd->records == NULL) {
3566                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3567                 talloc_free(tmp_ctx);
3568                 return -1;
3569         }
3570
3571         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3572         bd->records->db_id = ctdb_db->db_id;
3573         /* traverse the database collecting all records */
3574         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3575             bd->traverse_error) {
3576                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3577                 talloc_free(tmp_ctx);
3578                 return -1;              
3579         }
3580
3581         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3582
3583
3584         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3585         if (fh == -1) {
3586                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3587                 talloc_free(tmp_ctx);
3588                 return -1;
3589         }
3590
3591         dbhdr.version = DB_VERSION;
3592         dbhdr.timestamp = time(NULL);
3593         dbhdr.persistent = dbmap->dbs[i].persistent;
3594         dbhdr.size = bd->len;
3595         if (strlen(argv[0]) >= MAX_DB_NAME) {
3596                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3597                 goto done;
3598         }
3599         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3600         ret = write(fh, &dbhdr, sizeof(dbhdr));
3601         if (ret == -1) {
3602                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3603                 goto done;
3604         }
3605         ret = write(fh, bd->records, bd->len);
3606         if (ret == -1) {
3607                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3608                 goto done;
3609         }
3610
3611         status = 0;
3612 done:
3613         if (fh != -1) {
3614                 ret = close(fh);
3615                 if (ret == -1) {
3616                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3617                 }
3618         }
3619         talloc_free(tmp_ctx);
3620         return status;
3621 }
3622
3623 /*
3624  * restore a database from a file 
3625  */
3626 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3627 {
3628         int ret;
3629         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3630         TDB_DATA outdata;
3631         TDB_DATA data;
3632         struct db_file_header dbhdr;
3633         struct ctdb_db_context *ctdb_db;
3634         struct ctdb_node_map *nodemap=NULL;
3635         struct ctdb_vnn_map *vnnmap=NULL;
3636         int i, fh;
3637         struct ctdb_control_wipe_database w;
3638         uint32_t *nodes;
3639         uint32_t generation;
3640         struct tm *tm;
3641         char tbuf[100];
3642         char *dbname;
3643
3644         if (argc < 1 || argc > 2) {
3645                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3646                 return -1;
3647         }
3648
3649         fh = open(argv[0], O_RDONLY);
3650         if (fh == -1) {
3651                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3652                 talloc_free(tmp_ctx);
3653                 return -1;
3654         }
3655
3656         read(fh, &dbhdr, sizeof(dbhdr));
3657         if (dbhdr.version != DB_VERSION) {
3658                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3659                 talloc_free(tmp_ctx);
3660                 return -1;
3661         }
3662
3663         dbname = dbhdr.name;
3664         if (argc == 2) {
3665                 dbname = argv[1];
3666         }
3667
3668         outdata.dsize = dbhdr.size;
3669         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3670         if (outdata.dptr == NULL) {
3671                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3672                 close(fh);
3673                 talloc_free(tmp_ctx);
3674                 return -1;
3675         }               
3676         read(fh, outdata.dptr, outdata.dsize);
3677         close(fh);
3678
3679         tm = localtime(&dbhdr.timestamp);
3680         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3681         printf("Restoring database '%s' from backup @ %s\n",
3682                 dbname, tbuf);
3683
3684
3685         ctdb_db = ctdb_attach(ctdb, dbname, dbhdr.persistent, 0);
3686         if (ctdb_db == NULL) {
3687                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
3688                 talloc_free(tmp_ctx);
3689                 return -1;
3690         }
3691
3692         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3693         if (ret != 0) {
3694                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3695                 talloc_free(tmp_ctx);
3696                 return ret;
3697         }
3698
3699
3700         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3701         if (ret != 0) {
3702                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3703                 talloc_free(tmp_ctx);
3704                 return ret;
3705         }
3706
3707         /* freeze all nodes */
3708         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3709         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3710                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3711                                         nodes, i,
3712                                         TIMELIMIT(),
3713                                         false, tdb_null,
3714                                         NULL, NULL,
3715                                         NULL) != 0) {
3716                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3717                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3718                         talloc_free(tmp_ctx);
3719                         return -1;
3720                 }
3721         }
3722
3723         generation = vnnmap->generation;
3724         data.dptr = (void *)&generation;
3725         data.dsize = sizeof(generation);
3726
3727         /* start a cluster wide transaction */
3728         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3729         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3730                                         nodes, 0,
3731                                         TIMELIMIT(), false, data,
3732                                         NULL, NULL,
3733                                         NULL) != 0) {
3734                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3735                 return -1;
3736         }
3737
3738
3739         w.db_id = ctdb_db->db_id;
3740         w.transaction_id = generation;
3741
3742         data.dptr = (void *)&w;
3743         data.dsize = sizeof(w);
3744
3745         /* wipe all the remote databases. */
3746         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3747         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3748                                         nodes, 0,
3749                                         TIMELIMIT(), false, data,
3750                                         NULL, NULL,
3751                                         NULL) != 0) {
3752                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3753                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3754                 talloc_free(tmp_ctx);
3755                 return -1;
3756         }
3757         
3758         /* push the database */
3759         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3760         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3761                                         nodes, 0,
3762                                         TIMELIMIT(), false, outdata,
3763                                         NULL, NULL,
3764                                         NULL) != 0) {
3765                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3766                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3767                 talloc_free(tmp_ctx);
3768                 return -1;
3769         }
3770
3771         data.dptr = (void *)&ctdb_db->db_id;
3772         data.dsize = sizeof(ctdb_db->db_id);
3773
3774         /* mark the database as healthy */
3775         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3776         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3777                                         nodes, 0,
3778                                         TIMELIMIT(), false, data,
3779                                         NULL, NULL,
3780                                         NULL) != 0) {
3781                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3782                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3783                 talloc_free(tmp_ctx);
3784                 return -1;
3785         }
3786
3787         data.dptr = (void *)&generation;
3788         data.dsize = sizeof(generation);
3789
3790         /* commit all the changes */
3791         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3792                                         nodes, 0,
3793                                         TIMELIMIT(), false, data,
3794                                         NULL, NULL,
3795                                         NULL) != 0) {
3796                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3797                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3798                 talloc_free(tmp_ctx);
3799                 return -1;
3800         }
3801
3802
3803         /* thaw all nodes */
3804         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3805         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3806                                         nodes, 0,
3807                                         TIMELIMIT(),
3808                                         false, tdb_null,
3809                                         NULL, NULL,
3810                                         NULL) != 0) {
3811                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3812                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3813                 talloc_free(tmp_ctx);
3814                 return -1;
3815         }
3816
3817
3818         talloc_free(tmp_ctx);
3819         return 0;
3820 }
3821
3822 /*
3823  * dump a database backup from a file
3824  */
3825 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
3826 {
3827         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3828         TDB_DATA outdata;
3829         struct db_file_header dbhdr;
3830         int i, fh;
3831         struct tm *tm;
3832         char tbuf[100];
3833         struct ctdb_rec_data *rec = NULL;
3834         struct ctdb_marshall_buffer *m;
3835
3836         if (argc != 1) {
3837                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3838                 return -1;
3839         }
3840
3841         fh = open(argv[0], O_RDONLY);
3842         if (fh == -1) {
3843                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3844                 talloc_free(tmp_ctx);
3845                 return -1;
3846         }
3847
3848         read(fh, &dbhdr, sizeof(dbhdr));
3849         if (dbhdr.version != DB_VERSION) {
3850                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3851                 talloc_free(tmp_ctx);
3852                 return -1;
3853         }
3854
3855         outdata.dsize = dbhdr.size;
3856         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3857         if (outdata.dptr == NULL) {
3858                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3859                 close(fh);
3860                 talloc_free(tmp_ctx);
3861                 return -1;
3862         }
3863         read(fh, outdata.dptr, outdata.dsize);
3864         close(fh);
3865         m = (struct ctdb_marshall_buffer *)outdata.dptr;
3866
3867         tm = localtime(&dbhdr.timestamp);
3868         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3869         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
3870                 dbhdr.name, m->db_id, tbuf);
3871
3872         for (i=0; i < m->count; i++) {
3873                 uint32_t reqid = 0;
3874                 TDB_DATA key, data;
3875
3876                 /* we do not want the header splitted, so we pass NULL*/
3877                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
3878                                               NULL, &key, &data);
3879
3880                 ctdb_dumpdb_record(ctdb, key, data, stdout);
3881         }
3882
3883         printf("Dumped %d records\n", i);
3884         talloc_free(tmp_ctx);
3885         return 0;
3886 }
3887
3888 /*
3889  * wipe a database from a file
3890  */
3891 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3892                           const char **argv)
3893 {
3894         int ret;
3895         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3896         TDB_DATA data;
3897         struct ctdb_db_context *ctdb_db;
3898         struct ctdb_node_map *nodemap = NULL;
3899         struct ctdb_vnn_map *vnnmap = NULL;
3900         int i;
3901         struct ctdb_control_wipe_database w;
3902         uint32_t *nodes;
3903         uint32_t generation;
3904         struct ctdb_dbid_map *dbmap = NULL;
3905
3906         if (argc != 1) {
3907                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3908                 return -1;
3909         }
3910
3911         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3912                                  &dbmap);
3913         if (ret != 0) {
3914                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3915                                   options.pnn));
3916                 return ret;
3917         }
3918
3919         for(i=0;i<dbmap->num;i++){
3920                 const char *name;
3921
3922                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3923                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3924                 if(!strcmp(argv[0], name)){
3925                         talloc_free(discard_const(name));
3926                         break;
3927                 }
3928                 talloc_free(discard_const(name));
3929         }
3930         if (i == dbmap->num) {
3931                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3932                                   argv[0]));
3933                 talloc_free(tmp_ctx);
3934                 return -1;
3935         }
3936
3937         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3938         if (ctdb_db == NULL) {
3939                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3940                                   argv[0]));
3941                 talloc_free(tmp_ctx);
3942                 return -1;
3943         }
3944
3945         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3946                                    &nodemap);
3947         if (ret != 0) {
3948                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3949                                   options.pnn));
3950                 talloc_free(tmp_ctx);
3951                 return ret;
3952         }
3953
3954         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3955                                   &vnnmap);
3956         if (ret != 0) {
3957                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3958                                   options.pnn));
3959                 talloc_free(tmp_ctx);
3960                 return ret;
3961         }
3962
3963         /* freeze all nodes */
3964         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3965         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3966                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3967                                                 nodes, i,
3968                                                 TIMELIMIT(),
3969                                                 false, tdb_null,
3970                                                 NULL, NULL,
3971                                                 NULL);
3972                 if (ret != 0) {
3973                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3974                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3975                                              CTDB_RECOVERY_ACTIVE);
3976                         talloc_free(tmp_ctx);
3977                         return -1;
3978                 }
3979         }
3980
3981         generation = vnnmap->generation;
3982         data.dptr = (void *)&generation;
3983         data.dsize = sizeof(generation);
3984
3985         /* start a cluster wide transaction */
3986         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3987         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3988                                         nodes, 0,
3989                                         TIMELIMIT(), false, data,
3990                                         NULL, NULL,
3991                                         NULL);
3992         if (ret!= 0) {
3993                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3994                                   "transactions.\n"));
3995                 return -1;
3996         }
3997
3998         w.db_id = ctdb_db->db_id;
3999         w.transaction_id = generation;
4000
4001         data.dptr = (void *)&w;
4002         data.dsize = sizeof(w);
4003
4004         /* wipe all the remote databases. */
4005         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4006         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
4007                                         nodes, 0,
4008                                         TIMELIMIT(), false, data,
4009                                         NULL, NULL,
4010                                         NULL) != 0) {
4011                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
4012                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4013                 talloc_free(tmp_ctx);
4014                 return -1;
4015         }
4016
4017         data.dptr = (void *)&ctdb_db->db_id;
4018         data.dsize = sizeof(ctdb_db->db_id);
4019
4020         /* mark the database as healthy */
4021         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4022         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
4023                                         nodes, 0,
4024                                         TIMELIMIT(), false, data,
4025                                         NULL, NULL,
4026                                         NULL) != 0) {
4027                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
4028                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4029                 talloc_free(tmp_ctx);
4030                 return -1;
4031         }
4032
4033         data.dptr = (void *)&generation;
4034         data.dsize = sizeof(generation);
4035
4036         /* commit all the changes */
4037         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
4038                                         nodes, 0,
4039                                         TIMELIMIT(), false, data,
4040                                         NULL, NULL,
4041                                         NULL) != 0) {
4042                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
4043                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4044                 talloc_free(tmp_ctx);
4045                 return -1;
4046         }
4047
4048         /* thaw all nodes */
4049         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4050         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
4051                                         nodes, 0,
4052                                         TIMELIMIT(),
4053                                         false, tdb_null,
4054                                         NULL, NULL,
4055                                         NULL) != 0) {
4056                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
4057                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4058                 talloc_free(tmp_ctx);
4059                 return -1;
4060         }
4061
4062         talloc_free(tmp_ctx);
4063         return 0;
4064 }
4065
4066 /*
4067  * set flags of a node in the nodemap
4068  */
4069 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
4070 {
4071         int ret;
4072         int32_t status;
4073         int node;
4074         int flags;
4075         TDB_DATA data;
4076         struct ctdb_node_flag_change c;
4077
4078         if (argc != 2) {
4079                 usage();
4080                 return -1;
4081         }
4082
4083         if (sscanf(argv[0], "%d", &node) != 1) {
4084                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
4085                 usage();
4086                 return -1;
4087         }
4088         if (sscanf(argv[1], "0x%x", &flags) != 1) {
4089                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
4090                 usage();
4091                 return -1;
4092         }
4093
4094         c.pnn       = node;
4095         c.old_flags = 0;
4096         c.new_flags = flags;
4097
4098         data.dsize = sizeof(c);
4099         data.dptr = (unsigned char *)&c;
4100
4101         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
4102                            data, NULL, NULL, &status, NULL, NULL);
4103         if (ret != 0 || status != 0) {
4104                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
4105                 return -1;
4106         }
4107         return 0;
4108 }
4109
4110 /*
4111   dump memory usage
4112  */
4113 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4114 {
4115         TDB_DATA data;
4116         int ret;
4117         int32_t res;
4118         char *errmsg;
4119         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4120         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
4121                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
4122         if (ret != 0 || res != 0) {
4123                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
4124                 talloc_free(tmp_ctx);
4125                 return -1;
4126         }
4127         write(1, data.dptr, data.dsize);
4128         talloc_free(tmp_ctx);
4129         return 0;
4130 }
4131
4132 /*
4133   handler for memory dumps
4134 */
4135 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4136                              TDB_DATA data, void *private_data)
4137 {
4138         write(1, data.dptr, data.dsize);
4139         exit(0);
4140 }
4141
4142 /*
4143   dump memory usage on the recovery daemon
4144  */
4145 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4146 {
4147         int ret;
4148         TDB_DATA data;
4149         struct rd_memdump_reply rd;
4150
4151         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4152         if (rd.pnn == -1) {
4153                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4154                 return -1;
4155         }
4156         rd.srvid = getpid();
4157
4158         /* register a message port for receiveing the reply so that we
4159            can receive the reply
4160         */
4161         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
4162
4163
4164         data.dptr = (uint8_t *)&rd;
4165         data.dsize = sizeof(rd);
4166
4167         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
4168         if (ret != 0) {
4169                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4170                 return -1;
4171         }
4172
4173         /* this loop will terminate when we have received the reply */
4174         while (1) {     
4175                 event_loop_once(ctdb->ev);
4176         }
4177
4178         return 0;
4179 }
4180
4181 /*
4182   send a message to a srvid
4183  */
4184 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
4185 {
4186         unsigned long srvid;
4187         int ret;
4188         TDB_DATA data;
4189
4190         if (argc < 2) {
4191                 usage();
4192         }
4193
4194         srvid      = strtoul(argv[0], NULL, 0);
4195
4196         data.dptr = (uint8_t *)discard_const(argv[1]);
4197         data.dsize= strlen(argv[1]);
4198
4199         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
4200         if (ret != 0) {
4201                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4202                 return -1;
4203         }
4204
4205         return 0;
4206 }
4207
4208 /*
4209   handler for msglisten
4210 */
4211 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4212                              TDB_DATA data, void *private_data)
4213 {
4214         int i;
4215
4216         printf("Message received: ");
4217         for (i=0;i<data.dsize;i++) {
4218                 printf("%c", data.dptr[i]);
4219         }
4220         printf("\n");
4221 }
4222
4223 /*
4224   listen for messages on a messageport
4225  */
4226 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
4227 {
4228         uint64_t srvid;
4229
4230         srvid = getpid();
4231
4232         /* register a message port and listen for messages
4233         */
4234         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
4235         printf("Listening for messages on srvid:%d\n", (int)srvid);
4236
4237         while (1) {     
4238                 event_loop_once(ctdb->ev);
4239         }
4240
4241         return 0;
4242 }
4243
4244 /*
4245   list all nodes in the cluster
4246   if the daemon is running, we read the data from the daemon.
4247   if the daemon is not running we parse the nodes file directly
4248  */
4249 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
4250 {
4251         int i, ret;
4252         struct ctdb_node_map *nodemap=NULL;
4253
4254         if (ctdb != NULL) {
4255                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
4256                 if (ret != 0) {
4257                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
4258                         return ret;
4259                 }
4260
4261                 for(i=0;i<nodemap->num;i++){
4262                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
4263                                 continue;
4264                         }
4265                         if (options.machinereadable){
4266                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
4267                         } else {
4268                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
4269                         }
4270                 }
4271         } else {
4272                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
4273                 struct pnn_node *pnn_nodes;
4274                 struct pnn_node *pnn_node;
4275         
4276                 pnn_nodes = read_nodes_file(mem_ctx);
4277                 if (pnn_nodes == NULL) {
4278                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
4279                         talloc_free(mem_ctx);
4280                         return -1;
4281                 }
4282
4283                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
4284                         ctdb_sock_addr addr;
4285
4286                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
4287                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
4288                                 talloc_free(mem_ctx);
4289                                 return -1;
4290                         }
4291
4292                         if (options.machinereadable){
4293                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
4294                         } else {
4295                                 printf("%s\n", pnn_node->addr);
4296                         }
4297                 }
4298                 talloc_free(mem_ctx);
4299         }
4300
4301         return 0;
4302 }
4303
4304 /*
4305   reload the nodes file on the local node
4306  */
4307 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
4308 {
4309         int i, ret;
4310         int mypnn;
4311         struct ctdb_node_map *nodemap=NULL;
4312
4313         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4314         if (mypnn == -1) {
4315                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
4316                 return -1;
4317         }
4318
4319         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
4320         if (ret != 0) {
4321                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4322                 return ret;
4323         }
4324
4325         /* reload the nodes file on all remote nodes */
4326         for (i=0;i<nodemap->num;i++) {
4327                 if (nodemap->nodes[i].pnn == mypnn) {
4328                         continue;
4329                 }
4330                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
4331                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
4332                         nodemap->nodes[i].pnn);
4333                 if (ret != 0) {
4334                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
4335                 }
4336         }
4337
4338         /* reload the nodes file on the local node */
4339         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
4340         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
4341         if (ret != 0) {
4342                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
4343         }
4344
4345         /* initiate a recovery */
4346         control_recover(ctdb, argc, argv);
4347
4348         return 0;
4349 }
4350
4351
4352 static const struct {
4353         const char *name;
4354         int (*fn)(struct ctdb_context *, int, const char **);
4355         bool auto_all;
4356         bool without_daemon; /* can be run without daemon running ? */
4357         const char *msg;
4358         const char *args;
4359 } ctdb_commands[] = {
4360 #ifdef CTDB_VERS
4361         { "version",         control_version,           true,   false,  "show version of ctdb" },
4362 #endif
4363         { "status",          control_status,            true,   false,  "show node status" },
4364         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4365         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4366         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4367         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4368         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4369         { "statistics",      control_statistics,        false,  false, "show statistics" },
4370         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4371         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4372         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
4373         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
4374         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
4375         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4376         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4377         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4378         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4379         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4380         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4381         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4382         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4383         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4384         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4385         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4386         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4387         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4388         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4389         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4390         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
4391         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4392         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4393         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4394         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4395         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4396         { "stop",            control_stop,              true,   false,  "stop a node" },
4397         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4398         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4399         { "unban",           control_unban,             true,   false,  "unban a node" },
4400         { "showban",         control_showban,           true,   false,  "show ban information"},
4401         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4402         { "recover",         control_recover,           true,   false,  "force recovery" },
4403         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4404         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
4405         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4406         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4407         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4408         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4409         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4410         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
4411
4412         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4413         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4414         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4415         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4416         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4417         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4418         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4419         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4420         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4421         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4422         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4423         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4424         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4425         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
4426         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4427         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4428         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4429         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
4430         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4431         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4432         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4433         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4434         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4435         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4436         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4437         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4438         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4439         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4440         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4441         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4442         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
4443         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
4444 };
4445
4446 /*
4447   show usage message
4448  */
4449 static void usage(void)
4450 {
4451         int i;
4452         printf(
4453 "Usage: ctdb [options] <control>\n" \
4454 "Options:\n" \
4455 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4456 "   -Y                 generate machinereadable output\n"
4457 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4458         printf("Controls:\n");
4459         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4460                 printf("  %-15s %-27s  %s\n", 
4461                        ctdb_commands[i].name, 
4462                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4463                        ctdb_commands[i].msg);
4464         }
4465         exit(1);
4466 }
4467
4468
4469 static void ctdb_alarm(int sig)
4470 {
4471         printf("Maximum runtime exceeded - exiting\n");
4472         _exit(ERR_TIMEOUT);
4473 }
4474
4475 /*
4476   main program
4477 */
4478 int main(int argc, const char *argv[])
4479 {
4480         struct ctdb_context *ctdb;
4481         char *nodestring = NULL;
4482         struct poptOption popt_options[] = {
4483                 POPT_AUTOHELP
4484                 POPT_CTDB_CMDLINE
4485                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4486                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4487                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4488                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4489                 POPT_TABLEEND
4490         };
4491         int opt;
4492         const char **extra_argv;
4493         int extra_argc = 0;
4494         int ret=-1, i;
4495         poptContext pc;
4496         struct event_context *ev;
4497         const char *control;
4498
4499         setlinebuf(stdout);
4500         
4501         /* set some defaults */
4502         options.maxruntime = 0;
4503         options.timelimit = 3;
4504         options.pnn = CTDB_CURRENT_NODE;
4505
4506         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4507
4508         while ((opt = poptGetNextOpt(pc)) != -1) {
4509                 switch (opt) {
4510                 default:
4511                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4512                                 poptBadOption(pc, 0), poptStrerror(opt)));
4513                         exit(1);
4514                 }
4515         }
4516
4517         /* setup the remaining options for the main program to use */
4518         extra_argv = poptGetArgs(pc);
4519         if (extra_argv) {
4520                 extra_argv++;
4521                 while (extra_argv[extra_argc]) extra_argc++;
4522         }
4523
4524         if (extra_argc < 1) {
4525                 usage();
4526         }
4527
4528         if (options.maxruntime == 0) {
4529                 const char *ctdb_timeout;
4530                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4531                 if (ctdb_timeout != NULL) {
4532                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4533                 } else {
4534                         /* default timeout is 120 seconds */
4535                         options.maxruntime = 120;
4536                 }
4537         }
4538
4539         signal(SIGALRM, ctdb_alarm);
4540         alarm(options.maxruntime);
4541
4542         /* setup the node number to contact */
4543         if (nodestring != NULL) {
4544                 if (strcmp(nodestring, "all") == 0) {
4545                         options.pnn = CTDB_BROADCAST_ALL;
4546                 } else {
4547                         options.pnn = strtoul(nodestring, NULL, 0);
4548                 }
4549         }
4550
4551         control = extra_argv[0];
4552
4553         ev = event_context_init(NULL);
4554
4555         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4556                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4557                         int j;
4558                         const char *socket_name;
4559
4560                         if (ctdb_commands[i].without_daemon == true) {
4561                                 close(2);
4562                         }
4563
4564                         /* initialise ctdb */
4565                         ctdb = ctdb_cmdline_client(ev);
4566
4567                         /* initialize a libctdb connection as well */
4568                         socket_name = ctdb_get_socketname(ctdb);
4569                         ctdb_connection = ctdb_connect(socket_name);
4570                         if (ctdb_connection == NULL) {
4571                                 fprintf(stderr, "Failed to connect to daemon from libctdb\n");
4572                                 exit(1);
4573                         }                               
4574                         
4575                         if (ctdb_commands[i].without_daemon == false) {
4576                                 if (ctdb == NULL) {
4577                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4578                                         exit(1);
4579                                 }
4580
4581                                 /* verify the node exists */
4582                                 verify_node(ctdb);
4583
4584                                 if (options.pnn == CTDB_CURRENT_NODE) {
4585                                         int pnn;
4586                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4587                                         if (pnn == -1) {
4588                                                 return -1;
4589                                         }
4590                                         options.pnn = pnn;
4591                                 }
4592                         }
4593
4594                         if (ctdb_commands[i].auto_all && 
4595                             options.pnn == CTDB_BROADCAST_ALL) {
4596                                 uint32_t *nodes;
4597                                 uint32_t num_nodes;
4598                                 ret = 0;
4599
4600                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4601                                 CTDB_NO_MEMORY(ctdb, nodes);
4602         
4603                                 for (j=0;j<num_nodes;j++) {
4604                                         options.pnn = nodes[j];
4605                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4606                                 }
4607                                 talloc_free(nodes);
4608                         } else {
4609                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4610                         }
4611                         break;
4612                 }
4613         }
4614
4615         if (i == ARRAY_SIZE(ctdb_commands)) {
4616                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4617                 exit(1);
4618         }
4619
4620         return ret;
4621 }