In control_ipreallocate() we wait at most 5 tries before aborting the command
[sahlberg/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         int machinereadable;
44         int maxruntime;
45 } options;
46
47 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
48 #define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
49
50 #ifdef CTDB_VERS
51 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
52 {
53 #define STR(x) #x
54 #define XSTR(x) STR(x)
55         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
56         return 0;
57 }
58 #endif
59
60
61 /*
62   verify that a node exists and is reachable
63  */
64 static void verify_node(struct ctdb_context *ctdb)
65 {
66         int ret;
67         struct ctdb_node_map *nodemap=NULL;
68
69         if (options.pnn == CTDB_CURRENT_NODE) {
70                 return;
71         }
72         if (options.pnn == CTDB_BROADCAST_ALL) {
73                 return;
74         }
75
76         /* verify the node exists */
77         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
78                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
79                 exit(10);
80         }
81         if (options.pnn >= nodemap->num) {
82                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
83                 exit(ERR_NONODE);
84         }
85         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
86                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
87                 exit(ERR_DISNODE);
88         }
89         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
90                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
91                 exit(ERR_DISNODE);
92         }
93
94         /* verify we can access the node */
95         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
96         if (ret == -1) {
97                 DEBUG(DEBUG_ERR,("Can not ban node. Node is not operational.\n"));
98                 exit(10);
99         }
100 }
101
102 /*
103  check if a database exists
104 */
105 static int db_exists(struct ctdb_context *ctdb, const char *db_name)
106 {
107         int i, ret;
108         struct ctdb_dbid_map *dbmap=NULL;
109
110         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
111         if (ret != 0) {
112                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
113                 return -1;
114         }
115
116         for(i=0;i<dbmap->num;i++){
117                 const char *name;
118
119                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
120                 if (!strcmp(name, db_name)) {
121                         return 0;
122                 }
123         }
124
125         return -1;
126 }
127
128 /*
129   see if a process exists
130  */
131 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
132 {
133         uint32_t pnn, pid;
134         int ret;
135         if (argc < 1) {
136                 usage();
137         }
138
139         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
140                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
141                 return -1;
142         }
143
144         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
145         if (ret == 0) {
146                 printf("%u:%u exists\n", pnn, pid);
147         } else {
148                 printf("%u:%u does not exist\n", pnn, pid);
149         }
150         return ret;
151 }
152
153 /*
154   display statistics structure
155  */
156 static void show_statistics(struct ctdb_statistics *s)
157 {
158         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
159         int i;
160         const char *prefix=NULL;
161         int preflen=0;
162         const struct {
163                 const char *name;
164                 uint32_t offset;
165         } fields[] = {
166 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
167                 STATISTICS_FIELD(num_clients),
168                 STATISTICS_FIELD(frozen),
169                 STATISTICS_FIELD(recovering),
170                 STATISTICS_FIELD(num_recoveries),
171                 STATISTICS_FIELD(client_packets_sent),
172                 STATISTICS_FIELD(client_packets_recv),
173                 STATISTICS_FIELD(node_packets_sent),
174                 STATISTICS_FIELD(node_packets_recv),
175                 STATISTICS_FIELD(keepalive_packets_sent),
176                 STATISTICS_FIELD(keepalive_packets_recv),
177                 STATISTICS_FIELD(node.req_call),
178                 STATISTICS_FIELD(node.reply_call),
179                 STATISTICS_FIELD(node.req_dmaster),
180                 STATISTICS_FIELD(node.reply_dmaster),
181                 STATISTICS_FIELD(node.reply_error),
182                 STATISTICS_FIELD(node.req_message),
183                 STATISTICS_FIELD(node.req_control),
184                 STATISTICS_FIELD(node.reply_control),
185                 STATISTICS_FIELD(client.req_call),
186                 STATISTICS_FIELD(client.req_message),
187                 STATISTICS_FIELD(client.req_control),
188                 STATISTICS_FIELD(timeouts.call),
189                 STATISTICS_FIELD(timeouts.control),
190                 STATISTICS_FIELD(timeouts.traverse),
191                 STATISTICS_FIELD(total_calls),
192                 STATISTICS_FIELD(pending_calls),
193                 STATISTICS_FIELD(lockwait_calls),
194                 STATISTICS_FIELD(pending_lockwait_calls),
195                 STATISTICS_FIELD(childwrite_calls),
196                 STATISTICS_FIELD(pending_childwrite_calls),
197                 STATISTICS_FIELD(memory_used),
198                 STATISTICS_FIELD(max_hop_count),
199         };
200         printf("CTDB version %u\n", CTDB_VERSION);
201         for (i=0;i<ARRAY_SIZE(fields);i++) {
202                 if (strchr(fields[i].name, '.')) {
203                         preflen = strcspn(fields[i].name, ".")+1;
204                         if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
205                                 prefix = fields[i].name;
206                                 printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
207                         }
208                 } else {
209                         preflen = 0;
210                 }
211                 printf(" %*s%-22s%*s%10u\n", 
212                        preflen?4:0, "",
213                        fields[i].name+preflen, 
214                        preflen?0:4, "",
215                        *(uint32_t *)(fields[i].offset+(uint8_t *)s));
216         }
217         printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
218         printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
219
220         printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
221         printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
222         printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
223         talloc_free(tmp_ctx);
224 }
225
226 /*
227   display remote ctdb statistics combined from all nodes
228  */
229 static int control_statistics_all(struct ctdb_context *ctdb)
230 {
231         int ret, i;
232         struct ctdb_statistics statistics;
233         uint32_t *nodes;
234         uint32_t num_nodes;
235
236         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
237         CTDB_NO_MEMORY(ctdb, nodes);
238         
239         ZERO_STRUCT(statistics);
240
241         for (i=0;i<num_nodes;i++) {
242                 struct ctdb_statistics s1;
243                 int j;
244                 uint32_t *v1 = (uint32_t *)&s1;
245                 uint32_t *v2 = (uint32_t *)&statistics;
246                 uint32_t num_ints = 
247                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
248                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
249                 if (ret != 0) {
250                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
251                         return ret;
252                 }
253                 for (j=0;j<num_ints;j++) {
254                         v2[j] += v1[j];
255                 }
256                 statistics.max_hop_count = 
257                         MAX(statistics.max_hop_count, s1.max_hop_count);
258                 statistics.max_call_latency = 
259                         MAX(statistics.max_call_latency, s1.max_call_latency);
260                 statistics.max_lockwait_latency = 
261                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
262         }
263         talloc_free(nodes);
264         printf("Gathered statistics for %u nodes\n", num_nodes);
265         show_statistics(&statistics);
266         return 0;
267 }
268
269 /*
270   display remote ctdb statistics
271  */
272 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
273 {
274         int ret;
275         struct ctdb_statistics statistics;
276
277         if (options.pnn == CTDB_BROADCAST_ALL) {
278                 return control_statistics_all(ctdb);
279         }
280
281         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
282         if (ret != 0) {
283                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
284                 return ret;
285         }
286         show_statistics(&statistics);
287         return 0;
288 }
289
290
291 /*
292   reset remote ctdb statistics
293  */
294 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
295 {
296         int ret;
297
298         ret = ctdb_statistics_reset(ctdb, options.pnn);
299         if (ret != 0) {
300                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
301                 return ret;
302         }
303         return 0;
304 }
305
306
307 /*
308   display uptime of remote node
309  */
310 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
311 {
312         int ret;
313         struct ctdb_uptime *uptime = NULL;
314         int tmp, days, hours, minutes, seconds;
315
316         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
317         if (ret != 0) {
318                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
319                 return ret;
320         }
321
322         if (options.machinereadable){
323                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
324                 printf(":%u:%u:%u:%lf\n",
325                         (unsigned int)uptime->current_time.tv_sec,
326                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
327                         (unsigned int)uptime->last_recovery_finished.tv_sec,
328                         timeval_delta(&uptime->last_recovery_finished,
329                                       &uptime->last_recovery_started)
330                 );
331                 return 0;
332         }
333
334         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
335
336         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
337         seconds = tmp%60;
338         tmp    /= 60;
339         minutes = tmp%60;
340         tmp    /= 60;
341         hours   = tmp%24;
342         tmp    /= 24;
343         days    = tmp;
344         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
345
346         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
347         seconds = tmp%60;
348         tmp    /= 60;
349         minutes = tmp%60;
350         tmp    /= 60;
351         hours   = tmp%24;
352         tmp    /= 24;
353         days    = tmp;
354         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
355         
356         printf("Duration of last recovery/failover: %lf seconds\n",
357                 timeval_delta(&uptime->last_recovery_finished,
358                               &uptime->last_recovery_started));
359
360         return 0;
361 }
362
363 /*
364   show the PNN of the current node
365  */
366 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
367 {
368         int mypnn;
369
370         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
371         if (mypnn == -1) {
372                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node."));
373                 return -1;
374         }
375
376         printf("PNN:%d\n", mypnn);
377         return 0;
378 }
379
380
381 struct pnn_node {
382         struct pnn_node *next;
383         const char *addr;
384         int pnn;
385 };
386
387 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
388 {
389         const char *nodes_list;
390         int nlines;
391         char **lines;
392         int i, pnn;
393         struct pnn_node *pnn_nodes = NULL;
394         struct pnn_node *pnn_node;
395         struct pnn_node *tmp_node;
396
397         /* read the nodes file */
398         nodes_list = getenv("CTDB_NODES");
399         if (nodes_list == NULL) {
400                 nodes_list = "/etc/ctdb/nodes";
401         }
402         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
403         if (lines == NULL) {
404                 return NULL;
405         }
406         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
407                 nlines--;
408         }
409         for (i=0, pnn=0; i<nlines; i++) {
410                 char *node;
411
412                 node = lines[i];
413                 /* strip leading spaces */
414                 while((*node == ' ') || (*node == '\t')) {
415                         node++;
416                 }
417                 if (*node == '#') {
418                         pnn++;
419                         continue;
420                 }
421                 if (strcmp(node, "") == 0) {
422                         continue;
423                 }
424                 pnn_node = talloc(mem_ctx, struct pnn_node);
425                 pnn_node->pnn = pnn++;
426                 pnn_node->addr = talloc_strdup(pnn_node, node);
427                 pnn_node->next = pnn_nodes;
428                 pnn_nodes = pnn_node;
429         }
430
431         /* swap them around so we return them in incrementing order */
432         pnn_node = pnn_nodes;
433         pnn_nodes = NULL;
434         while (pnn_node) {
435                 tmp_node = pnn_node;
436                 pnn_node = pnn_node->next;
437
438                 tmp_node->next = pnn_nodes;
439                 pnn_nodes = tmp_node;
440         }
441
442         return pnn_nodes;
443 }
444
445 /*
446   show the PNN of the current node
447   discover the pnn by loading the nodes file and try to bind to all
448   addresses one at a time until the ip address is found.
449  */
450 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
451 {
452         TALLOC_CTX *mem_ctx = talloc_new(NULL);
453         struct pnn_node *pnn_nodes;
454         struct pnn_node *pnn_node;
455
456         pnn_nodes = read_nodes_file(mem_ctx);
457         if (pnn_nodes == NULL) {
458                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
459                 talloc_free(mem_ctx);
460                 return -1;
461         }
462
463         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
464                 ctdb_sock_addr addr;
465
466                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
467                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
468                         talloc_free(mem_ctx);
469                         return -1;
470                 }
471
472                 if (ctdb_sys_have_ip(&addr)) {
473                         printf("PNN:%d\n", pnn_node->pnn);
474                         talloc_free(mem_ctx);
475                         return 0;
476                 }
477         }
478
479         printf("Failed to detect which PNN this node is\n");
480         talloc_free(mem_ctx);
481         return -1;
482 }
483
484 /*
485   display remote ctdb status
486  */
487 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
488 {
489         int i, ret;
490         struct ctdb_vnn_map *vnnmap=NULL;
491         struct ctdb_node_map *nodemap=NULL;
492         uint32_t recmode, recmaster;
493         int mypnn;
494
495         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
496         if (mypnn == -1) {
497                 return -1;
498         }
499
500         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
501         if (ret != 0) {
502                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
503                 return ret;
504         }
505
506         if(options.machinereadable){
507                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:PartiallyOnline:\n");
508                 for(i=0;i<nodemap->num;i++){
509                         int partially_online = 0;
510                         int j;
511
512                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
513                                 continue;
514                         }
515                         if (nodemap->nodes[i].flags == 0) {
516                                 struct ctdb_control_get_ifaces *ifaces;
517
518                                 ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
519                                                            nodemap->nodes[i].pnn,
520                                                            ctdb, &ifaces);
521                                 if (ret == 0) {
522                                         for (j=0; j < ifaces->num; j++) {
523                                                 if (ifaces->ifaces[j].link_state != 0) {
524                                                         continue;
525                                                 }
526                                                 partially_online = 1;
527                                                 break;
528                                         }
529                                         talloc_free(ifaces);
530                                 }
531                         }
532                         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
533                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
534                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
535                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
536                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
537                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
538                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
539                                !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE),
540                                partially_online);
541                 }
542                 return 0;
543         }
544
545         printf("Number of nodes:%d\n", nodemap->num);
546         for(i=0;i<nodemap->num;i++){
547                 static const struct {
548                         uint32_t flag;
549                         const char *name;
550                 } flag_names[] = {
551                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
552                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
553                         { NODE_FLAGS_BANNED,                "BANNED" },
554                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
555                         { NODE_FLAGS_DELETED,               "DELETED" },
556                         { NODE_FLAGS_STOPPED,               "STOPPED" },
557                         { NODE_FLAGS_INACTIVE,              "INACTIVE" },
558                 };
559                 char *flags_str = NULL;
560                 int j;
561
562                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
563                         continue;
564                 }
565                 if (nodemap->nodes[i].flags == 0) {
566                         struct ctdb_control_get_ifaces *ifaces;
567
568                         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
569                                                    nodemap->nodes[i].pnn,
570                                                    ctdb, &ifaces);
571                         if (ret == 0) {
572                                 for (j=0; j < ifaces->num; j++) {
573                                         if (ifaces->ifaces[j].link_state != 0) {
574                                                 continue;
575                                         }
576                                         flags_str = talloc_strdup(ctdb, "PARTIALLYONLINE");
577                                         break;
578                                 }
579                                 talloc_free(ifaces);
580                         }
581                 }
582                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
583                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
584                                 if (flags_str == NULL) {
585                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
586                                 } else {
587                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
588                                                                            flag_names[j].name);
589                                 }
590                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
591                         }
592                 }
593                 if (flags_str == NULL) {
594                         flags_str = talloc_strdup(ctdb, "OK");
595                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
596                 }
597                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
598                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
599                        flags_str,
600                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
601                 talloc_free(flags_str);
602         }
603
604         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
605         if (ret != 0) {
606                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
607                 return ret;
608         }
609         if (vnnmap->generation == INVALID_GENERATION) {
610                 printf("Generation:INVALID\n");
611         } else {
612                 printf("Generation:%d\n",vnnmap->generation);
613         }
614         printf("Size:%d\n",vnnmap->size);
615         for(i=0;i<vnnmap->size;i++){
616                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
617         }
618
619         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
620         if (ret != 0) {
621                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
622                 return ret;
623         }
624         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
625
626         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
627         if (ret != 0) {
628                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
629                 return ret;
630         }
631         printf("Recovery master:%d\n",recmaster);
632
633         return 0;
634 }
635
636
637 struct natgw_node {
638         struct natgw_node *next;
639         const char *addr;
640 };
641
642 /*
643   display the list of nodes belonging to this natgw configuration
644  */
645 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
646 {
647         int i, ret;
648         const char *natgw_list;
649         int nlines;
650         char **lines;
651         struct natgw_node *natgw_nodes = NULL;
652         struct natgw_node *natgw_node;
653         struct ctdb_node_map *nodemap=NULL;
654
655
656         /* read the natgw nodes file into a linked list */
657         natgw_list = getenv("NATGW_NODES");
658         if (natgw_list == NULL) {
659                 natgw_list = "/etc/ctdb/natgw_nodes";
660         }
661         lines = file_lines_load(natgw_list, &nlines, ctdb);
662         if (lines == NULL) {
663                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
664                 return -1;
665         }
666         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
667                 nlines--;
668         }
669         for (i=0;i<nlines;i++) {
670                 char *node;
671
672                 node = lines[i];
673                 /* strip leading spaces */
674                 while((*node == ' ') || (*node == '\t')) {
675                         node++;
676                 }
677                 if (*node == '#') {
678                         continue;
679                 }
680                 if (strcmp(node, "") == 0) {
681                         continue;
682                 }
683                 natgw_node = talloc(ctdb, struct natgw_node);
684                 natgw_node->addr = talloc_strdup(natgw_node, node);
685                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
686                 natgw_node->next = natgw_nodes;
687                 natgw_nodes = natgw_node;
688         }
689
690         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
691         if (ret != 0) {
692                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
693                 return ret;
694         }
695
696         i=0;
697         while(i<nodemap->num) {
698                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
699                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
700                                 break;
701                         }
702                 }
703
704                 /* this node was not in the natgw so we just remove it from
705                  * the list
706                  */
707                 if ((natgw_node == NULL) 
708                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
709                         int j;
710
711                         for (j=i+1; j<nodemap->num; j++) {
712                                 nodemap->nodes[j-1] = nodemap->nodes[j];
713                         }
714                         nodemap->num--;
715                         continue;
716                 }
717
718                 i++;
719         }               
720
721         /* pick a node to be natgwmaster
722          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
723          */
724         for(i=0;i<nodemap->num;i++){
725                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
726                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
727                         break;
728                 }
729         }
730         /* we couldnt find any healthy node, try unhealthy ones */
731         if (i == nodemap->num) {
732                 for(i=0;i<nodemap->num;i++){
733                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
734                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
735                                 break;
736                         }
737                 }
738         }
739         /* unless all nodes are STOPPED, when we pick one anyway */
740         if (i == nodemap->num) {
741                 for(i=0;i<nodemap->num;i++){
742                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
743                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
744                                 break;
745                         }
746                 }
747                 /* or if we still can not find any */
748                 if (i == nodemap->num) {
749                         printf("-1 0.0.0.0\n");
750                 }
751         }
752
753         /* print the pruned list of nodes belonging to this natgw list */
754         for(i=0;i<nodemap->num;i++){
755                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
756                         continue;
757                 }
758                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
759                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
760                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
761                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
762                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
763                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
764                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
765         }
766
767         return 0;
768 }
769
770 /*
771   display the status of the scripts for monitoring (or other events)
772  */
773 static int control_one_scriptstatus(struct ctdb_context *ctdb,
774                                     enum ctdb_eventscript_call type)
775 {
776         struct ctdb_scripts_wire *script_status;
777         int ret, i;
778
779         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
780         if (ret != 0) {
781                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
782                 return ret;
783         }
784
785         if (script_status == NULL) {
786                 if (!options.machinereadable) {
787                         printf("%s cycle never run\n",
788                                ctdb_eventscript_call_names[type]);
789                 }
790                 return 0;
791         }
792
793         if (!options.machinereadable) {
794                 printf("%d scripts were executed last %s cycle\n",
795                        script_status->num_scripts,
796                        ctdb_eventscript_call_names[type]);
797         }
798         for (i=0; i<script_status->num_scripts; i++) {
799                 const char *status = NULL;
800
801                 switch (script_status->scripts[i].status) {
802                 case -ETIME:
803                         status = "TIMEDOUT";
804                         break;
805                 case -ENOEXEC:
806                         status = "DISABLED";
807                         break;
808                 case 0:
809                         status = "OK";
810                         break;
811                 default:
812                         if (script_status->scripts[i].status > 0)
813                                 status = "ERROR";
814                         break;
815                 }
816                 if (options.machinereadable) {
817                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
818                                ctdb_eventscript_call_names[type],
819                                script_status->scripts[i].name,
820                                script_status->scripts[i].status,
821                                status,
822                                (long)script_status->scripts[i].start.tv_sec,
823                                (long)script_status->scripts[i].start.tv_usec,
824                                (long)script_status->scripts[i].finished.tv_sec,
825                                (long)script_status->scripts[i].finished.tv_usec,
826                                script_status->scripts[i].output);
827                         continue;
828                 }
829                 if (status)
830                         printf("%-20s Status:%s    ",
831                                script_status->scripts[i].name, status);
832                 else
833                         /* Some other error, eg from stat. */
834                         printf("%-20s Status:CANNOT RUN (%s)",
835                                script_status->scripts[i].name,
836                                strerror(-script_status->scripts[i].status));
837
838                 if (script_status->scripts[i].status >= 0) {
839                         printf("Duration:%.3lf ",
840                         timeval_delta(&script_status->scripts[i].finished,
841                               &script_status->scripts[i].start));
842                 }
843                 if (script_status->scripts[i].status != -ENOEXEC) {
844                         printf("%s",
845                                ctime(&script_status->scripts[i].start.tv_sec));
846                         if (script_status->scripts[i].status != 0) {
847                                 printf("   OUTPUT:%s\n",
848                                        script_status->scripts[i].output);
849                         }
850                 } else {
851                         printf("\n");
852                 }
853         }
854         return 0;
855 }
856
857
858 static int control_scriptstatus(struct ctdb_context *ctdb,
859                                 int argc, const char **argv)
860 {
861         int ret;
862         enum ctdb_eventscript_call type, min, max;
863         const char *arg;
864
865         if (argc > 1) {
866                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
867                 return -1;
868         }
869
870         if (argc == 0)
871                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
872         else
873                 arg = argv[0];
874
875         for (type = 0; type < CTDB_EVENT_MAX; type++) {
876                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
877                         min = type;
878                         max = type+1;
879                         break;
880                 }
881         }
882         if (type == CTDB_EVENT_MAX) {
883                 if (strcmp(arg, "all") == 0) {
884                         min = 0;
885                         max = CTDB_EVENT_MAX;
886                 } else {
887                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
888                         return -1;
889                 }
890         }
891
892         if (options.machinereadable) {
893                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
894         }
895
896         for (type = min; type < max; type++) {
897                 ret = control_one_scriptstatus(ctdb, type);
898                 if (ret != 0) {
899                         return ret;
900                 }
901         }
902
903         return 0;
904 }
905
906 /*
907   enable an eventscript
908  */
909 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
910 {
911         int ret;
912
913         if (argc < 1) {
914                 usage();
915         }
916
917         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
918         if (ret != 0) {
919           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
920                 return ret;
921         }
922
923         return 0;
924 }
925
926 /*
927   disable an eventscript
928  */
929 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
930 {
931         int ret;
932
933         if (argc < 1) {
934                 usage();
935         }
936
937         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
938         if (ret != 0) {
939           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
940                 return ret;
941         }
942
943         return 0;
944 }
945
946 /*
947   display the pnn of the recovery master
948  */
949 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
950 {
951         int ret;
952         uint32_t recmaster;
953
954         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
955         if (ret != 0) {
956                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
957                 return ret;
958         }
959         printf("%d\n",recmaster);
960
961         return 0;
962 }
963
964 /*
965   get a list of all tickles for this pnn
966  */
967 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
968 {
969         struct ctdb_control_tcp_tickle_list *list;
970         ctdb_sock_addr addr;
971         int i, ret;
972
973         if (argc < 1) {
974                 usage();
975         }
976
977         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
978                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
979                 return -1;
980         }
981
982         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
983         if (ret == -1) {
984                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
985                 return -1;
986         }
987
988         printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
989         printf("Num tickles:%u\n", list->tickles.num);
990         for (i=0;i<list->tickles.num;i++) {
991                 printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
992                 printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
993         }
994
995         talloc_free(list);
996         
997         return 0;
998 }
999
1000
1001 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1002 {
1003         struct ctdb_all_public_ips *ips;
1004         struct ctdb_public_ip ip;
1005         int i, ret;
1006         uint32_t *nodes;
1007         uint32_t disable_time;
1008         TDB_DATA data;
1009         struct ctdb_node_map *nodemap=NULL;
1010         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1011
1012         disable_time = 30;
1013         data.dptr  = (uint8_t*)&disable_time;
1014         data.dsize = sizeof(disable_time);
1015         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1016         if (ret != 0) {
1017                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1018                 return -1;
1019         }
1020
1021
1022
1023         /* read the public ip list from the node */
1024         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1025         if (ret != 0) {
1026                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1027                 talloc_free(tmp_ctx);
1028                 return -1;
1029         }
1030
1031         for (i=0;i<ips->num;i++) {
1032                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1033                         break;
1034                 }
1035         }
1036         if (i==ips->num) {
1037                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1038                         pnn, ctdb_addr_to_str(addr)));
1039                 talloc_free(tmp_ctx);
1040                 return -1;
1041         }
1042
1043         ip.pnn  = pnn;
1044         ip.addr = *addr;
1045
1046         data.dptr  = (uint8_t *)&ip;
1047         data.dsize = sizeof(ip);
1048
1049         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1050         if (ret != 0) {
1051                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1052                 talloc_free(tmp_ctx);
1053                 return ret;
1054         }
1055
1056         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1057         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1058                                         nodes, 0,
1059                                         LONGTIMELIMIT(),
1060                                         false, data,
1061                                         NULL, NULL,
1062                                         NULL);
1063         if (ret != 0) {
1064                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1065                 talloc_free(tmp_ctx);
1066                 return -1;
1067         }
1068
1069         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1070         if (ret != 0) {
1071                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1072                 talloc_free(tmp_ctx);
1073                 return -1;
1074         }
1075
1076         /* update the recovery daemon so it now knows to expect the new
1077            node assignment for this ip.
1078         */
1079         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1080         if (ret != 0) {
1081                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1082                 return -1;
1083         }
1084
1085         talloc_free(tmp_ctx);
1086         return 0;
1087 }
1088
1089 /*
1090   move/failover an ip address to a specific node
1091  */
1092 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1093 {
1094         uint32_t pnn;
1095         ctdb_sock_addr addr;
1096
1097         if (argc < 2) {
1098                 usage();
1099                 return -1;
1100         }
1101
1102         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1103                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1104                 return -1;
1105         }
1106
1107
1108         if (sscanf(argv[1], "%u", &pnn) != 1) {
1109                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1110                 return -1;
1111         }
1112
1113         if (move_ip(ctdb, &addr, pnn) != 0) {
1114                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1115                 return -1;
1116         }
1117
1118         return 0;
1119 }
1120
1121 void getips_store_callback(void *param, void *data)
1122 {
1123         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1124         struct ctdb_all_public_ips *ips = param;
1125         int i;
1126
1127         i = ips->num++;
1128         ips->ips[i].pnn  = node_ip->pnn;
1129         ips->ips[i].addr = node_ip->addr;
1130 }
1131
1132 void getips_count_callback(void *param, void *data)
1133 {
1134         uint32_t *count = param;
1135
1136         (*count)++;
1137 }
1138
1139 #define IP_KEYLEN       4
1140 static uint32_t *ip_key(ctdb_sock_addr *ip)
1141 {
1142         static uint32_t key[IP_KEYLEN];
1143
1144         bzero(key, sizeof(key));
1145
1146         switch (ip->sa.sa_family) {
1147         case AF_INET:
1148                 key[0]  = ip->ip.sin_addr.s_addr;
1149                 break;
1150         case AF_INET6:
1151                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1152                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1153                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1154                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1155                 break;
1156         default:
1157                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1158                 return key;
1159         }
1160
1161         return key;
1162 }
1163
1164 static void *add_ip_callback(void *parm, void *data)
1165 {
1166         return parm;
1167 }
1168
1169 static int
1170 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1171 {
1172         struct ctdb_all_public_ips *tmp_ips;
1173         struct ctdb_node_map *nodemap=NULL;
1174         trbt_tree_t *ip_tree;
1175         int i, j, len, ret;
1176         uint32_t count;
1177
1178         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1179         if (ret != 0) {
1180                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1181                 return ret;
1182         }
1183
1184         ip_tree = trbt_create(tmp_ctx, 0);
1185
1186         for(i=0;i<nodemap->num;i++){
1187                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1188                         continue;
1189                 }
1190                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1191                         continue;
1192                 }
1193
1194                 /* read the public ip list from this node */
1195                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1196                 if (ret != 0) {
1197                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1198                         return -1;
1199                 }
1200         
1201                 for (j=0; j<tmp_ips->num;j++) {
1202                         struct ctdb_public_ip *node_ip;
1203
1204                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1205                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1206                         node_ip->addr = tmp_ips->ips[j].addr;
1207
1208                         trbt_insertarray32_callback(ip_tree,
1209                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1210                                 add_ip_callback,
1211                                 node_ip);
1212                 }
1213                 talloc_free(tmp_ips);
1214         }
1215
1216         /* traverse */
1217         count = 0;
1218         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1219
1220         len = offsetof(struct ctdb_all_public_ips, ips) + 
1221                 count*sizeof(struct ctdb_public_ip);
1222         tmp_ips = talloc_zero_size(tmp_ctx, len);
1223         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1224
1225         *ips = tmp_ips;
1226
1227         return 0;
1228 }
1229
1230
1231 /* 
1232  * scans all other nodes and returns a pnn for another node that can host this 
1233  * ip address or -1
1234  */
1235 static int
1236 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1237 {
1238         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1239         struct ctdb_all_public_ips *ips;
1240         struct ctdb_node_map *nodemap=NULL;
1241         int i, j, ret;
1242
1243         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1244         if (ret != 0) {
1245                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1246                 talloc_free(tmp_ctx);
1247                 return ret;
1248         }
1249
1250         for(i=0;i<nodemap->num;i++){
1251                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1252                         continue;
1253                 }
1254                 if (nodemap->nodes[i].pnn == options.pnn) {
1255                         continue;
1256                 }
1257
1258                 /* read the public ip list from this node */
1259                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1260                 if (ret != 0) {
1261                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1262                         return -1;
1263                 }
1264
1265                 for (j=0;j<ips->num;j++) {
1266                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1267                                 talloc_free(tmp_ctx);
1268                                 return nodemap->nodes[i].pnn;
1269                         }
1270                 }
1271                 talloc_free(ips);
1272         }
1273
1274         talloc_free(tmp_ctx);
1275         return -1;
1276 }
1277
1278 /*
1279   add a public ip address to a node
1280  */
1281 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1282 {
1283         int i, ret;
1284         int len;
1285         uint32_t pnn;
1286         unsigned mask;
1287         ctdb_sock_addr addr;
1288         struct ctdb_control_ip_iface *pub;
1289         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1290         struct ctdb_all_public_ips *ips;
1291
1292
1293         if (argc != 2) {
1294                 talloc_free(tmp_ctx);
1295                 usage();
1296         }
1297
1298         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1299                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1300                 talloc_free(tmp_ctx);
1301                 return -1;
1302         }
1303
1304         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1305         if (ret != 0) {
1306                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1307                 talloc_free(tmp_ctx);
1308                 return ret;
1309         }
1310
1311
1312         /* check if some other node is already serving this ip, if not,
1313          * we will claim it
1314          */
1315         for (i=0;i<ips->num;i++) {
1316                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1317                         break;
1318                 }
1319         }
1320
1321         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1322         pub = talloc_size(tmp_ctx, len); 
1323         CTDB_NO_MEMORY(ctdb, pub);
1324
1325         pub->addr  = addr;
1326         pub->mask  = mask;
1327         pub->len   = strlen(argv[1])+1;
1328         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1329
1330         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1331         if (ret != 0) {
1332                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1333                 talloc_free(tmp_ctx);
1334                 return ret;
1335         }
1336
1337         if (i == ips->num) {
1338                 /* no one has this ip so we claim it */
1339                 pnn  = options.pnn;
1340         } else {
1341                 pnn  = ips->ips[i].pnn;
1342         }
1343
1344         if (move_ip(ctdb, &addr, pnn) != 0) {
1345                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1346                 return -1;
1347         }
1348
1349         talloc_free(tmp_ctx);
1350         return 0;
1351 }
1352
1353 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1354
1355 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1356 {
1357         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1358         struct ctdb_node_map *nodemap=NULL;
1359         struct ctdb_all_public_ips *ips;
1360         int ret, i, j;
1361
1362         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1363         if (ret != 0) {
1364                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1365                 return ret;
1366         }
1367
1368         /* remove it from the nodes that are not hosting the ip currently */
1369         for(i=0;i<nodemap->num;i++){
1370                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1371                         continue;
1372                 }
1373                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1374                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1375                         continue;
1376                 }
1377
1378                 for (j=0;j<ips->num;j++) {
1379                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1380                                 break;
1381                         }
1382                 }
1383                 if (j==ips->num) {
1384                         continue;
1385                 }
1386
1387                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1388                         continue;
1389                 }
1390
1391                 options.pnn = nodemap->nodes[i].pnn;
1392                 control_delip(ctdb, argc, argv);
1393         }
1394
1395
1396         /* remove it from every node (also the one hosting it) */
1397         for(i=0;i<nodemap->num;i++){
1398                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1399                         continue;
1400                 }
1401                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1402                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1403                         continue;
1404                 }
1405
1406                 for (j=0;j<ips->num;j++) {
1407                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1408                                 break;
1409                         }
1410                 }
1411                 if (j==ips->num) {
1412                         continue;
1413                 }
1414
1415                 options.pnn = nodemap->nodes[i].pnn;
1416                 control_delip(ctdb, argc, argv);
1417         }
1418
1419         talloc_free(tmp_ctx);
1420         return 0;
1421 }
1422         
1423 /*
1424   delete a public ip address from a node
1425  */
1426 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1427 {
1428         int i, ret;
1429         ctdb_sock_addr addr;
1430         struct ctdb_control_ip_iface pub;
1431         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1432         struct ctdb_all_public_ips *ips;
1433
1434         if (argc != 1) {
1435                 talloc_free(tmp_ctx);
1436                 usage();
1437         }
1438
1439         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1440                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1441                 return -1;
1442         }
1443
1444         if (options.pnn == CTDB_BROADCAST_ALL) {
1445                 return control_delip_all(ctdb, argc, argv, &addr);
1446         }
1447
1448         pub.addr  = addr;
1449         pub.mask  = 0;
1450         pub.len   = 0;
1451
1452         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1453         if (ret != 0) {
1454                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1455                 talloc_free(tmp_ctx);
1456                 return ret;
1457         }
1458         
1459         for (i=0;i<ips->num;i++) {
1460                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1461                         break;
1462                 }
1463         }
1464
1465         if (i==ips->num) {
1466                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1467                         ctdb_addr_to_str(&addr)));
1468                 talloc_free(tmp_ctx);
1469                 return -1;
1470         }
1471
1472         if (ips->ips[i].pnn == options.pnn) {
1473                 ret = find_other_host_for_public_ip(ctdb, &addr);
1474                 if (ret != -1) {
1475                         if (move_ip(ctdb, &addr, ret) != 0) {
1476                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1477                                 return -1;
1478                         }
1479                 }
1480         }
1481
1482         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1483         if (ret != 0) {
1484                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1485                 talloc_free(tmp_ctx);
1486                 return ret;
1487         }
1488
1489         talloc_free(tmp_ctx);
1490         return 0;
1491 }
1492
1493 /*
1494   kill a tcp connection
1495  */
1496 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1497 {
1498         int ret;
1499         struct ctdb_control_killtcp killtcp;
1500
1501         if (argc < 2) {
1502                 usage();
1503         }
1504
1505         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1506                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1507                 return -1;
1508         }
1509
1510         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1511                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1512                 return -1;
1513         }
1514
1515         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1516         if (ret != 0) {
1517                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1518                 return ret;
1519         }
1520
1521         return 0;
1522 }
1523
1524
1525 /*
1526   send a gratious arp
1527  */
1528 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1529 {
1530         int ret;
1531         ctdb_sock_addr addr;
1532
1533         if (argc < 2) {
1534                 usage();
1535         }
1536
1537         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1538                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1539                 return -1;
1540         }
1541
1542         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1543         if (ret != 0) {
1544                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1545                 return ret;
1546         }
1547
1548         return 0;
1549 }
1550
1551 /*
1552   register a server id
1553  */
1554 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1555 {
1556         int ret;
1557         struct ctdb_server_id server_id;
1558
1559         if (argc < 3) {
1560                 usage();
1561         }
1562
1563         server_id.pnn       = strtoul(argv[0], NULL, 0);
1564         server_id.type      = strtoul(argv[1], NULL, 0);
1565         server_id.server_id = strtoul(argv[2], NULL, 0);
1566
1567         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1568         if (ret != 0) {
1569                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1570                 return ret;
1571         }
1572         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
1573         sleep(999);
1574         return -1;
1575 }
1576
1577 /*
1578   unregister a server id
1579  */
1580 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1581 {
1582         int ret;
1583         struct ctdb_server_id server_id;
1584
1585         if (argc < 3) {
1586                 usage();
1587         }
1588
1589         server_id.pnn       = strtoul(argv[0], NULL, 0);
1590         server_id.type      = strtoul(argv[1], NULL, 0);
1591         server_id.server_id = strtoul(argv[2], NULL, 0);
1592
1593         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1594         if (ret != 0) {
1595                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1596                 return ret;
1597         }
1598         return -1;
1599 }
1600
1601 /*
1602   check if a server id exists
1603  */
1604 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1605 {
1606         uint32_t status;
1607         int ret;
1608         struct ctdb_server_id server_id;
1609
1610         if (argc < 3) {
1611                 usage();
1612         }
1613
1614         server_id.pnn       = strtoul(argv[0], NULL, 0);
1615         server_id.type      = strtoul(argv[1], NULL, 0);
1616         server_id.server_id = strtoul(argv[2], NULL, 0);
1617
1618         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1619         if (ret != 0) {
1620                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1621                 return ret;
1622         }
1623
1624         if (status) {
1625                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1626         } else {
1627                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1628         }
1629         return 0;
1630 }
1631
1632 /*
1633   get a list of all server ids that are registered on a node
1634  */
1635 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1636 {
1637         int i, ret;
1638         struct ctdb_server_id_list *server_ids;
1639
1640         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1641         if (ret != 0) {
1642                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1643                 return ret;
1644         }
1645
1646         for (i=0; i<server_ids->num; i++) {
1647                 printf("Server id %d:%d:%d\n", 
1648                         server_ids->server_ids[i].pnn, 
1649                         server_ids->server_ids[i].type, 
1650                         server_ids->server_ids[i].server_id); 
1651         }
1652
1653         return -1;
1654 }
1655
1656 /*
1657   send a tcp tickle ack
1658  */
1659 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1660 {
1661         int ret;
1662         ctdb_sock_addr  src, dst;
1663
1664         if (argc < 2) {
1665                 usage();
1666         }
1667
1668         if (!parse_ip_port(argv[0], &src)) {
1669                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1670                 return -1;
1671         }
1672
1673         if (!parse_ip_port(argv[1], &dst)) {
1674                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1675                 return -1;
1676         }
1677
1678         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1679         if (ret==0) {
1680                 return 0;
1681         }
1682         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1683
1684         return -1;
1685 }
1686
1687
1688 /*
1689   display public ip status
1690  */
1691 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1692 {
1693         int i, ret;
1694         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1695         struct ctdb_all_public_ips *ips;
1696
1697         if (options.pnn == CTDB_BROADCAST_ALL) {
1698                 /* read the list of public ips from all nodes */
1699                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1700         } else {
1701                 /* read the public ip list from this node */
1702                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1703         }
1704         if (ret != 0) {
1705                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1706                 talloc_free(tmp_ctx);
1707                 return ret;
1708         }
1709
1710         if (options.machinereadable){
1711                 printf(":Public IP:Node:ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:\n");
1712         } else {
1713                 if (options.pnn == CTDB_BROADCAST_ALL) {
1714                         printf("Public IPs on ALL nodes\n");
1715                 } else {
1716                         printf("Public IPs on node %u\n", options.pnn);
1717                 }
1718         }
1719
1720         for (i=1;i<=ips->num;i++) {
1721                 struct ctdb_control_public_ip_info *info = NULL;
1722                 int32_t pnn;
1723                 char *aciface = NULL;
1724                 char *avifaces = NULL;
1725                 char *cifaces = NULL;
1726
1727                 if (options.pnn == CTDB_BROADCAST_ALL) {
1728                         pnn = ips->ips[ips->num-i].pnn;
1729                 } else {
1730                         pnn = options.pnn;
1731                 }
1732
1733                 if (pnn != -1) {
1734                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
1735                                                    &ips->ips[ips->num-i].addr, &info);
1736                 } else {
1737                         ret = -1;
1738                 }
1739
1740                 if (ret == 0) {
1741                         int j;
1742                         for (j=0; j < info->num; j++) {
1743                                 if (cifaces == NULL) {
1744                                         cifaces = talloc_strdup(info,
1745                                                                 info->ifaces[j].name);
1746                                 } else {
1747                                         cifaces = talloc_asprintf_append(cifaces,
1748                                                                          ",%s",
1749                                                                          info->ifaces[j].name);
1750                                 }
1751
1752                                 if (info->active_idx == j) {
1753                                         aciface = info->ifaces[j].name;
1754                                 }
1755
1756                                 if (info->ifaces[j].link_state == 0) {
1757                                         continue;
1758                                 }
1759
1760                                 if (avifaces == NULL) {
1761                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
1762                                 } else {
1763                                         avifaces = talloc_asprintf_append(avifaces,
1764                                                                           ",%s",
1765                                                                           info->ifaces[j].name);
1766                                 }
1767                         }
1768                 }
1769
1770                 if (options.machinereadable){
1771                         printf(":%s:%d:%s:%s:%s:\n",
1772                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1773                                ips->ips[ips->num-i].pnn,
1774                                aciface?aciface:"",
1775                                avifaces?avifaces:"",
1776                                cifaces?cifaces:"");
1777                 } else {
1778                         printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
1779                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1780                                ips->ips[ips->num-i].pnn,
1781                                aciface?aciface:"",
1782                                avifaces?avifaces:"",
1783                                cifaces?cifaces:"");
1784                 }
1785                 talloc_free(info);
1786         }
1787
1788         talloc_free(tmp_ctx);
1789         return 0;
1790 }
1791
1792 /*
1793   public ip info
1794  */
1795 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
1796 {
1797         int i, ret;
1798         ctdb_sock_addr addr;
1799         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1800         struct ctdb_control_public_ip_info *info;
1801
1802         if (argc != 1) {
1803                 talloc_free(tmp_ctx);
1804                 usage();
1805         }
1806
1807         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1808                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1809                 return -1;
1810         }
1811
1812         /* read the public ip info from this node */
1813         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
1814                                            tmp_ctx, &addr, &info);
1815         if (ret != 0) {
1816                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
1817                                   argv[0], options.pnn));
1818                 talloc_free(tmp_ctx);
1819                 return ret;
1820         }
1821
1822         printf("Public IP[%s] info on node %u\n",
1823                ctdb_addr_to_str(&info->ip.addr),
1824                options.pnn);
1825
1826         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
1827                ctdb_addr_to_str(&info->ip.addr),
1828                info->ip.pnn, info->num);
1829
1830         for (i=0; i<info->num; i++) {
1831                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1832
1833                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1834                        i+1, info->ifaces[i].name,
1835                        info->ifaces[i].link_state?"up":"down",
1836                        (unsigned int)info->ifaces[i].references,
1837                        (i==info->active_idx)?" (active)":"");
1838         }
1839
1840         talloc_free(tmp_ctx);
1841         return 0;
1842 }
1843
1844 /*
1845   display interfaces status
1846  */
1847 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
1848 {
1849         int i, ret;
1850         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1851         struct ctdb_control_get_ifaces *ifaces;
1852
1853         /* read the public ip list from this node */
1854         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn,
1855                                    tmp_ctx, &ifaces);
1856         if (ret != 0) {
1857                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
1858                                   options.pnn));
1859                 talloc_free(tmp_ctx);
1860                 return ret;
1861         }
1862
1863         if (options.machinereadable){
1864                 printf(":Name:LinkStatus:References:\n");
1865         } else {
1866                 printf("Interfaces on node %u\n", options.pnn);
1867         }
1868
1869         for (i=0; i<ifaces->num; i++) {
1870                 if (options.machinereadable){
1871                         printf(":%s:%s:%u\n",
1872                                ifaces->ifaces[i].name,
1873                                ifaces->ifaces[i].link_state?"1":"0",
1874                                (unsigned int)ifaces->ifaces[i].references);
1875                 } else {
1876                         printf("name:%s link:%s references:%u\n",
1877                                ifaces->ifaces[i].name,
1878                                ifaces->ifaces[i].link_state?"up":"down",
1879                                (unsigned int)ifaces->ifaces[i].references);
1880                 }
1881         }
1882
1883         talloc_free(tmp_ctx);
1884         return 0;
1885 }
1886
1887
1888 /*
1889   set link status of an interface
1890  */
1891 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
1892 {
1893         int ret;
1894         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1895         struct ctdb_control_iface_info info;
1896
1897         ZERO_STRUCT(info);
1898
1899         if (argc != 2) {
1900                 usage();
1901         }
1902
1903         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1904                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
1905                                   argv[0]));
1906                 talloc_free(tmp_ctx);
1907                 return -1;
1908         }
1909         strcpy(info.name, argv[0]);
1910
1911         if (strcmp(argv[1], "up") == 0) {
1912                 info.link_state = 1;
1913         } else if (strcmp(argv[1], "down") == 0) {
1914                 info.link_state = 0;
1915         } else {
1916                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
1917                                   argv[1]));
1918                 talloc_free(tmp_ctx);
1919                 return -1;
1920         }
1921
1922         /* read the public ip list from this node */
1923         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
1924                                    tmp_ctx, &info);
1925         if (ret != 0) {
1926                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
1927                                   argv[0], options.pnn));
1928                 talloc_free(tmp_ctx);
1929                 return ret;
1930         }
1931
1932         talloc_free(tmp_ctx);
1933         return 0;
1934 }
1935
1936 /*
1937   display pid of a ctdb daemon
1938  */
1939 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
1940 {
1941         uint32_t pid;
1942         int ret;
1943
1944         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
1945         if (ret != 0) {
1946                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
1947                 return ret;
1948         }
1949         printf("Pid:%d\n", pid);
1950
1951         return 0;
1952 }
1953
1954 static uint32_t ipreallocate_finished;
1955
1956 /*
1957   handler for receiving the response to ipreallocate
1958 */
1959 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1960                              TDB_DATA data, void *private_data)
1961 {
1962         ipreallocate_finished = 1;
1963 }
1964
1965 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1966 {
1967         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1968
1969         event_add_timed(ctdb->ev, ctdb, 
1970                                 timeval_current_ofs(1, 0),
1971                                 ctdb_every_second, ctdb);
1972 }
1973
1974 /*
1975   ask the recovery daemon on the recovery master to perform a ip reallocation
1976  */
1977 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1978 {
1979         int i, ret;
1980         TDB_DATA data;
1981         struct takeover_run_reply rd;
1982         uint32_t recmaster;
1983         struct ctdb_node_map *nodemap=NULL;
1984         int retries=0;
1985         struct timeval tv = timeval_current();
1986
1987         /* we need some events to trigger so we can timeout and restart
1988            the loop
1989         */
1990         event_add_timed(ctdb->ev, ctdb, 
1991                                 timeval_current_ofs(1, 0),
1992                                 ctdb_every_second, ctdb);
1993
1994         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1995         if (rd.pnn == -1) {
1996                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1997                 return -1;
1998         }
1999         rd.srvid = getpid();
2000
2001         /* register a message port for receiveing the reply so that we
2002            can receive the reply
2003         */
2004         ctdb_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
2005
2006         data.dptr = (uint8_t *)&rd;
2007         data.dsize = sizeof(rd);
2008
2009 again:
2010         /* check that there are valid nodes available */
2011         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
2012                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2013                 return -1;
2014         }
2015         for (i=0; i<nodemap->num;i++) {
2016                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
2017                         break;
2018                 }
2019         }
2020         if (i==nodemap->num) {
2021                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
2022                 return 0;
2023         }
2024
2025
2026         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2027         if (ret != 0) {
2028                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2029                 return ret;
2030         }
2031
2032         /* verify the node exists */
2033         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
2034                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2035                 return -1;
2036         }
2037
2038
2039         /* check tha there are nodes available that can act as a recmaster */
2040         for (i=0; i<nodemap->num; i++) {
2041                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2042                         continue;
2043                 }
2044                 break;
2045         }
2046         if (i == nodemap->num) {
2047                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
2048                 return 0;
2049         }
2050
2051         /* verify the recovery master is not STOPPED, nor BANNED */
2052         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2053                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2054                 retries++;
2055                 sleep(1);
2056                 goto again;
2057         } 
2058
2059         
2060         /* verify the recovery master is not STOPPED, nor BANNED */
2061         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2062                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2063                 retries++;
2064                 sleep(1);
2065                 goto again;
2066         } 
2067
2068         ipreallocate_finished = 0;
2069         ret = ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2070         if (ret != 0) {
2071                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
2072                 return -1;
2073         }
2074
2075         tv = timeval_current();
2076         /* this loop will terminate when we have received the reply */
2077         while (timeval_elapsed(&tv) < 3.0) {
2078                 event_loop_once(ctdb->ev);
2079         }
2080         if (ipreallocate_finished == 1) {
2081                 return 0;
2082         }
2083
2084         DEBUG(DEBUG_ERR,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
2085         retries++;
2086         sleep(1);
2087         goto again;
2088
2089         return 0;
2090 }
2091
2092
2093 /*
2094   disable a remote node
2095  */
2096 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2097 {
2098         int ret;
2099         struct ctdb_node_map *nodemap=NULL;
2100
2101         /* check if the node is already disabled */
2102         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2103                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2104                 exit(10);
2105         }
2106         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2107                 DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
2108                 return 0;
2109         }
2110
2111         do {
2112                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2113                 if (ret != 0) {
2114                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2115                         return ret;
2116                 }
2117
2118                 sleep(1);
2119
2120                 /* read the nodemap and verify the change took effect */
2121                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2122                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2123                         exit(10);
2124                 }
2125
2126         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2127         ret = control_ipreallocate(ctdb, argc, argv);
2128         if (ret != 0) {
2129                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2130                 return ret;
2131         }
2132
2133         return 0;
2134 }
2135
2136 /*
2137   enable a disabled remote node
2138  */
2139 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2140 {
2141         int ret;
2142
2143         struct ctdb_node_map *nodemap=NULL;
2144
2145
2146         /* check if the node is already enabled */
2147         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2148                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2149                 exit(10);
2150         }
2151         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
2152                 DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
2153                 return 0;
2154         }
2155
2156         do {
2157                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2158                 if (ret != 0) {
2159                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2160                         return ret;
2161                 }
2162
2163                 sleep(1);
2164
2165                 /* read the nodemap and verify the change took effect */
2166                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2167                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2168                         exit(10);
2169                 }
2170
2171         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2172
2173         ret = control_ipreallocate(ctdb, argc, argv);
2174         if (ret != 0) {
2175                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2176                 return ret;
2177         }
2178
2179         return 0;
2180 }
2181
2182 /*
2183   stop a remote node
2184  */
2185 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2186 {
2187         int ret;
2188         struct ctdb_node_map *nodemap=NULL;
2189
2190         do {
2191                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2192                 if (ret != 0) {
2193                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2194                 }
2195         
2196                 sleep(1);
2197
2198                 /* read the nodemap and verify the change took effect */
2199                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2200                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2201                         exit(10);
2202                 }
2203
2204         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2205         ret = control_ipreallocate(ctdb, argc, argv);
2206         if (ret != 0) {
2207                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2208                 return ret;
2209         }
2210
2211         return 0;
2212 }
2213
2214 /*
2215   restart a stopped remote node
2216  */
2217 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2218 {
2219         int ret;
2220
2221         struct ctdb_node_map *nodemap=NULL;
2222
2223         do {
2224                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2225                 if (ret != 0) {
2226                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2227                         return ret;
2228                 }
2229         
2230                 sleep(1);
2231
2232                 /* read the nodemap and verify the change took effect */
2233                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2234                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2235                         exit(10);
2236                 }
2237
2238         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2239         ret = control_ipreallocate(ctdb, argc, argv);
2240         if (ret != 0) {
2241                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2242                 return ret;
2243         }
2244
2245         return 0;
2246 }
2247
2248 static uint32_t get_generation(struct ctdb_context *ctdb)
2249 {
2250         struct ctdb_vnn_map *vnnmap=NULL;
2251         int ret;
2252
2253         /* wait until the recmaster is not in recovery mode */
2254         while (1) {
2255                 uint32_t recmode, recmaster;
2256                 
2257                 if (vnnmap != NULL) {
2258                         talloc_free(vnnmap);
2259                         vnnmap = NULL;
2260                 }
2261
2262                 /* get the recmaster */
2263                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
2264                 if (ret != 0) {
2265                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2266                         exit(10);
2267                 }
2268
2269                 /* get recovery mode */
2270                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
2271                 if (ret != 0) {
2272                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2273                         exit(10);
2274                 }
2275
2276                 /* get the current generation number */
2277                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2278                 if (ret != 0) {
2279                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2280                         exit(10);
2281                 }
2282
2283                 if ((recmode == CTDB_RECOVERY_NORMAL)
2284                 &&  (vnnmap->generation != 1)){
2285                         return vnnmap->generation;
2286                 }
2287                 sleep(1);
2288         }
2289 }
2290
2291 /*
2292   ban a node from the cluster
2293  */
2294 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2295 {
2296         int ret;
2297         struct ctdb_node_map *nodemap=NULL;
2298         struct ctdb_ban_time bantime;
2299
2300         if (argc < 1) {
2301                 usage();
2302         }
2303         
2304         /* verify the node exists */
2305         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2306         if (ret != 0) {
2307                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2308                 return ret;
2309         }
2310
2311         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2312                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2313                 return -1;
2314         }
2315
2316         bantime.pnn  = options.pnn;
2317         bantime.time = strtoul(argv[0], NULL, 0);
2318
2319         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2320         if (ret != 0) {
2321                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2322                 return -1;
2323         }       
2324
2325         ret = control_ipreallocate(ctdb, argc, argv);
2326         if (ret != 0) {
2327                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2328                 return ret;
2329         }
2330
2331         return 0;
2332 }
2333
2334
2335 /*
2336   unban a node from the cluster
2337  */
2338 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2339 {
2340         int ret;
2341         struct ctdb_node_map *nodemap=NULL;
2342         struct ctdb_ban_time bantime;
2343
2344         /* verify the node exists */
2345         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2346         if (ret != 0) {
2347                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2348                 return ret;
2349         }
2350
2351         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2352                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2353                 return -1;
2354         }
2355
2356         bantime.pnn  = options.pnn;
2357         bantime.time = 0;
2358
2359         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2360         if (ret != 0) {
2361                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2362                 return -1;
2363         }       
2364
2365         ret = control_ipreallocate(ctdb, argc, argv);
2366         if (ret != 0) {
2367                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2368                 return ret;
2369         }
2370
2371         return 0;
2372 }
2373
2374
2375 /*
2376   show ban information for a node
2377  */
2378 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2379 {
2380         int ret;
2381         struct ctdb_node_map *nodemap=NULL;
2382         struct ctdb_ban_time *bantime;
2383
2384         /* verify the node exists */
2385         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2386         if (ret != 0) {
2387                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2388                 return ret;
2389         }
2390
2391         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2392         if (ret != 0) {
2393                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2394                 return -1;
2395         }       
2396
2397         if (bantime->time == 0) {
2398                 printf("Node %u is not banned\n", bantime->pnn);
2399         } else {
2400                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2401         }
2402
2403         return 0;
2404 }
2405
2406 /*
2407   shutdown a daemon
2408  */
2409 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2410 {
2411         int ret;
2412
2413         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2414         if (ret != 0) {
2415                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2416                 return ret;
2417         }
2418
2419         return 0;
2420 }
2421
2422 /*
2423   trigger a recovery
2424  */
2425 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2426 {
2427         int ret;
2428         uint32_t generation, next_generation;
2429
2430         /* record the current generation number */
2431         generation = get_generation(ctdb);
2432
2433         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2434         if (ret != 0) {
2435                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2436                 return ret;
2437         }
2438
2439         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2440         if (ret != 0) {
2441                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2442                 return ret;
2443         }
2444
2445         /* wait until we are in a new generation */
2446         while (1) {
2447                 next_generation = get_generation(ctdb);
2448                 if (next_generation != generation) {
2449                         return 0;
2450                 }
2451                 sleep(1);
2452         }
2453
2454         return 0;
2455 }
2456
2457
2458 /*
2459   display monitoring mode of a remote node
2460  */
2461 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2462 {
2463         uint32_t monmode;
2464         int ret;
2465
2466         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2467         if (ret != 0) {
2468                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2469                 return ret;
2470         }
2471         if (!options.machinereadable){
2472                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2473         } else {
2474                 printf(":mode:\n");
2475                 printf(":%d:\n",monmode);
2476         }
2477         return 0;
2478 }
2479
2480
2481 /*
2482   display capabilities of a remote node
2483  */
2484 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2485 {
2486         uint32_t capabilities;
2487         int ret;
2488
2489         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2490         if (ret != 0) {
2491                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2492                 return ret;
2493         }
2494         
2495         if (!options.machinereadable){
2496                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2497                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2498                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2499                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2500         } else {
2501                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2502                 printf(":%d:%d:%d:%d:\n",
2503                         !!(capabilities&CTDB_CAP_RECMASTER),
2504                         !!(capabilities&CTDB_CAP_LMASTER),
2505                         !!(capabilities&CTDB_CAP_LVS),
2506                         !!(capabilities&CTDB_CAP_NATGW));
2507         }
2508         return 0;
2509 }
2510
2511 /*
2512   display lvs configuration
2513  */
2514 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2515 {
2516         uint32_t *capabilities;
2517         struct ctdb_node_map *nodemap=NULL;
2518         int i, ret;
2519         int healthy_count = 0;
2520
2521         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2522         if (ret != 0) {
2523                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2524                 return ret;
2525         }
2526
2527         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2528         CTDB_NO_MEMORY(ctdb, capabilities);
2529         
2530         /* collect capabilities for all connected nodes */
2531         for (i=0; i<nodemap->num; i++) {
2532                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2533                         continue;
2534                 }
2535                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2536                         continue;
2537                 }
2538         
2539                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2540                 if (ret != 0) {
2541                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2542                         return ret;
2543                 }
2544
2545                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2546                         continue;
2547                 }
2548
2549                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2550                         healthy_count++;
2551                 }
2552         }
2553
2554         /* Print all LVS nodes */
2555         for (i=0; i<nodemap->num; i++) {
2556                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2557                         continue;
2558                 }
2559                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2560                         continue;
2561                 }
2562                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2563                         continue;
2564                 }
2565
2566                 if (healthy_count != 0) {
2567                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2568                                 continue;
2569                         }
2570                 }
2571
2572                 printf("%d:%s\n", i, 
2573                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2574         }
2575
2576         return 0;
2577 }
2578
2579 /*
2580   display who is the lvs master
2581  */
2582 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2583 {
2584         uint32_t *capabilities;
2585         struct ctdb_node_map *nodemap=NULL;
2586         int i, ret;
2587         int healthy_count = 0;
2588
2589         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2590         if (ret != 0) {
2591                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2592                 return ret;
2593         }
2594
2595         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2596         CTDB_NO_MEMORY(ctdb, capabilities);
2597         
2598         /* collect capabilities for all connected nodes */
2599         for (i=0; i<nodemap->num; i++) {
2600                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2601                         continue;
2602                 }
2603                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2604                         continue;
2605                 }
2606         
2607                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2608                 if (ret != 0) {
2609                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2610                         return ret;
2611                 }
2612
2613                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2614                         continue;
2615                 }
2616
2617                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2618                         healthy_count++;
2619                 }
2620         }
2621
2622         /* find and show the lvsmaster */
2623         for (i=0; i<nodemap->num; i++) {
2624                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2625                         continue;
2626                 }
2627                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2628                         continue;
2629                 }
2630                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2631                         continue;
2632                 }
2633
2634                 if (healthy_count != 0) {
2635                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2636                                 continue;
2637                         }
2638                 }
2639
2640                 if (options.machinereadable){
2641                         printf("%d\n", i);
2642                 } else {
2643                         printf("Node %d is LVS master\n", i);
2644                 }
2645                 return 0;
2646         }
2647
2648         printf("There is no LVS master\n");
2649         return -1;
2650 }
2651
2652 /*
2653   disable monitoring on a  node
2654  */
2655 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2656 {
2657         
2658         int ret;
2659
2660         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2661         if (ret != 0) {
2662                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2663                 return ret;
2664         }
2665         printf("Monitoring mode:%s\n","DISABLED");
2666
2667         return 0;
2668 }
2669
2670 /*
2671   enable monitoring on a  node
2672  */
2673 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2674 {
2675         
2676         int ret;
2677
2678         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2679         if (ret != 0) {
2680                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2681                 return ret;
2682         }
2683         printf("Monitoring mode:%s\n","ACTIVE");
2684
2685         return 0;
2686 }
2687
2688 /*
2689   display remote list of keys/data for a db
2690  */
2691 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2692 {
2693         const char *db_name;
2694         struct ctdb_db_context *ctdb_db;
2695         int ret;
2696
2697         if (argc < 1) {
2698                 usage();
2699         }
2700
2701         db_name = argv[0];
2702
2703
2704         if (db_exists(ctdb, db_name)) {
2705                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2706                 return -1;
2707         }
2708
2709         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
2710
2711         if (ctdb_db == NULL) {
2712                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2713                 return -1;
2714         }
2715
2716         /* traverse and dump the cluster tdb */
2717         ret = ctdb_dump_db(ctdb_db, stdout);
2718         if (ret == -1) {
2719                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2720                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2721                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2722                                   db_name));
2723                 return -1;
2724         }
2725         talloc_free(ctdb_db);
2726
2727         printf("Dumped %d records\n", ret);
2728         return 0;
2729 }
2730
2731
2732 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2733                              TDB_DATA data, void *private_data)
2734 {
2735         DEBUG(DEBUG_ERR,("Log data received\n"));
2736         if (data.dsize > 0) {
2737                 printf("%s", data.dptr);
2738         }
2739
2740         exit(0);
2741 }
2742
2743 /*
2744   display a list of log messages from the in memory ringbuffer
2745  */
2746 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
2747 {
2748         int ret;
2749         int32_t res;
2750         struct ctdb_get_log_addr log_addr;
2751         TDB_DATA data;
2752         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2753         char *errmsg;
2754         struct timeval tv;
2755
2756         if (argc != 1) {
2757                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
2758                 talloc_free(tmp_ctx);
2759                 return -1;
2760         }
2761
2762         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2763         log_addr.srvid = getpid();
2764         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
2765                 log_addr.level = get_debug_by_desc(argv[0]);
2766         } else {
2767                 log_addr.level = strtol(argv[0], NULL, 0);
2768         }
2769
2770
2771         data.dptr = (unsigned char *)&log_addr;
2772         data.dsize = sizeof(log_addr);
2773
2774         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
2775
2776         ctdb_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
2777         sleep(1);
2778
2779         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
2780
2781         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
2782                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
2783         if (ret != 0 || res != 0) {
2784                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
2785                 talloc_free(tmp_ctx);
2786                 return -1;
2787         }
2788
2789
2790         tv = timeval_current();
2791         /* this loop will terminate when we have received the reply */
2792         while (timeval_elapsed(&tv) < 3.0) {    
2793                 event_loop_once(ctdb->ev);
2794         }
2795
2796         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
2797
2798         talloc_free(tmp_ctx);
2799         return 0;
2800 }
2801
2802 /*
2803   clear the in memory log area
2804  */
2805 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
2806 {
2807         int ret;
2808         int32_t res;
2809         char *errmsg;
2810         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2811
2812         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
2813                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
2814         if (ret != 0 || res != 0) {
2815                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
2816                 talloc_free(tmp_ctx);
2817                 return -1;
2818         }
2819
2820         talloc_free(tmp_ctx);
2821         return 0;
2822 }
2823
2824
2825
2826 /*
2827   display a list of the databases on a remote ctdb
2828  */
2829 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
2830 {
2831         int i, ret;
2832         struct ctdb_dbid_map *dbmap=NULL;
2833
2834         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2835         if (ret != 0) {
2836                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2837                 return ret;
2838         }
2839
2840         if(options.machinereadable){
2841                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
2842                 for(i=0;i<dbmap->num;i++){
2843                         const char *path;
2844                         const char *name;
2845                         const char *health;
2846                         bool persistent;
2847
2848                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
2849                                             dbmap->dbs[i].dbid, ctdb, &path);
2850                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
2851                                             dbmap->dbs[i].dbid, ctdb, &name);
2852                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
2853                                               dbmap->dbs[i].dbid, ctdb, &health);
2854                         persistent = dbmap->dbs[i].persistent;
2855                         printf(":0x%08X:%s:%s:%d:%d:\n",
2856                                dbmap->dbs[i].dbid, name, path,
2857                                !!(persistent), !!(health));
2858                 }
2859                 return 0;
2860         }
2861
2862         printf("Number of databases:%d\n", dbmap->num);
2863         for(i=0;i<dbmap->num;i++){
2864                 const char *path;
2865                 const char *name;
2866                 const char *health;
2867                 bool persistent;
2868
2869                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2870                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2871                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2872                 persistent = dbmap->dbs[i].persistent;
2873                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
2874                        dbmap->dbs[i].dbid, name, path,
2875                        persistent?" PERSISTENT":"",
2876                        health?" UNHEALTHY":"");
2877         }
2878
2879         return 0;
2880 }
2881
2882 /*
2883   display the status of a database on a remote ctdb
2884  */
2885 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
2886 {
2887         int i, ret;
2888         struct ctdb_dbid_map *dbmap=NULL;
2889         const char *db_name;
2890
2891         if (argc < 1) {
2892                 usage();
2893         }
2894
2895         db_name = argv[0];
2896
2897         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
2898         if (ret != 0) {
2899                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
2900                 return ret;
2901         }
2902
2903         for(i=0;i<dbmap->num;i++){
2904                 const char *path;
2905                 const char *name;
2906                 const char *health;
2907                 bool persistent;
2908
2909                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
2910                 if (strcmp(name, db_name) != 0) {
2911                         continue;
2912                 }
2913
2914                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
2915                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
2916                 persistent = dbmap->dbs[i].persistent;
2917                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
2918                        dbmap->dbs[i].dbid, name, path,
2919                        persistent?"yes":"no",
2920                        health?health:"OK");
2921                 return 0;
2922         }
2923
2924         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
2925         return 0;
2926 }
2927
2928 /*
2929   check if the local node is recmaster or not
2930   it will return 1 if this node is the recmaster and 0 if it is not
2931   or if the local ctdb daemon could not be contacted
2932  */
2933 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2934 {
2935         uint32_t mypnn, recmaster;
2936         int ret;
2937
2938         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
2939         if (mypnn == -1) {
2940                 printf("Failed to get pnn of node\n");
2941                 return 1;
2942         }
2943
2944         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2945         if (ret != 0) {
2946                 printf("Failed to get the recmaster\n");
2947                 return 1;
2948         }
2949
2950         if (recmaster != mypnn) {
2951                 printf("this node is not the recmaster\n");
2952                 return 1;
2953         }
2954
2955         printf("this node is the recmaster\n");
2956         return 0;
2957 }
2958
2959 /*
2960   ping a node
2961  */
2962 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
2963 {
2964         int ret;
2965         struct timeval tv = timeval_current();
2966         ret = ctdb_ctrl_ping(ctdb, options.pnn);
2967         if (ret == -1) {
2968                 printf("Unable to get ping response from node %u\n", options.pnn);
2969                 return -1;
2970         } else {
2971                 printf("response from %u time=%.6f sec  (%d clients)\n", 
2972                        options.pnn, timeval_elapsed(&tv), ret);
2973         }
2974         return 0;
2975 }
2976
2977
2978 /*
2979   get a tunable
2980  */
2981 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
2982 {
2983         const char *name;
2984         uint32_t value;
2985         int ret;
2986
2987         if (argc < 1) {
2988                 usage();
2989         }
2990
2991         name = argv[0];
2992         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
2993         if (ret == -1) {
2994                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
2995                 return -1;
2996         }
2997
2998         printf("%-19s = %u\n", name, value);
2999         return 0;
3000 }
3001
3002 /*
3003   set a tunable
3004  */
3005 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
3006 {
3007         const char *name;
3008         uint32_t value;
3009         int ret;
3010
3011         if (argc < 2) {
3012                 usage();
3013         }
3014
3015         name = argv[0];
3016         value = strtoul(argv[1], NULL, 0);
3017
3018         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
3019         if (ret == -1) {
3020                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
3021                 return -1;
3022         }
3023         return 0;
3024 }
3025
3026 /*
3027   list all tunables
3028  */
3029 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
3030 {
3031         uint32_t count;
3032         const char **list;
3033         int ret, i;
3034
3035         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
3036         if (ret == -1) {
3037                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
3038                 return -1;
3039         }
3040
3041         for (i=0;i<count;i++) {
3042                 control_getvar(ctdb, 1, &list[i]);
3043         }
3044
3045         talloc_free(list);
3046         
3047         return 0;
3048 }
3049
3050 /*
3051   display debug level on a node
3052  */
3053 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3054 {
3055         int ret;
3056         int32_t level;
3057
3058         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
3059         if (ret != 0) {
3060                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
3061                 return ret;
3062         } else {
3063                 if (options.machinereadable){
3064                         printf(":Name:Level:\n");
3065                         printf(":%s:%d:\n",get_debug_by_level(level),level);
3066                 } else {
3067                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
3068                 }
3069         }
3070         return 0;
3071 }
3072
3073 /*
3074   display reclock file of a node
3075  */
3076 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3077 {
3078         int ret;
3079         const char *reclock;
3080
3081         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
3082         if (ret != 0) {
3083                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3084                 return ret;
3085         } else {
3086                 if (options.machinereadable){
3087                         if (reclock != NULL) {
3088                                 printf("%s", reclock);
3089                         }
3090                 } else {
3091                         if (reclock == NULL) {
3092                                 printf("No reclock file used.\n");
3093                         } else {
3094                                 printf("Reclock file:%s\n", reclock);
3095                         }
3096                 }
3097         }
3098         return 0;
3099 }
3100
3101 /*
3102   set the reclock file of a node
3103  */
3104 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3105 {
3106         int ret;
3107         const char *reclock;
3108
3109         if (argc == 0) {
3110                 reclock = NULL;
3111         } else if (argc == 1) {
3112                 reclock = argv[0];
3113         } else {
3114                 usage();
3115         }
3116
3117         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
3118         if (ret != 0) {
3119                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3120                 return ret;
3121         }
3122         return 0;
3123 }
3124
3125 /*
3126   set the natgw state on/off
3127  */
3128 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
3129 {
3130         int ret;
3131         uint32_t natgwstate;
3132
3133         if (argc == 0) {
3134                 usage();
3135         }
3136
3137         if (!strcmp(argv[0], "on")) {
3138                 natgwstate = 1;
3139         } else if (!strcmp(argv[0], "off")) {
3140                 natgwstate = 0;
3141         } else {
3142                 usage();
3143         }
3144
3145         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
3146         if (ret != 0) {
3147                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
3148                 return ret;
3149         }
3150
3151         return 0;
3152 }
3153
3154 /*
3155   set the lmaster role on/off
3156  */
3157 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3158 {
3159         int ret;
3160         uint32_t lmasterrole;
3161
3162         if (argc == 0) {
3163                 usage();
3164         }
3165
3166         if (!strcmp(argv[0], "on")) {
3167                 lmasterrole = 1;
3168         } else if (!strcmp(argv[0], "off")) {
3169                 lmasterrole = 0;
3170         } else {
3171                 usage();
3172         }
3173
3174         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
3175         if (ret != 0) {
3176                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
3177                 return ret;
3178         }
3179
3180         return 0;
3181 }
3182
3183 /*
3184   set the recmaster role on/off
3185  */
3186 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3187 {
3188         int ret;
3189         uint32_t recmasterrole;
3190
3191         if (argc == 0) {
3192                 usage();
3193         }
3194
3195         if (!strcmp(argv[0], "on")) {
3196                 recmasterrole = 1;
3197         } else if (!strcmp(argv[0], "off")) {
3198                 recmasterrole = 0;
3199         } else {
3200                 usage();
3201         }
3202
3203         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
3204         if (ret != 0) {
3205                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
3206                 return ret;
3207         }
3208
3209         return 0;
3210 }
3211
3212 /*
3213   set debug level on a node or all nodes
3214  */
3215 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3216 {
3217         int i, ret;
3218         int32_t level;
3219
3220         if (argc == 0) {
3221                 printf("You must specify the debug level. Valid levels are:\n");
3222                 for (i=0; debug_levels[i].description != NULL; i++) {
3223                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3224                 }
3225
3226                 return 0;
3227         }
3228
3229         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3230                 level = get_debug_by_desc(argv[0]);
3231         } else {
3232                 level = strtol(argv[0], NULL, 0);
3233         }
3234
3235         for (i=0; debug_levels[i].description != NULL; i++) {
3236                 if (level == debug_levels[i].level) {
3237                         break;
3238                 }
3239         }
3240         if (debug_levels[i].description == NULL) {
3241                 printf("Invalid debug level, must be one of\n");
3242                 for (i=0; debug_levels[i].description != NULL; i++) {
3243                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3244                 }
3245                 return -1;
3246         }
3247
3248         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
3249         if (ret != 0) {
3250                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
3251         }
3252         return 0;
3253 }
3254
3255
3256 /*
3257   freeze a node
3258  */
3259 static int control_freeze(struct ctdb_context *ctdb, int argc, const char **argv)
3260 {
3261         int ret;
3262         uint32_t priority;
3263         
3264         if (argc == 1) {
3265                 priority = strtol(argv[0], NULL, 0);
3266         } else {
3267                 priority = 0;
3268         }
3269         DEBUG(DEBUG_ERR,("Freeze by priority %u\n", priority));
3270
3271         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3272         if (ret != 0) {
3273                 DEBUG(DEBUG_ERR, ("Unable to freeze node %u\n", options.pnn));
3274         }               
3275         return 0;
3276 }
3277
3278 /*
3279   thaw a node
3280  */
3281 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3282 {
3283         int ret;
3284         uint32_t priority;
3285         
3286         if (argc == 1) {
3287                 priority = strtol(argv[0], NULL, 0);
3288         } else {
3289                 priority = 0;
3290         }
3291         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3292
3293         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3294         if (ret != 0) {
3295                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3296         }               
3297         return 0;
3298 }
3299
3300
3301 /*
3302   attach to a database
3303  */
3304 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3305 {
3306         const char *db_name;
3307         struct ctdb_db_context *ctdb_db;
3308
3309         if (argc < 1) {
3310                 usage();
3311         }
3312         db_name = argv[0];
3313
3314         ctdb_db = ctdb_attach(ctdb, db_name, false, 0);
3315         if (ctdb_db == NULL) {
3316                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3317                 return -1;
3318         }
3319
3320         return 0;
3321 }
3322
3323 /*
3324   set db priority
3325  */
3326 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3327 {
3328         struct ctdb_db_priority db_prio;
3329         int ret;
3330
3331         if (argc < 2) {
3332                 usage();
3333         }
3334
3335         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3336         db_prio.priority = strtoul(argv[1], NULL, 0);
3337
3338         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3339         if (ret != 0) {
3340                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3341                 return -1;
3342         }
3343
3344         return 0;
3345 }
3346
3347 /*
3348   get db priority
3349  */
3350 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3351 {
3352         uint32_t db_id, priority;
3353         int ret;
3354
3355         if (argc < 1) {
3356                 usage();
3357         }
3358
3359         db_id = strtoul(argv[0], NULL, 0);
3360
3361         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3362         if (ret != 0) {
3363                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3364                 return -1;
3365         }
3366
3367         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3368
3369         return 0;
3370 }
3371
3372 /*
3373   run an eventscript on a node
3374  */
3375 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3376 {
3377         TDB_DATA data;
3378         int ret;
3379         int32_t res;
3380         char *errmsg;
3381         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3382
3383         if (argc != 1) {
3384                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3385                 return -1;
3386         }
3387
3388         data.dptr = (unsigned char *)discard_const(argv[0]);
3389         data.dsize = strlen((char *)data.dptr) + 1;
3390
3391         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3392
3393         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3394                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3395         if (ret != 0 || res != 0) {
3396                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3397                 talloc_free(tmp_ctx);
3398                 return -1;
3399         }
3400         talloc_free(tmp_ctx);
3401         return 0;
3402 }
3403
3404 #define DB_VERSION 1
3405 #define MAX_DB_NAME 64
3406 struct db_file_header {
3407         unsigned long version;
3408         time_t timestamp;
3409         unsigned long persistent;
3410         unsigned long size;
3411         const char name[MAX_DB_NAME];
3412 };
3413
3414 struct backup_data {
3415         struct ctdb_marshall_buffer *records;
3416         uint32_t len;
3417         uint32_t total;
3418         bool traverse_error;
3419 };
3420
3421 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3422 {
3423         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3424         struct ctdb_rec_data *rec;
3425
3426         /* add the record */
3427         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3428         if (rec == NULL) {
3429                 bd->traverse_error = true;
3430                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3431                 return -1;
3432         }
3433         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3434         if (bd->records == NULL) {
3435                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3436                 bd->traverse_error = true;
3437                 return -1;
3438         }
3439         bd->records->count++;
3440         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3441         bd->len += rec->length;
3442         talloc_free(rec);
3443
3444         bd->total++;
3445         return 0;
3446 }
3447
3448 /*
3449  * backup a database to a file 
3450  */
3451 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3452 {
3453         int i, ret;
3454         struct ctdb_dbid_map *dbmap=NULL;
3455         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3456         struct db_file_header dbhdr;
3457         struct ctdb_db_context *ctdb_db;
3458         struct backup_data *bd;
3459         int fh = -1;
3460         int status = -1;
3461         const char *reason = NULL;
3462
3463         if (argc != 2) {
3464                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3465                 return -1;
3466         }
3467
3468         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3469         if (ret != 0) {
3470                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3471                 return ret;
3472         }
3473
3474         for(i=0;i<dbmap->num;i++){
3475                 const char *name;
3476
3477                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3478                 if(!strcmp(argv[0], name)){
3479                         talloc_free(discard_const(name));
3480                         break;
3481                 }
3482                 talloc_free(discard_const(name));
3483         }
3484         if (i == dbmap->num) {
3485                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3486                 talloc_free(tmp_ctx);
3487                 return -1;
3488         }
3489
3490         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3491                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3492         if (ret != 0) {
3493                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3494                                  argv[0]));
3495                 talloc_free(tmp_ctx);
3496                 return -1;
3497         }
3498         if (reason) {
3499                 uint32_t allow_unhealthy = 0;
3500
3501                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3502                                       "AllowUnhealthyDBRead",
3503                                       &allow_unhealthy);
3504
3505                 if (allow_unhealthy != 1) {
3506                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3507                                          argv[0], reason));
3508
3509                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3510                                          allow_unhealthy));
3511                         talloc_free(tmp_ctx);
3512                         return -1;
3513                 }
3514
3515                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3516                                      argv[0], argv[0]));
3517                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3518                                      "tunnable AllowUnhealthyDBRead = %u\n",
3519                                      allow_unhealthy));
3520         }
3521
3522         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3523         if (ctdb_db == NULL) {
3524                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3525                 talloc_free(tmp_ctx);
3526                 return -1;
3527         }
3528
3529
3530         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3531         if (ret == -1) {
3532                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3533                 talloc_free(tmp_ctx);
3534                 return -1;
3535         }
3536
3537
3538         bd = talloc_zero(tmp_ctx, struct backup_data);
3539         if (bd == NULL) {
3540                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3541                 talloc_free(tmp_ctx);
3542                 return -1;
3543         }
3544
3545         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3546         if (bd->records == NULL) {
3547                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3548                 talloc_free(tmp_ctx);
3549                 return -1;
3550         }
3551
3552         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3553         bd->records->db_id = ctdb_db->db_id;
3554         /* traverse the database collecting all records */
3555         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3556             bd->traverse_error) {
3557                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3558                 talloc_free(tmp_ctx);
3559                 return -1;              
3560         }
3561
3562         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3563
3564
3565         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
3566         if (fh == -1) {
3567                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
3568                 talloc_free(tmp_ctx);
3569                 return -1;
3570         }
3571
3572         dbhdr.version = DB_VERSION;
3573         dbhdr.timestamp = time(NULL);
3574         dbhdr.persistent = dbmap->dbs[i].persistent;
3575         dbhdr.size = bd->len;
3576         if (strlen(argv[0]) >= MAX_DB_NAME) {
3577                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
3578                 goto done;
3579         }
3580         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
3581         ret = write(fh, &dbhdr, sizeof(dbhdr));
3582         if (ret == -1) {
3583                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3584                 goto done;
3585         }
3586         ret = write(fh, bd->records, bd->len);
3587         if (ret == -1) {
3588                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
3589                 goto done;
3590         }
3591
3592         status = 0;
3593 done:
3594         if (fh != -1) {
3595                 ret = close(fh);
3596                 if (ret == -1) {
3597                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
3598                 }
3599         }
3600         talloc_free(tmp_ctx);
3601         return status;
3602 }
3603
3604 /*
3605  * restore a database from a file 
3606  */
3607 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
3608 {
3609         int ret;
3610         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3611         TDB_DATA outdata;
3612         TDB_DATA data;
3613         struct db_file_header dbhdr;
3614         struct ctdb_db_context *ctdb_db;
3615         struct ctdb_node_map *nodemap=NULL;
3616         struct ctdb_vnn_map *vnnmap=NULL;
3617         int i, fh;
3618         struct ctdb_control_wipe_database w;
3619         uint32_t *nodes;
3620         uint32_t generation;
3621         struct tm *tm;
3622         char tbuf[100];
3623         char *dbname;
3624
3625         if (argc < 1 || argc > 2) {
3626                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3627                 return -1;
3628         }
3629
3630         fh = open(argv[0], O_RDONLY);
3631         if (fh == -1) {
3632                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3633                 talloc_free(tmp_ctx);
3634                 return -1;
3635         }
3636
3637         read(fh, &dbhdr, sizeof(dbhdr));
3638         if (dbhdr.version != DB_VERSION) {
3639                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3640                 talloc_free(tmp_ctx);
3641                 return -1;
3642         }
3643
3644         dbname = dbhdr.name;
3645         if (argc == 2) {
3646                 dbname = argv[1];
3647         }
3648
3649         outdata.dsize = dbhdr.size;
3650         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3651         if (outdata.dptr == NULL) {
3652                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3653                 close(fh);
3654                 talloc_free(tmp_ctx);
3655                 return -1;
3656         }               
3657         read(fh, outdata.dptr, outdata.dsize);
3658         close(fh);
3659
3660         tm = localtime(&dbhdr.timestamp);
3661         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3662         printf("Restoring database '%s' from backup @ %s\n",
3663                 dbname, tbuf);
3664
3665
3666         ctdb_db = ctdb_attach(ctdb, dbname, dbhdr.persistent, 0);
3667         if (ctdb_db == NULL) {
3668                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
3669                 talloc_free(tmp_ctx);
3670                 return -1;
3671         }
3672
3673         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3674         if (ret != 0) {
3675                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3676                 talloc_free(tmp_ctx);
3677                 return ret;
3678         }
3679
3680
3681         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
3682         if (ret != 0) {
3683                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
3684                 talloc_free(tmp_ctx);
3685                 return ret;
3686         }
3687
3688         /* freeze all nodes */
3689         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3690         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3691                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3692                                         nodes, i,
3693                                         TIMELIMIT(),
3694                                         false, tdb_null,
3695                                         NULL, NULL,
3696                                         NULL) != 0) {
3697                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3698                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3699                         talloc_free(tmp_ctx);
3700                         return -1;
3701                 }
3702         }
3703
3704         generation = vnnmap->generation;
3705         data.dptr = (void *)&generation;
3706         data.dsize = sizeof(generation);
3707
3708         /* start a cluster wide transaction */
3709         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3710         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3711                                         nodes, 0,
3712                                         TIMELIMIT(), false, data,
3713                                         NULL, NULL,
3714                                         NULL) != 0) {
3715                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
3716                 return -1;
3717         }
3718
3719
3720         w.db_id = ctdb_db->db_id;
3721         w.transaction_id = generation;
3722
3723         data.dptr = (void *)&w;
3724         data.dsize = sizeof(w);
3725
3726         /* wipe all the remote databases. */
3727         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3728         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3729                                         nodes, 0,
3730                                         TIMELIMIT(), false, data,
3731                                         NULL, NULL,
3732                                         NULL) != 0) {
3733                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3734                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3735                 talloc_free(tmp_ctx);
3736                 return -1;
3737         }
3738         
3739         /* push the database */
3740         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3741         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
3742                                         nodes, 0,
3743                                         TIMELIMIT(), false, outdata,
3744                                         NULL, NULL,
3745                                         NULL) != 0) {
3746                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
3747                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3748                 talloc_free(tmp_ctx);
3749                 return -1;
3750         }
3751
3752         data.dptr = (void *)&ctdb_db->db_id;
3753         data.dsize = sizeof(ctdb_db->db_id);
3754
3755         /* mark the database as healthy */
3756         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3757         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
3758                                         nodes, 0,
3759                                         TIMELIMIT(), false, data,
3760                                         NULL, NULL,
3761                                         NULL) != 0) {
3762                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
3763                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3764                 talloc_free(tmp_ctx);
3765                 return -1;
3766         }
3767
3768         data.dptr = (void *)&generation;
3769         data.dsize = sizeof(generation);
3770
3771         /* commit all the changes */
3772         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
3773                                         nodes, 0,
3774                                         TIMELIMIT(), false, data,
3775                                         NULL, NULL,
3776                                         NULL) != 0) {
3777                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
3778                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3779                 talloc_free(tmp_ctx);
3780                 return -1;
3781         }
3782
3783
3784         /* thaw all nodes */
3785         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3786         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
3787                                         nodes, 0,
3788                                         TIMELIMIT(),
3789                                         false, tdb_null,
3790                                         NULL, NULL,
3791                                         NULL) != 0) {
3792                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
3793                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3794                 talloc_free(tmp_ctx);
3795                 return -1;
3796         }
3797
3798
3799         talloc_free(tmp_ctx);
3800         return 0;
3801 }
3802
3803 /*
3804  * dump a database backup from a file
3805  */
3806 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
3807 {
3808         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3809         TDB_DATA outdata;
3810         struct db_file_header dbhdr;
3811         int i, fh;
3812         struct tm *tm;
3813         char tbuf[100];
3814         struct ctdb_rec_data *rec = NULL;
3815         struct ctdb_marshall_buffer *m;
3816
3817         if (argc != 1) {
3818                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3819                 return -1;
3820         }
3821
3822         fh = open(argv[0], O_RDONLY);
3823         if (fh == -1) {
3824                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
3825                 talloc_free(tmp_ctx);
3826                 return -1;
3827         }
3828
3829         read(fh, &dbhdr, sizeof(dbhdr));
3830         if (dbhdr.version != DB_VERSION) {
3831                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
3832                 talloc_free(tmp_ctx);
3833                 return -1;
3834         }
3835
3836         outdata.dsize = dbhdr.size;
3837         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
3838         if (outdata.dptr == NULL) {
3839                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
3840                 close(fh);
3841                 talloc_free(tmp_ctx);
3842                 return -1;
3843         }
3844         read(fh, outdata.dptr, outdata.dsize);
3845         close(fh);
3846         m = (struct ctdb_marshall_buffer *)outdata.dptr;
3847
3848         tm = localtime(&dbhdr.timestamp);
3849         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
3850         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
3851                 dbhdr.name, m->db_id, tbuf);
3852
3853         for (i=0; i < m->count; i++) {
3854                 uint32_t reqid = 0;
3855                 TDB_DATA key, data;
3856
3857                 /* we do not want the header splitted, so we pass NULL*/
3858                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
3859                                               NULL, &key, &data);
3860
3861                 ctdb_dumpdb_record(ctdb, key, data, stdout);
3862         }
3863
3864         printf("Dumped %d records\n", i);
3865         talloc_free(tmp_ctx);
3866         return 0;
3867 }
3868
3869 /*
3870  * wipe a database from a file
3871  */
3872 static int control_wipedb(struct ctdb_context *ctdb, int argc,
3873                           const char **argv)
3874 {
3875         int ret;
3876         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3877         TDB_DATA data;
3878         struct ctdb_db_context *ctdb_db;
3879         struct ctdb_node_map *nodemap = NULL;
3880         struct ctdb_vnn_map *vnnmap = NULL;
3881         int i;
3882         struct ctdb_control_wipe_database w;
3883         uint32_t *nodes;
3884         uint32_t generation;
3885         struct ctdb_dbid_map *dbmap = NULL;
3886
3887         if (argc != 1) {
3888                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3889                 return -1;
3890         }
3891
3892         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3893                                  &dbmap);
3894         if (ret != 0) {
3895                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
3896                                   options.pnn));
3897                 return ret;
3898         }
3899
3900         for(i=0;i<dbmap->num;i++){
3901                 const char *name;
3902
3903                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3904                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
3905                 if(!strcmp(argv[0], name)){
3906                         talloc_free(discard_const(name));
3907                         break;
3908                 }
3909                 talloc_free(discard_const(name));
3910         }
3911         if (i == dbmap->num) {
3912                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
3913                                   argv[0]));
3914                 talloc_free(tmp_ctx);
3915                 return -1;
3916         }
3917
3918         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3919         if (ctdb_db == NULL) {
3920                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
3921                                   argv[0]));
3922                 talloc_free(tmp_ctx);
3923                 return -1;
3924         }
3925
3926         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
3927                                    &nodemap);
3928         if (ret != 0) {
3929                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
3930                                   options.pnn));
3931                 talloc_free(tmp_ctx);
3932                 return ret;
3933         }
3934
3935         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
3936                                   &vnnmap);
3937         if (ret != 0) {
3938                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3939                                   options.pnn));
3940                 talloc_free(tmp_ctx);
3941                 return ret;
3942         }
3943
3944         /* freeze all nodes */
3945         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3946         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
3947                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
3948                                                 nodes, i,
3949                                                 TIMELIMIT(),
3950                                                 false, tdb_null,
3951                                                 NULL, NULL,
3952                                                 NULL);
3953                 if (ret != 0) {
3954                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
3955                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
3956                                              CTDB_RECOVERY_ACTIVE);
3957                         talloc_free(tmp_ctx);
3958                         return -1;
3959                 }
3960         }
3961
3962         generation = vnnmap->generation;
3963         data.dptr = (void *)&generation;
3964         data.dsize = sizeof(generation);
3965
3966         /* start a cluster wide transaction */
3967         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3968         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
3969                                         nodes, 0,
3970                                         TIMELIMIT(), false, data,
3971                                         NULL, NULL,
3972                                         NULL);
3973         if (ret!= 0) {
3974                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
3975                                   "transactions.\n"));
3976                 return -1;
3977         }
3978
3979         w.db_id = ctdb_db->db_id;
3980         w.transaction_id = generation;
3981
3982         data.dptr = (void *)&w;
3983         data.dsize = sizeof(w);
3984
3985         /* wipe all the remote databases. */
3986         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
3987         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
3988                                         nodes, 0,
3989                                         TIMELIMIT(), false, data,
3990                                         NULL, NULL,
3991                                         NULL) != 0) {
3992                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
3993                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3994                 talloc_free(tmp_ctx);
3995                 return -1;
3996         }
3997
3998         data.dptr = (void *)&ctdb_db->db_id;
3999         data.dsize = sizeof(ctdb_db->db_id);
4000
4001         /* mark the database as healthy */
4002         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4003         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
4004                                         nodes, 0,
4005                                         TIMELIMIT(), false, data,
4006                                         NULL, NULL,
4007                                         NULL) != 0) {
4008                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
4009                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4010                 talloc_free(tmp_ctx);
4011                 return -1;
4012         }
4013
4014         data.dptr = (void *)&generation;
4015         data.dsize = sizeof(generation);
4016
4017         /* commit all the changes */
4018         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
4019                                         nodes, 0,
4020                                         TIMELIMIT(), false, data,
4021                                         NULL, NULL,
4022                                         NULL) != 0) {
4023                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
4024                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4025                 talloc_free(tmp_ctx);
4026                 return -1;
4027         }
4028
4029         /* thaw all nodes */
4030         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4031         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
4032                                         nodes, 0,
4033                                         TIMELIMIT(),
4034                                         false, tdb_null,
4035                                         NULL, NULL,
4036                                         NULL) != 0) {
4037                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
4038                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4039                 talloc_free(tmp_ctx);
4040                 return -1;
4041         }
4042
4043         talloc_free(tmp_ctx);
4044         return 0;
4045 }
4046
4047 /*
4048  * set flags of a node in the nodemap
4049  */
4050 static int control_setflags(struct ctdb_context *ctdb, int argc, const char **argv)
4051 {
4052         int ret;
4053         int32_t status;
4054         int node;
4055         int flags;
4056         TDB_DATA data;
4057         struct ctdb_node_flag_change c;
4058
4059         if (argc != 2) {
4060                 usage();
4061                 return -1;
4062         }
4063
4064         if (sscanf(argv[0], "%d", &node) != 1) {
4065                 DEBUG(DEBUG_ERR, ("Badly formed node\n"));
4066                 usage();
4067                 return -1;
4068         }
4069         if (sscanf(argv[1], "0x%x", &flags) != 1) {
4070                 DEBUG(DEBUG_ERR, ("Badly formed flags\n"));
4071                 usage();
4072                 return -1;
4073         }
4074
4075         c.pnn       = node;
4076         c.old_flags = 0;
4077         c.new_flags = flags;
4078
4079         data.dsize = sizeof(c);
4080         data.dptr = (unsigned char *)&c;
4081
4082         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_MODIFY_FLAGS, 0, 
4083                            data, NULL, NULL, &status, NULL, NULL);
4084         if (ret != 0 || status != 0) {
4085                 DEBUG(DEBUG_ERR,("Failed to modify flags\n"));
4086                 return -1;
4087         }
4088         return 0;
4089 }
4090
4091 /*
4092   dump memory usage
4093  */
4094 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4095 {
4096         TDB_DATA data;
4097         int ret;
4098         int32_t res;
4099         char *errmsg;
4100         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4101         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
4102                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
4103         if (ret != 0 || res != 0) {
4104                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
4105                 talloc_free(tmp_ctx);
4106                 return -1;
4107         }
4108         write(1, data.dptr, data.dsize);
4109         talloc_free(tmp_ctx);
4110         return 0;
4111 }
4112
4113 /*
4114   handler for memory dumps
4115 */
4116 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4117                              TDB_DATA data, void *private_data)
4118 {
4119         write(1, data.dptr, data.dsize);
4120         exit(0);
4121 }
4122
4123 /*
4124   dump memory usage on the recovery daemon
4125  */
4126 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4127 {
4128         int ret;
4129         TDB_DATA data;
4130         struct rd_memdump_reply rd;
4131
4132         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4133         if (rd.pnn == -1) {
4134                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4135                 return -1;
4136         }
4137         rd.srvid = getpid();
4138
4139         /* register a message port for receiveing the reply so that we
4140            can receive the reply
4141         */
4142         ctdb_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
4143
4144
4145         data.dptr = (uint8_t *)&rd;
4146         data.dsize = sizeof(rd);
4147
4148         ret = ctdb_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
4149         if (ret != 0) {
4150                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4151                 return -1;
4152         }
4153
4154         /* this loop will terminate when we have received the reply */
4155         while (1) {     
4156                 event_loop_once(ctdb->ev);
4157         }
4158
4159         return 0;
4160 }
4161
4162 /*
4163   send a message to a srvid
4164  */
4165 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
4166 {
4167         unsigned long srvid;
4168         int ret;
4169         TDB_DATA data;
4170
4171         if (argc < 2) {
4172                 usage();
4173         }
4174
4175         srvid      = strtoul(argv[0], NULL, 0);
4176
4177         data.dptr = (uint8_t *)discard_const(argv[1]);
4178         data.dsize= strlen(argv[1]);
4179
4180         ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
4181         if (ret != 0) {
4182                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4183                 return -1;
4184         }
4185
4186         return 0;
4187 }
4188
4189 /*
4190   handler for msglisten
4191 */
4192 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4193                              TDB_DATA data, void *private_data)
4194 {
4195         int i;
4196
4197         printf("Message received: ");
4198         for (i=0;i<data.dsize;i++) {
4199                 printf("%c", data.dptr[i]);
4200         }
4201         printf("\n");
4202 }
4203
4204 /*
4205   listen for messages on a messageport
4206  */
4207 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
4208 {
4209         uint64_t srvid;
4210
4211         srvid = getpid();
4212
4213         /* register a message port and listen for messages
4214         */
4215         ctdb_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
4216         printf("Listening for messages on srvid:%d\n", (int)srvid);
4217
4218         while (1) {     
4219                 event_loop_once(ctdb->ev);
4220         }
4221
4222         return 0;
4223 }
4224
4225 /*
4226   list all nodes in the cluster
4227   if the daemon is running, we read the data from the daemon.
4228   if the daemon is not running we parse the nodes file directly
4229  */
4230 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
4231 {
4232         int i, ret;
4233         struct ctdb_node_map *nodemap=NULL;
4234
4235         if (ctdb != NULL) {
4236                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
4237                 if (ret != 0) {
4238                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
4239                         return ret;
4240                 }
4241
4242                 for(i=0;i<nodemap->num;i++){
4243                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
4244                                 continue;
4245                         }
4246                         if (options.machinereadable){
4247                                 printf(":%d:%s:\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
4248                         } else {
4249                                 printf("%s\n", ctdb_addr_to_str(&nodemap->nodes[i].addr));
4250                         }
4251                 }
4252         } else {
4253                 TALLOC_CTX *mem_ctx = talloc_new(NULL);
4254                 struct pnn_node *pnn_nodes;
4255                 struct pnn_node *pnn_node;
4256         
4257                 pnn_nodes = read_nodes_file(mem_ctx);
4258                 if (pnn_nodes == NULL) {
4259                         DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
4260                         talloc_free(mem_ctx);
4261                         return -1;
4262                 }
4263
4264                 for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
4265                         ctdb_sock_addr addr;
4266
4267                         if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
4268                                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
4269                                 talloc_free(mem_ctx);
4270                                 return -1;
4271                         }
4272
4273                         if (options.machinereadable){
4274                                 printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
4275                         } else {
4276                                 printf("%s\n", pnn_node->addr);
4277                         }
4278                 }
4279                 talloc_free(mem_ctx);
4280         }
4281
4282         return 0;
4283 }
4284
4285 /*
4286   reload the nodes file on the local node
4287  */
4288 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
4289 {
4290         int i, ret;
4291         int mypnn;
4292         struct ctdb_node_map *nodemap=NULL;
4293
4294         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4295         if (mypnn == -1) {
4296                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
4297                 return -1;
4298         }
4299
4300         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
4301         if (ret != 0) {
4302                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4303                 return ret;
4304         }
4305
4306         /* reload the nodes file on all remote nodes */
4307         for (i=0;i<nodemap->num;i++) {
4308                 if (nodemap->nodes[i].pnn == mypnn) {
4309                         continue;
4310                 }
4311                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
4312                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
4313                         nodemap->nodes[i].pnn);
4314                 if (ret != 0) {
4315                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
4316                 }
4317         }
4318
4319         /* reload the nodes file on the local node */
4320         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
4321         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
4322         if (ret != 0) {
4323                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
4324         }
4325
4326         /* initiate a recovery */
4327         control_recover(ctdb, argc, argv);
4328
4329         return 0;
4330 }
4331
4332
4333 static const struct {
4334         const char *name;
4335         int (*fn)(struct ctdb_context *, int, const char **);
4336         bool auto_all;
4337         bool without_daemon; /* can be run without daemon running ? */
4338         const char *msg;
4339         const char *args;
4340 } ctdb_commands[] = {
4341 #ifdef CTDB_VERS
4342         { "version",         control_version,           true,   false,  "show version of ctdb" },
4343 #endif
4344         { "status",          control_status,            true,   false,  "show node status" },
4345         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4346         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4347         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4348         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4349         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4350         { "statistics",      control_statistics,        false,  false, "show statistics" },
4351         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4352         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4353         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
4354         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
4355         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
4356         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4357         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4358         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4359         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4360         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4361         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4362         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4363         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4364         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4365         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4366         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4367         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4368         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4369         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4370         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4371         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname>" },
4372         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4373         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4374         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4375         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4376         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4377         { "stop",            control_stop,              true,   false,  "stop a node" },
4378         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4379         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4380         { "unban",           control_unban,             true,   false,  "unban a node" },
4381         { "showban",         control_showban,           true,   false,  "show ban information"},
4382         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4383         { "recover",         control_recover,           true,   false,  "force recovery" },
4384         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4385         { "freeze",          control_freeze,            true,   false,  "freeze databases", "[priority:1-3]" },
4386         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4387         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4388         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4389         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4390         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4391         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip>" },
4392
4393         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4394         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4395         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4396         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4397         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4398         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4399         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4400         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4401         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4402         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4403         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4404         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4405         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4406         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
4407         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4408         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4409         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4410         { "setflags",        control_setflags,          false,  false, "set flags for a node in the nodemap.", "<node> <flags>"},
4411         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4412         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4413         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4414         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4415         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4416         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4417         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4418         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4419         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4420         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4421         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4422         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4423         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
4424         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
4425 };
4426
4427 /*
4428   show usage message
4429  */
4430 static void usage(void)
4431 {
4432         int i;
4433         printf(
4434 "Usage: ctdb [options] <control>\n" \
4435 "Options:\n" \
4436 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4437 "   -Y                 generate machinereadable output\n"
4438 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4439         printf("Controls:\n");
4440         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4441                 printf("  %-15s %-27s  %s\n", 
4442                        ctdb_commands[i].name, 
4443                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4444                        ctdb_commands[i].msg);
4445         }
4446         exit(1);
4447 }
4448
4449
4450 static void ctdb_alarm(int sig)
4451 {
4452         printf("Maximum runtime exceeded - exiting\n");
4453         _exit(ERR_TIMEOUT);
4454 }
4455
4456 /*
4457   main program
4458 */
4459 int main(int argc, const char *argv[])
4460 {
4461         struct ctdb_context *ctdb;
4462         char *nodestring = NULL;
4463         struct poptOption popt_options[] = {
4464                 POPT_AUTOHELP
4465                 POPT_CTDB_CMDLINE
4466                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4467                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4468                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4469                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4470                 POPT_TABLEEND
4471         };
4472         int opt;
4473         const char **extra_argv;
4474         int extra_argc = 0;
4475         int ret=-1, i;
4476         poptContext pc;
4477         struct event_context *ev;
4478         const char *control;
4479
4480         setlinebuf(stdout);
4481         
4482         /* set some defaults */
4483         options.maxruntime = 0;
4484         options.timelimit = 3;
4485         options.pnn = CTDB_CURRENT_NODE;
4486
4487         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4488
4489         while ((opt = poptGetNextOpt(pc)) != -1) {
4490                 switch (opt) {
4491                 default:
4492                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4493                                 poptBadOption(pc, 0), poptStrerror(opt)));
4494                         exit(1);
4495                 }
4496         }
4497
4498         /* setup the remaining options for the main program to use */
4499         extra_argv = poptGetArgs(pc);
4500         if (extra_argv) {
4501                 extra_argv++;
4502                 while (extra_argv[extra_argc]) extra_argc++;
4503         }
4504
4505         if (extra_argc < 1) {
4506                 usage();
4507         }
4508
4509         if (options.maxruntime == 0) {
4510                 const char *ctdb_timeout;
4511                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4512                 if (ctdb_timeout != NULL) {
4513                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4514                 } else {
4515                         /* default timeout is 120 seconds */
4516                         options.maxruntime = 120;
4517                 }
4518         }
4519
4520         signal(SIGALRM, ctdb_alarm);
4521         alarm(options.maxruntime);
4522
4523         /* setup the node number to contact */
4524         if (nodestring != NULL) {
4525                 if (strcmp(nodestring, "all") == 0) {
4526                         options.pnn = CTDB_BROADCAST_ALL;
4527                 } else {
4528                         options.pnn = strtoul(nodestring, NULL, 0);
4529                 }
4530         }
4531
4532         control = extra_argv[0];
4533
4534         ev = event_context_init(NULL);
4535
4536         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4537                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4538                         int j;
4539
4540                         if (ctdb_commands[i].without_daemon == true) {
4541                                 close(2);
4542                         }
4543
4544                         /* initialise ctdb */
4545                         ctdb = ctdb_cmdline_client(ev);
4546
4547                         if (ctdb_commands[i].without_daemon == false) {
4548                                 if (ctdb == NULL) {
4549                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4550                                         exit(1);
4551                                 }
4552
4553                                 /* verify the node exists */
4554                                 verify_node(ctdb);
4555
4556                                 if (options.pnn == CTDB_CURRENT_NODE) {
4557                                         int pnn;
4558                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4559                                         if (pnn == -1) {
4560                                                 return -1;
4561                                         }
4562                                         options.pnn = pnn;
4563                                 }
4564                         }
4565
4566                         if (ctdb_commands[i].auto_all && 
4567                             options.pnn == CTDB_BROADCAST_ALL) {
4568                                 uint32_t *nodes;
4569                                 uint32_t num_nodes;
4570                                 ret = 0;
4571
4572                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4573                                 CTDB_NO_MEMORY(ctdb, nodes);
4574         
4575                                 for (j=0;j<num_nodes;j++) {
4576                                         options.pnn = nodes[j];
4577                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4578                                 }
4579                                 talloc_free(nodes);
4580                         } else {
4581                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4582                         }
4583                         break;
4584                 }
4585         }
4586
4587         if (i == ARRAY_SIZE(ctdb_commands)) {
4588                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4589                 exit(1);
4590         }
4591
4592         return ret;
4593 }