tools/ctdb: allow "ctdb pfetch" only on persistent databases
[metze/ctdb/wip.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_client.h"
31 #include "../include/ctdb_private.h"
32 #include "../common/rb_tree.h"
33 #include "db_wrap.h"
34
35 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
36 #define ERR_NONODE      21      /* node does not exist */
37 #define ERR_DISNODE     22      /* node is disconnected */
38
39 struct ctdb_connection *ctdb_connection;
40
41 static void usage(void);
42
43 static struct {
44         int timelimit;
45         uint32_t pnn;
46         int machinereadable;
47         int verbose;
48         int maxruntime;
49 } options;
50
51 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
52 #define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
53
54 #ifdef CTDB_VERS
55 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
56 {
57 #define STR(x) #x
58 #define XSTR(x) STR(x)
59         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
60         return 0;
61 }
62 #endif
63
64
65 /*
66   verify that a node exists and is reachable
67  */
68 static void verify_node(struct ctdb_context *ctdb)
69 {
70         int ret;
71         struct ctdb_node_map *nodemap=NULL;
72
73         if (options.pnn == CTDB_CURRENT_NODE) {
74                 return;
75         }
76         if (options.pnn == CTDB_BROADCAST_ALL) {
77                 return;
78         }
79
80         /* verify the node exists */
81         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
82                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
83                 exit(10);
84         }
85         if (options.pnn >= nodemap->num) {
86                 DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
87                 exit(ERR_NONODE);
88         }
89         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
90                 DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
91                 exit(ERR_DISNODE);
92         }
93         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
94                 DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
95                 exit(ERR_DISNODE);
96         }
97
98         /* verify we can access the node */
99         ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
100         if (ret == -1) {
101                 DEBUG(DEBUG_ERR,("Can not access node. Node is not operational.\n"));
102                 exit(10);
103         }
104 }
105
106 /*
107  check if a database exists
108 */
109 static int db_exists(struct ctdb_context *ctdb, const char *db_name, bool *persistent)
110 {
111         int i, ret;
112         struct ctdb_dbid_map *dbmap=NULL;
113
114         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
115         if (ret != 0) {
116                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
117                 return -1;
118         }
119
120         for(i=0;i<dbmap->num;i++){
121                 const char *name;
122
123                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
124                 if (!strcmp(name, db_name)) {
125                         if (persistent) {
126                                 *persistent = dbmap->dbs[i].persistent;
127                         }
128                         return 0;
129                 }
130         }
131
132         return -1;
133 }
134
135 /*
136   see if a process exists
137  */
138 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
139 {
140         uint32_t pnn, pid;
141         int ret;
142         if (argc < 1) {
143                 usage();
144         }
145
146         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
147                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
148                 return -1;
149         }
150
151         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
152         if (ret == 0) {
153                 printf("%u:%u exists\n", pnn, pid);
154         } else {
155                 printf("%u:%u does not exist\n", pnn, pid);
156         }
157         return ret;
158 }
159
160 /*
161   display statistics structure
162  */
163 static void show_statistics(struct ctdb_statistics *s, int show_header)
164 {
165         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
166         int i;
167         const char *prefix=NULL;
168         int preflen=0;
169         int tmp, days, hours, minutes, seconds;
170         const struct {
171                 const char *name;
172                 uint32_t offset;
173         } fields[] = {
174 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
175                 STATISTICS_FIELD(num_clients),
176                 STATISTICS_FIELD(frozen),
177                 STATISTICS_FIELD(recovering),
178                 STATISTICS_FIELD(num_recoveries),
179                 STATISTICS_FIELD(client_packets_sent),
180                 STATISTICS_FIELD(client_packets_recv),
181                 STATISTICS_FIELD(node_packets_sent),
182                 STATISTICS_FIELD(node_packets_recv),
183                 STATISTICS_FIELD(keepalive_packets_sent),
184                 STATISTICS_FIELD(keepalive_packets_recv),
185                 STATISTICS_FIELD(node.req_call),
186                 STATISTICS_FIELD(node.reply_call),
187                 STATISTICS_FIELD(node.req_dmaster),
188                 STATISTICS_FIELD(node.reply_dmaster),
189                 STATISTICS_FIELD(node.reply_error),
190                 STATISTICS_FIELD(node.req_message),
191                 STATISTICS_FIELD(node.req_control),
192                 STATISTICS_FIELD(node.reply_control),
193                 STATISTICS_FIELD(client.req_call),
194                 STATISTICS_FIELD(client.req_message),
195                 STATISTICS_FIELD(client.req_control),
196                 STATISTICS_FIELD(timeouts.call),
197                 STATISTICS_FIELD(timeouts.control),
198                 STATISTICS_FIELD(timeouts.traverse),
199                 STATISTICS_FIELD(total_calls),
200                 STATISTICS_FIELD(pending_calls),
201                 STATISTICS_FIELD(lockwait_calls),
202                 STATISTICS_FIELD(pending_lockwait_calls),
203                 STATISTICS_FIELD(childwrite_calls),
204                 STATISTICS_FIELD(pending_childwrite_calls),
205                 STATISTICS_FIELD(memory_used),
206                 STATISTICS_FIELD(max_hop_count),
207         };
208         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
209         seconds = tmp%60;
210         tmp    /= 60;
211         minutes = tmp%60;
212         tmp    /= 60;
213         hours   = tmp%24;
214         tmp    /= 24;
215         days    = tmp;
216
217         if (options.machinereadable){
218                 if (show_header) {
219                         printf("CTDB version:");
220                         printf("Current time of statistics:");
221                         printf("Statistics collected since:");
222                         for (i=0;i<ARRAY_SIZE(fields);i++) {
223                                 printf("%s:", fields[i].name);
224                         }
225                         printf("max_reclock_ctdbd:");
226                         printf("max_reclock_recd:");
227                         printf("max_call_latency:");
228                         printf("max_lockwait_latency:");
229                         printf("max_childwrite_latency:");
230                         printf("max_childwrite_latency:");
231                         printf("\n");
232                 }
233                 printf("%d:", CTDB_VERSION);
234                 printf("%d:", (int)s->statistics_current_time.tv_sec);
235                 printf("%d:", (int)s->statistics_start_time.tv_sec);
236                 for (i=0;i<ARRAY_SIZE(fields);i++) {
237                         printf("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
238                 }
239                 printf("%.6f:", s->reclock.ctdbd);
240                 printf("%.6f:", s->reclock.recd);
241                 printf("%.6f:", s->max_call_latency);
242                 printf("%.6f:", s->max_lockwait_latency);
243                 printf("%.6f:", s->max_childwrite_latency);
244                 printf("%.6f:", s->max_childwrite_latency);
245                 printf("\n");
246         } else {
247                 printf("CTDB version %u\n", CTDB_VERSION);
248                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
249                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
250
251                 for (i=0;i<ARRAY_SIZE(fields);i++) {
252                         if (strchr(fields[i].name, '.')) {
253                                 preflen = strcspn(fields[i].name, ".")+1;
254                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
255                                         prefix = fields[i].name;
256                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
257                                 }
258                         } else {
259                                 preflen = 0;
260                         }
261                         printf(" %*s%-22s%*s%10u\n", 
262                                preflen?4:0, "",
263                                fields[i].name+preflen, 
264                                preflen?0:4, "",
265                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
266                 }
267                 printf(" %-30s     %.6f sec\n", "max_reclock_ctdbd", s->reclock.ctdbd);
268                 printf(" %-30s     %.6f sec\n", "max_reclock_recd", s->reclock.recd);
269
270                 printf(" %-30s     %.6f sec\n", "max_call_latency", s->max_call_latency);
271                 printf(" %-30s     %.6f sec\n", "max_lockwait_latency", s->max_lockwait_latency);
272                 printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
273                 printf(" %-30s     %.6f sec\n", "max_childwrite_latency", s->max_childwrite_latency);
274         }
275
276         talloc_free(tmp_ctx);
277 }
278
279 /*
280   display remote ctdb statistics combined from all nodes
281  */
282 static int control_statistics_all(struct ctdb_context *ctdb)
283 {
284         int ret, i;
285         struct ctdb_statistics statistics;
286         uint32_t *nodes;
287         uint32_t num_nodes;
288
289         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
290         CTDB_NO_MEMORY(ctdb, nodes);
291         
292         ZERO_STRUCT(statistics);
293
294         for (i=0;i<num_nodes;i++) {
295                 struct ctdb_statistics s1;
296                 int j;
297                 uint32_t *v1 = (uint32_t *)&s1;
298                 uint32_t *v2 = (uint32_t *)&statistics;
299                 uint32_t num_ints = 
300                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
301                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
302                 if (ret != 0) {
303                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
304                         return ret;
305                 }
306                 for (j=0;j<num_ints;j++) {
307                         v2[j] += v1[j];
308                 }
309                 statistics.max_hop_count = 
310                         MAX(statistics.max_hop_count, s1.max_hop_count);
311                 statistics.max_call_latency = 
312                         MAX(statistics.max_call_latency, s1.max_call_latency);
313                 statistics.max_lockwait_latency = 
314                         MAX(statistics.max_lockwait_latency, s1.max_lockwait_latency);
315         }
316         talloc_free(nodes);
317         printf("Gathered statistics for %u nodes\n", num_nodes);
318         show_statistics(&statistics, 1);
319         return 0;
320 }
321
322 /*
323   display remote ctdb statistics
324  */
325 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
326 {
327         int ret;
328         struct ctdb_statistics statistics;
329
330         if (options.pnn == CTDB_BROADCAST_ALL) {
331                 return control_statistics_all(ctdb);
332         }
333
334         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
335         if (ret != 0) {
336                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
337                 return ret;
338         }
339         show_statistics(&statistics, 1);
340         return 0;
341 }
342
343
344 /*
345   reset remote ctdb statistics
346  */
347 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
348 {
349         int ret;
350
351         ret = ctdb_statistics_reset(ctdb, options.pnn);
352         if (ret != 0) {
353                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
354                 return ret;
355         }
356         return 0;
357 }
358
359
360 /*
361   display remote ctdb rolling statistics
362  */
363 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
364 {
365         int ret;
366         struct ctdb_statistics_wire *stats;
367         int i, num_records = -1;
368
369         if (argc ==1) {
370                 num_records = atoi(argv[0]) - 1;
371         }
372
373         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
374         if (ret != 0) {
375                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
376                 return ret;
377         }
378         for (i=0;i<stats->num;i++) {
379                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
380                         continue;
381                 }
382                 show_statistics(&stats->stats[i], i==0);
383                 if (i == num_records) {
384                         break;
385                 }
386         }
387         return 0;
388 }
389
390
391 /*
392   display uptime of remote node
393  */
394 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
395 {
396         int ret;
397         struct ctdb_uptime *uptime = NULL;
398         int tmp, days, hours, minutes, seconds;
399
400         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
401         if (ret != 0) {
402                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
403                 return ret;
404         }
405
406         if (options.machinereadable){
407                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
408                 printf(":%u:%u:%u:%lf\n",
409                         (unsigned int)uptime->current_time.tv_sec,
410                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
411                         (unsigned int)uptime->last_recovery_finished.tv_sec,
412                         timeval_delta(&uptime->last_recovery_finished,
413                                       &uptime->last_recovery_started)
414                 );
415                 return 0;
416         }
417
418         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
419
420         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
421         seconds = tmp%60;
422         tmp    /= 60;
423         minutes = tmp%60;
424         tmp    /= 60;
425         hours   = tmp%24;
426         tmp    /= 24;
427         days    = tmp;
428         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
429
430         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
431         seconds = tmp%60;
432         tmp    /= 60;
433         minutes = tmp%60;
434         tmp    /= 60;
435         hours   = tmp%24;
436         tmp    /= 24;
437         days    = tmp;
438         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
439         
440         printf("Duration of last recovery/failover: %lf seconds\n",
441                 timeval_delta(&uptime->last_recovery_finished,
442                               &uptime->last_recovery_started));
443
444         return 0;
445 }
446
447 /*
448   show the PNN of the current node
449  */
450 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
451 {
452         uint32_t mypnn;
453         bool ret;
454
455         ret = ctdb_getpnn(ctdb_connection, options.pnn, &mypnn);
456         if (!ret) {
457                 DEBUG(DEBUG_ERR, ("Unable to get pnn from node."));
458                 return -1;
459         }
460
461         printf("PNN:%d\n", mypnn);
462         return 0;
463 }
464
465
466 struct pnn_node {
467         struct pnn_node *next;
468         const char *addr;
469         int pnn;
470 };
471
472 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
473 {
474         const char *nodes_list;
475         int nlines;
476         char **lines;
477         int i, pnn;
478         struct pnn_node *pnn_nodes = NULL;
479         struct pnn_node *pnn_node;
480         struct pnn_node *tmp_node;
481
482         /* read the nodes file */
483         nodes_list = getenv("CTDB_NODES");
484         if (nodes_list == NULL) {
485                 nodes_list = "/etc/ctdb/nodes";
486         }
487         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
488         if (lines == NULL) {
489                 return NULL;
490         }
491         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
492                 nlines--;
493         }
494         for (i=0, pnn=0; i<nlines; i++) {
495                 char *node;
496
497                 node = lines[i];
498                 /* strip leading spaces */
499                 while((*node == ' ') || (*node == '\t')) {
500                         node++;
501                 }
502                 if (*node == '#') {
503                         pnn++;
504                         continue;
505                 }
506                 if (strcmp(node, "") == 0) {
507                         continue;
508                 }
509                 pnn_node = talloc(mem_ctx, struct pnn_node);
510                 pnn_node->pnn = pnn++;
511                 pnn_node->addr = talloc_strdup(pnn_node, node);
512                 pnn_node->next = pnn_nodes;
513                 pnn_nodes = pnn_node;
514         }
515
516         /* swap them around so we return them in incrementing order */
517         pnn_node = pnn_nodes;
518         pnn_nodes = NULL;
519         while (pnn_node) {
520                 tmp_node = pnn_node;
521                 pnn_node = pnn_node->next;
522
523                 tmp_node->next = pnn_nodes;
524                 pnn_nodes = tmp_node;
525         }
526
527         return pnn_nodes;
528 }
529
530 /*
531   show the PNN of the current node
532   discover the pnn by loading the nodes file and try to bind to all
533   addresses one at a time until the ip address is found.
534  */
535 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
536 {
537         TALLOC_CTX *mem_ctx = talloc_new(NULL);
538         struct pnn_node *pnn_nodes;
539         struct pnn_node *pnn_node;
540
541         pnn_nodes = read_nodes_file(mem_ctx);
542         if (pnn_nodes == NULL) {
543                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
544                 talloc_free(mem_ctx);
545                 return -1;
546         }
547
548         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
549                 ctdb_sock_addr addr;
550
551                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
552                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
553                         talloc_free(mem_ctx);
554                         return -1;
555                 }
556
557                 if (ctdb_sys_have_ip(&addr)) {
558                         printf("PNN:%d\n", pnn_node->pnn);
559                         talloc_free(mem_ctx);
560                         return 0;
561                 }
562         }
563
564         printf("Failed to detect which PNN this node is\n");
565         talloc_free(mem_ctx);
566         return -1;
567 }
568
569 /*
570   display remote ctdb status
571  */
572 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
573 {
574         int i, ret;
575         struct ctdb_vnn_map *vnnmap=NULL;
576         struct ctdb_node_map *nodemap=NULL;
577         uint32_t recmode, recmaster;
578         int mypnn;
579
580         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
581         if (mypnn == -1) {
582                 return -1;
583         }
584
585         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
586         if (ret != 0) {
587                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
588                 return ret;
589         }
590
591         if(options.machinereadable){
592                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:PartiallyOnline:\n");
593                 for(i=0;i<nodemap->num;i++){
594                         int partially_online = 0;
595                         int j;
596
597                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
598                                 continue;
599                         }
600                         if (nodemap->nodes[i].flags == 0) {
601                                 struct ctdb_control_get_ifaces *ifaces;
602
603                                 ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
604                                                            nodemap->nodes[i].pnn,
605                                                            ctdb, &ifaces);
606                                 if (ret == 0) {
607                                         for (j=0; j < ifaces->num; j++) {
608                                                 if (ifaces->ifaces[j].link_state != 0) {
609                                                         continue;
610                                                 }
611                                                 partially_online = 1;
612                                                 break;
613                                         }
614                                         talloc_free(ifaces);
615                                 }
616                         }
617                         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
618                                 ctdb_addr_to_str(&nodemap->nodes[i].addr),
619                                !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
620                                !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
621                                !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
622                                !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
623                                !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
624                                !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE),
625                                partially_online);
626                 }
627                 return 0;
628         }
629
630         printf("Number of nodes:%d\n", nodemap->num);
631         for(i=0;i<nodemap->num;i++){
632                 static const struct {
633                         uint32_t flag;
634                         const char *name;
635                 } flag_names[] = {
636                         { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
637                         { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
638                         { NODE_FLAGS_BANNED,                "BANNED" },
639                         { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
640                         { NODE_FLAGS_DELETED,               "DELETED" },
641                         { NODE_FLAGS_STOPPED,               "STOPPED" },
642                         { NODE_FLAGS_INACTIVE,              "INACTIVE" },
643                 };
644                 char *flags_str = NULL;
645                 int j;
646
647                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
648                         continue;
649                 }
650                 if (nodemap->nodes[i].flags == 0) {
651                         struct ctdb_control_get_ifaces *ifaces;
652
653                         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
654                                                    nodemap->nodes[i].pnn,
655                                                    ctdb, &ifaces);
656                         if (ret == 0) {
657                                 for (j=0; j < ifaces->num; j++) {
658                                         if (ifaces->ifaces[j].link_state != 0) {
659                                                 continue;
660                                         }
661                                         flags_str = talloc_strdup(ctdb, "PARTIALLYONLINE");
662                                         break;
663                                 }
664                                 talloc_free(ifaces);
665                         }
666                 }
667                 for (j=0;j<ARRAY_SIZE(flag_names);j++) {
668                         if (nodemap->nodes[i].flags & flag_names[j].flag) {
669                                 if (flags_str == NULL) {
670                                         flags_str = talloc_strdup(ctdb, flag_names[j].name);
671                                 } else {
672                                         flags_str = talloc_asprintf_append(flags_str, "|%s",
673                                                                            flag_names[j].name);
674                                 }
675                                 CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
676                         }
677                 }
678                 if (flags_str == NULL) {
679                         flags_str = talloc_strdup(ctdb, "OK");
680                         CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
681                 }
682                 printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
683                        ctdb_addr_to_str(&nodemap->nodes[i].addr),
684                        flags_str,
685                        nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
686                 talloc_free(flags_str);
687         }
688
689         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
690         if (ret != 0) {
691                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
692                 return ret;
693         }
694         if (vnnmap->generation == INVALID_GENERATION) {
695                 printf("Generation:INVALID\n");
696         } else {
697                 printf("Generation:%d\n",vnnmap->generation);
698         }
699         printf("Size:%d\n",vnnmap->size);
700         for(i=0;i<vnnmap->size;i++){
701                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
702         }
703
704         ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmode);
705         if (ret != 0) {
706                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
707                 return ret;
708         }
709         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
710
711         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
712         if (ret != 0) {
713                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
714                 return ret;
715         }
716         printf("Recovery master:%d\n",recmaster);
717
718         return 0;
719 }
720
721
722 struct natgw_node {
723         struct natgw_node *next;
724         const char *addr;
725 };
726
727 /*
728   display the list of nodes belonging to this natgw configuration
729  */
730 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
731 {
732         int i, ret;
733         uint32_t capabilities;
734         const char *natgw_list;
735         int nlines;
736         char **lines;
737         struct natgw_node *natgw_nodes = NULL;
738         struct natgw_node *natgw_node;
739         struct ctdb_node_map *nodemap=NULL;
740
741
742         /* read the natgw nodes file into a linked list */
743         natgw_list = getenv("NATGW_NODES");
744         if (natgw_list == NULL) {
745                 natgw_list = "/etc/ctdb/natgw_nodes";
746         }
747         lines = file_lines_load(natgw_list, &nlines, ctdb);
748         if (lines == NULL) {
749                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
750                 return -1;
751         }
752         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
753                 nlines--;
754         }
755         for (i=0;i<nlines;i++) {
756                 char *node;
757
758                 node = lines[i];
759                 /* strip leading spaces */
760                 while((*node == ' ') || (*node == '\t')) {
761                         node++;
762                 }
763                 if (*node == '#') {
764                         continue;
765                 }
766                 if (strcmp(node, "") == 0) {
767                         continue;
768                 }
769                 natgw_node = talloc(ctdb, struct natgw_node);
770                 natgw_node->addr = talloc_strdup(natgw_node, node);
771                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
772                 natgw_node->next = natgw_nodes;
773                 natgw_nodes = natgw_node;
774         }
775
776         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
777         if (ret != 0) {
778                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
779                 return ret;
780         }
781
782         i=0;
783         while(i<nodemap->num) {
784                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
785                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
786                                 break;
787                         }
788                 }
789
790                 /* this node was not in the natgw so we just remove it from
791                  * the list
792                  */
793                 if ((natgw_node == NULL) 
794                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
795                         int j;
796
797                         for (j=i+1; j<nodemap->num; j++) {
798                                 nodemap->nodes[j-1] = nodemap->nodes[j];
799                         }
800                         nodemap->num--;
801                         continue;
802                 }
803
804                 i++;
805         }               
806
807         /* pick a node to be natgwmaster
808          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
809          */
810         for(i=0;i<nodemap->num;i++){
811                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
812                         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
813                         if (ret != 0) {
814                                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
815                                 return ret;
816                         }
817                         if (!(capabilities&CTDB_CAP_NATGW)) {
818                                 continue;
819                         }
820                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
821                         break;
822                 }
823         }
824         /* we couldnt find any healthy node, try unhealthy ones */
825         if (i == nodemap->num) {
826                 for(i=0;i<nodemap->num;i++){
827                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
828                                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
829                                 if (ret != 0) {
830                                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
831                                         return ret;
832                                 }
833                                 if (!(capabilities&CTDB_CAP_NATGW)) {
834                                         continue;
835                                 }
836                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
837                                 break;
838                         }
839                 }
840         }
841         /* unless all nodes are STOPPED, when we pick one anyway */
842         if (i == nodemap->num) {
843                 for(i=0;i<nodemap->num;i++){
844                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
845                                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
846                                 if (ret != 0) {
847                                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
848                                         return ret;
849                                 }
850                                 if (!(capabilities&CTDB_CAP_NATGW)) {
851                                         continue;
852                                 }
853                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
854                                 break;
855                         }
856                 }
857                 /* or if we still can not find any */
858                 if (i == nodemap->num) {
859                         printf("-1 0.0.0.0\n");
860                 }
861         }
862
863         /* print the pruned list of nodes belonging to this natgw list */
864         for(i=0;i<nodemap->num;i++){
865                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
866                         continue;
867                 }
868                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
869                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
870                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
871                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
872                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
873                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
874                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
875         }
876
877         return 0;
878 }
879
880 /*
881   display the status of the scripts for monitoring (or other events)
882  */
883 static int control_one_scriptstatus(struct ctdb_context *ctdb,
884                                     enum ctdb_eventscript_call type)
885 {
886         struct ctdb_scripts_wire *script_status;
887         int ret, i;
888
889         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
890         if (ret != 0) {
891                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
892                 return ret;
893         }
894
895         if (script_status == NULL) {
896                 if (!options.machinereadable) {
897                         printf("%s cycle never run\n",
898                                ctdb_eventscript_call_names[type]);
899                 }
900                 return 0;
901         }
902
903         if (!options.machinereadable) {
904                 printf("%d scripts were executed last %s cycle\n",
905                        script_status->num_scripts,
906                        ctdb_eventscript_call_names[type]);
907         }
908         for (i=0; i<script_status->num_scripts; i++) {
909                 const char *status = NULL;
910
911                 switch (script_status->scripts[i].status) {
912                 case -ETIME:
913                         status = "TIMEDOUT";
914                         break;
915                 case -ENOEXEC:
916                         status = "DISABLED";
917                         break;
918                 case 0:
919                         status = "OK";
920                         break;
921                 default:
922                         if (script_status->scripts[i].status > 0)
923                                 status = "ERROR";
924                         break;
925                 }
926                 if (options.machinereadable) {
927                         printf("%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
928                                ctdb_eventscript_call_names[type],
929                                script_status->scripts[i].name,
930                                script_status->scripts[i].status,
931                                status,
932                                (long)script_status->scripts[i].start.tv_sec,
933                                (long)script_status->scripts[i].start.tv_usec,
934                                (long)script_status->scripts[i].finished.tv_sec,
935                                (long)script_status->scripts[i].finished.tv_usec,
936                                script_status->scripts[i].output);
937                         continue;
938                 }
939                 if (status)
940                         printf("%-20s Status:%s    ",
941                                script_status->scripts[i].name, status);
942                 else
943                         /* Some other error, eg from stat. */
944                         printf("%-20s Status:CANNOT RUN (%s)",
945                                script_status->scripts[i].name,
946                                strerror(-script_status->scripts[i].status));
947
948                 if (script_status->scripts[i].status >= 0) {
949                         printf("Duration:%.3lf ",
950                         timeval_delta(&script_status->scripts[i].finished,
951                               &script_status->scripts[i].start));
952                 }
953                 if (script_status->scripts[i].status != -ENOEXEC) {
954                         printf("%s",
955                                ctime(&script_status->scripts[i].start.tv_sec));
956                         if (script_status->scripts[i].status != 0) {
957                                 printf("   OUTPUT:%s\n",
958                                        script_status->scripts[i].output);
959                         }
960                 } else {
961                         printf("\n");
962                 }
963         }
964         return 0;
965 }
966
967
968 static int control_scriptstatus(struct ctdb_context *ctdb,
969                                 int argc, const char **argv)
970 {
971         int ret;
972         enum ctdb_eventscript_call type, min, max;
973         const char *arg;
974
975         if (argc > 1) {
976                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
977                 return -1;
978         }
979
980         if (argc == 0)
981                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
982         else
983                 arg = argv[0];
984
985         for (type = 0; type < CTDB_EVENT_MAX; type++) {
986                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
987                         min = type;
988                         max = type+1;
989                         break;
990                 }
991         }
992         if (type == CTDB_EVENT_MAX) {
993                 if (strcmp(arg, "all") == 0) {
994                         min = 0;
995                         max = CTDB_EVENT_MAX;
996                 } else {
997                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
998                         return -1;
999                 }
1000         }
1001
1002         if (options.machinereadable) {
1003                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1004         }
1005
1006         for (type = min; type < max; type++) {
1007                 ret = control_one_scriptstatus(ctdb, type);
1008                 if (ret != 0) {
1009                         return ret;
1010                 }
1011         }
1012
1013         return 0;
1014 }
1015
1016 /*
1017   enable an eventscript
1018  */
1019 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1020 {
1021         int ret;
1022
1023         if (argc < 1) {
1024                 usage();
1025         }
1026
1027         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1028         if (ret != 0) {
1029           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1030                 return ret;
1031         }
1032
1033         return 0;
1034 }
1035
1036 /*
1037   disable an eventscript
1038  */
1039 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1040 {
1041         int ret;
1042
1043         if (argc < 1) {
1044                 usage();
1045         }
1046
1047         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1048         if (ret != 0) {
1049           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1050                 return ret;
1051         }
1052
1053         return 0;
1054 }
1055
1056 /*
1057   display the pnn of the recovery master
1058  */
1059 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1060 {
1061         int ret;
1062         uint32_t recmaster;
1063
1064         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1065         if (ret != 0) {
1066                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1067                 return ret;
1068         }
1069         printf("%d\n",recmaster);
1070
1071         return 0;
1072 }
1073
1074 /*
1075   add a tickle to a public address
1076  */
1077 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1078 {
1079         struct ctdb_tcp_connection t;
1080         TDB_DATA data;
1081         int ret;
1082
1083         if (argc < 2) {
1084                 usage();
1085         }
1086
1087         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1088                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1089                 return -1;
1090         }
1091         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1092                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1093                 return -1;
1094         }
1095
1096         data.dptr = (uint8_t *)&t;
1097         data.dsize = sizeof(t);
1098
1099         /* tell all nodes about this tcp connection */
1100         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1101                            0, data, ctdb, NULL, NULL, NULL, NULL);
1102         if (ret != 0) {
1103                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1104                 return -1;
1105         }
1106         
1107         return 0;
1108 }
1109
1110
1111 /*
1112   delete a tickle from a node
1113  */
1114 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1115 {
1116         struct ctdb_tcp_connection t;
1117         TDB_DATA data;
1118         int ret;
1119
1120         if (argc < 2) {
1121                 usage();
1122         }
1123
1124         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1125                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1126                 return -1;
1127         }
1128         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1129                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1130                 return -1;
1131         }
1132
1133         data.dptr = (uint8_t *)&t;
1134         data.dsize = sizeof(t);
1135
1136         /* tell all nodes about this tcp connection */
1137         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1138                            0, data, ctdb, NULL, NULL, NULL, NULL);
1139         if (ret != 0) {
1140                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1141                 return -1;
1142         }
1143         
1144         return 0;
1145 }
1146
1147
1148 /*
1149   get a list of all tickles for this pnn
1150  */
1151 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1152 {
1153         struct ctdb_control_tcp_tickle_list *list;
1154         ctdb_sock_addr addr;
1155         int i, ret;
1156         unsigned port = 0;
1157
1158         if (argc < 1) {
1159                 usage();
1160         }
1161
1162         if (argc == 2) {
1163                 port = atoi(argv[1]);
1164         }
1165
1166         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1167                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1168                 return -1;
1169         }
1170
1171         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1172         if (ret == -1) {
1173                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1174                 return -1;
1175         }
1176
1177         if (options.machinereadable){
1178                 printf(":source ip:port:destination ip:port:\n");
1179                 for (i=0;i<list->tickles.num;i++) {
1180                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1181                                 continue;
1182                         }
1183                         printf(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1184                         printf(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1185                 }
1186         } else {
1187                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1188                 printf("Num tickles:%u\n", list->tickles.num);
1189                 for (i=0;i<list->tickles.num;i++) {
1190                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1191                                 continue;
1192                         }
1193                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1194                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1195                 }
1196         }
1197
1198         talloc_free(list);
1199         
1200         return 0;
1201 }
1202
1203
1204 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1205 {
1206         struct ctdb_all_public_ips *ips;
1207         struct ctdb_public_ip ip;
1208         int i, ret;
1209         uint32_t *nodes;
1210         uint32_t disable_time;
1211         TDB_DATA data;
1212         struct ctdb_node_map *nodemap=NULL;
1213         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1214
1215         disable_time = 30;
1216         data.dptr  = (uint8_t*)&disable_time;
1217         data.dsize = sizeof(disable_time);
1218         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1219         if (ret != 0) {
1220                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1221                 return -1;
1222         }
1223
1224
1225
1226         /* read the public ip list from the node */
1227         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1228         if (ret != 0) {
1229                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1230                 talloc_free(tmp_ctx);
1231                 return -1;
1232         }
1233
1234         for (i=0;i<ips->num;i++) {
1235                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1236                         break;
1237                 }
1238         }
1239         if (i==ips->num) {
1240                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1241                         pnn, ctdb_addr_to_str(addr)));
1242                 talloc_free(tmp_ctx);
1243                 return -1;
1244         }
1245
1246         ip.pnn  = pnn;
1247         ip.addr = *addr;
1248
1249         data.dptr  = (uint8_t *)&ip;
1250         data.dsize = sizeof(ip);
1251
1252         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1253         if (ret != 0) {
1254                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1255                 talloc_free(tmp_ctx);
1256                 return ret;
1257         }
1258
1259         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1260         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1261                                         nodes, 0,
1262                                         LONGTIMELIMIT(),
1263                                         false, data,
1264                                         NULL, NULL,
1265                                         NULL);
1266         if (ret != 0) {
1267                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1268                 talloc_free(tmp_ctx);
1269                 return -1;
1270         }
1271
1272         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1273         if (ret != 0) {
1274                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1275                 talloc_free(tmp_ctx);
1276                 return -1;
1277         }
1278
1279         /* update the recovery daemon so it now knows to expect the new
1280            node assignment for this ip.
1281         */
1282         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1283         if (ret != 0) {
1284                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1285                 return -1;
1286         }
1287
1288         talloc_free(tmp_ctx);
1289         return 0;
1290 }
1291
1292 /*
1293   move/failover an ip address to a specific node
1294  */
1295 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1296 {
1297         uint32_t pnn;
1298         ctdb_sock_addr addr;
1299
1300         if (argc < 2) {
1301                 usage();
1302                 return -1;
1303         }
1304
1305         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1306                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1307                 return -1;
1308         }
1309
1310
1311         if (sscanf(argv[1], "%u", &pnn) != 1) {
1312                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1313                 return -1;
1314         }
1315
1316         if (move_ip(ctdb, &addr, pnn) != 0) {
1317                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1318                 return -1;
1319         }
1320
1321         return 0;
1322 }
1323
1324 void getips_store_callback(void *param, void *data)
1325 {
1326         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1327         struct ctdb_all_public_ips *ips = param;
1328         int i;
1329
1330         i = ips->num++;
1331         ips->ips[i].pnn  = node_ip->pnn;
1332         ips->ips[i].addr = node_ip->addr;
1333 }
1334
1335 void getips_count_callback(void *param, void *data)
1336 {
1337         uint32_t *count = param;
1338
1339         (*count)++;
1340 }
1341
1342 #define IP_KEYLEN       4
1343 static uint32_t *ip_key(ctdb_sock_addr *ip)
1344 {
1345         static uint32_t key[IP_KEYLEN];
1346
1347         bzero(key, sizeof(key));
1348
1349         switch (ip->sa.sa_family) {
1350         case AF_INET:
1351                 key[0]  = ip->ip.sin_addr.s_addr;
1352                 break;
1353         case AF_INET6:
1354                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1355                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1356                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1357                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1358                 break;
1359         default:
1360                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1361                 return key;
1362         }
1363
1364         return key;
1365 }
1366
1367 static void *add_ip_callback(void *parm, void *data)
1368 {
1369         return parm;
1370 }
1371
1372 static int
1373 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1374 {
1375         struct ctdb_all_public_ips *tmp_ips;
1376         struct ctdb_node_map *nodemap=NULL;
1377         trbt_tree_t *ip_tree;
1378         int i, j, len, ret;
1379         uint32_t count;
1380
1381         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1382         if (ret != 0) {
1383                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1384                 return ret;
1385         }
1386
1387         ip_tree = trbt_create(tmp_ctx, 0);
1388
1389         for(i=0;i<nodemap->num;i++){
1390                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1391                         continue;
1392                 }
1393                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1394                         continue;
1395                 }
1396
1397                 /* read the public ip list from this node */
1398                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1399                 if (ret != 0) {
1400                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1401                         return -1;
1402                 }
1403         
1404                 for (j=0; j<tmp_ips->num;j++) {
1405                         struct ctdb_public_ip *node_ip;
1406
1407                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1408                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1409                         node_ip->addr = tmp_ips->ips[j].addr;
1410
1411                         trbt_insertarray32_callback(ip_tree,
1412                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1413                                 add_ip_callback,
1414                                 node_ip);
1415                 }
1416                 talloc_free(tmp_ips);
1417         }
1418
1419         /* traverse */
1420         count = 0;
1421         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1422
1423         len = offsetof(struct ctdb_all_public_ips, ips) + 
1424                 count*sizeof(struct ctdb_public_ip);
1425         tmp_ips = talloc_zero_size(tmp_ctx, len);
1426         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1427
1428         *ips = tmp_ips;
1429
1430         return 0;
1431 }
1432
1433
1434 /* 
1435  * scans all other nodes and returns a pnn for another node that can host this 
1436  * ip address or -1
1437  */
1438 static int
1439 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1440 {
1441         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1442         struct ctdb_all_public_ips *ips;
1443         struct ctdb_node_map *nodemap=NULL;
1444         int i, j, ret;
1445
1446         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1447         if (ret != 0) {
1448                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1449                 talloc_free(tmp_ctx);
1450                 return ret;
1451         }
1452
1453         for(i=0;i<nodemap->num;i++){
1454                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1455                         continue;
1456                 }
1457                 if (nodemap->nodes[i].pnn == options.pnn) {
1458                         continue;
1459                 }
1460
1461                 /* read the public ip list from this node */
1462                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1463                 if (ret != 0) {
1464                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1465                         return -1;
1466                 }
1467
1468                 for (j=0;j<ips->num;j++) {
1469                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1470                                 talloc_free(tmp_ctx);
1471                                 return nodemap->nodes[i].pnn;
1472                         }
1473                 }
1474                 talloc_free(ips);
1475         }
1476
1477         talloc_free(tmp_ctx);
1478         return -1;
1479 }
1480
1481 /*
1482   add a public ip address to a node
1483  */
1484 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1485 {
1486         int i, ret;
1487         int len;
1488         uint32_t pnn;
1489         unsigned mask;
1490         ctdb_sock_addr addr;
1491         struct ctdb_control_ip_iface *pub;
1492         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1493         struct ctdb_all_public_ips *ips;
1494
1495
1496         if (argc != 2) {
1497                 talloc_free(tmp_ctx);
1498                 usage();
1499         }
1500
1501         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1502                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1503                 talloc_free(tmp_ctx);
1504                 return -1;
1505         }
1506
1507         ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1508         if (ret != 0) {
1509                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1510                 talloc_free(tmp_ctx);
1511                 return ret;
1512         }
1513
1514
1515         /* check if some other node is already serving this ip, if not,
1516          * we will claim it
1517          */
1518         for (i=0;i<ips->num;i++) {
1519                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1520                         break;
1521                 }
1522         }
1523
1524         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1525         pub = talloc_size(tmp_ctx, len); 
1526         CTDB_NO_MEMORY(ctdb, pub);
1527
1528         pub->addr  = addr;
1529         pub->mask  = mask;
1530         pub->len   = strlen(argv[1])+1;
1531         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1532
1533         ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1534         if (ret != 0) {
1535                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
1536                 talloc_free(tmp_ctx);
1537                 return ret;
1538         }
1539
1540         if (i == ips->num) {
1541                 /* no one has this ip so we claim it */
1542                 pnn  = options.pnn;
1543         } else {
1544                 pnn  = ips->ips[i].pnn;
1545         }
1546
1547         if (move_ip(ctdb, &addr, pnn) != 0) {
1548                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", pnn));
1549                 return -1;
1550         }
1551
1552         talloc_free(tmp_ctx);
1553         return 0;
1554 }
1555
1556 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1557
1558 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1559 {
1560         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1561         struct ctdb_node_map *nodemap=NULL;
1562         struct ctdb_all_public_ips *ips;
1563         int ret, i, j;
1564
1565         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1566         if (ret != 0) {
1567                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1568                 return ret;
1569         }
1570
1571         /* remove it from the nodes that are not hosting the ip currently */
1572         for(i=0;i<nodemap->num;i++){
1573                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1574                         continue;
1575                 }
1576                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1577                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1578                         continue;
1579                 }
1580
1581                 for (j=0;j<ips->num;j++) {
1582                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1583                                 break;
1584                         }
1585                 }
1586                 if (j==ips->num) {
1587                         continue;
1588                 }
1589
1590                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1591                         continue;
1592                 }
1593
1594                 options.pnn = nodemap->nodes[i].pnn;
1595                 control_delip(ctdb, argc, argv);
1596         }
1597
1598
1599         /* remove it from every node (also the one hosting it) */
1600         for(i=0;i<nodemap->num;i++){
1601                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1602                         continue;
1603                 }
1604                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1605                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1606                         continue;
1607                 }
1608
1609                 for (j=0;j<ips->num;j++) {
1610                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1611                                 break;
1612                         }
1613                 }
1614                 if (j==ips->num) {
1615                         continue;
1616                 }
1617
1618                 options.pnn = nodemap->nodes[i].pnn;
1619                 control_delip(ctdb, argc, argv);
1620         }
1621
1622         talloc_free(tmp_ctx);
1623         return 0;
1624 }
1625         
1626 /*
1627   delete a public ip address from a node
1628  */
1629 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1630 {
1631         int i, ret;
1632         ctdb_sock_addr addr;
1633         struct ctdb_control_ip_iface pub;
1634         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1635         struct ctdb_all_public_ips *ips;
1636
1637         if (argc != 1) {
1638                 talloc_free(tmp_ctx);
1639                 usage();
1640         }
1641
1642         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1643                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1644                 return -1;
1645         }
1646
1647         if (options.pnn == CTDB_BROADCAST_ALL) {
1648                 return control_delip_all(ctdb, argc, argv, &addr);
1649         }
1650
1651         pub.addr  = addr;
1652         pub.mask  = 0;
1653         pub.len   = 0;
1654
1655         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1656         if (ret != 0) {
1657                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
1658                 talloc_free(tmp_ctx);
1659                 return ret;
1660         }
1661         
1662         for (i=0;i<ips->num;i++) {
1663                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1664                         break;
1665                 }
1666         }
1667
1668         if (i==ips->num) {
1669                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
1670                         ctdb_addr_to_str(&addr)));
1671                 talloc_free(tmp_ctx);
1672                 return -1;
1673         }
1674
1675         if (ips->ips[i].pnn == options.pnn) {
1676                 ret = find_other_host_for_public_ip(ctdb, &addr);
1677                 if (ret != -1) {
1678                         if (move_ip(ctdb, &addr, ret) != 0) {
1679                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d\n", ret));
1680                                 return -1;
1681                         }
1682                 }
1683         }
1684
1685         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
1686         if (ret != 0) {
1687                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
1688                 talloc_free(tmp_ctx);
1689                 return ret;
1690         }
1691
1692         talloc_free(tmp_ctx);
1693         return 0;
1694 }
1695
1696 /*
1697   kill a tcp connection
1698  */
1699 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1700 {
1701         int ret;
1702         struct ctdb_control_killtcp killtcp;
1703
1704         if (argc < 2) {
1705                 usage();
1706         }
1707
1708         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
1709                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1710                 return -1;
1711         }
1712
1713         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
1714                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1715                 return -1;
1716         }
1717
1718         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
1719         if (ret != 0) {
1720                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
1721                 return ret;
1722         }
1723
1724         return 0;
1725 }
1726
1727
1728 /*
1729   send a gratious arp
1730  */
1731 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
1732 {
1733         int ret;
1734         ctdb_sock_addr addr;
1735
1736         if (argc < 2) {
1737                 usage();
1738         }
1739
1740         if (!parse_ip(argv[0], NULL, 0, &addr)) {
1741                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
1742                 return -1;
1743         }
1744
1745         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
1746         if (ret != 0) {
1747                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
1748                 return ret;
1749         }
1750
1751         return 0;
1752 }
1753
1754 /*
1755   register a server id
1756  */
1757 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1758 {
1759         int ret;
1760         struct ctdb_server_id server_id;
1761
1762         if (argc < 3) {
1763                 usage();
1764         }
1765
1766         server_id.pnn       = strtoul(argv[0], NULL, 0);
1767         server_id.type      = strtoul(argv[1], NULL, 0);
1768         server_id.server_id = strtoul(argv[2], NULL, 0);
1769
1770         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
1771         if (ret != 0) {
1772                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
1773                 return ret;
1774         }
1775         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
1776         sleep(999);
1777         return -1;
1778 }
1779
1780 /*
1781   unregister a server id
1782  */
1783 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1784 {
1785         int ret;
1786         struct ctdb_server_id server_id;
1787
1788         if (argc < 3) {
1789                 usage();
1790         }
1791
1792         server_id.pnn       = strtoul(argv[0], NULL, 0);
1793         server_id.type      = strtoul(argv[1], NULL, 0);
1794         server_id.server_id = strtoul(argv[2], NULL, 0);
1795
1796         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
1797         if (ret != 0) {
1798                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
1799                 return ret;
1800         }
1801         return -1;
1802 }
1803
1804 /*
1805   check if a server id exists
1806  */
1807 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
1808 {
1809         uint32_t status;
1810         int ret;
1811         struct ctdb_server_id server_id;
1812
1813         if (argc < 3) {
1814                 usage();
1815         }
1816
1817         server_id.pnn       = strtoul(argv[0], NULL, 0);
1818         server_id.type      = strtoul(argv[1], NULL, 0);
1819         server_id.server_id = strtoul(argv[2], NULL, 0);
1820
1821         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
1822         if (ret != 0) {
1823                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
1824                 return ret;
1825         }
1826
1827         if (status) {
1828                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
1829         } else {
1830                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
1831         }
1832         return 0;
1833 }
1834
1835 /*
1836   get a list of all server ids that are registered on a node
1837  */
1838 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
1839 {
1840         int i, ret;
1841         struct ctdb_server_id_list *server_ids;
1842
1843         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
1844         if (ret != 0) {
1845                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
1846                 return ret;
1847         }
1848
1849         for (i=0; i<server_ids->num; i++) {
1850                 printf("Server id %d:%d:%d\n", 
1851                         server_ids->server_ids[i].pnn, 
1852                         server_ids->server_ids[i].type, 
1853                         server_ids->server_ids[i].server_id); 
1854         }
1855
1856         return -1;
1857 }
1858
1859 /*
1860   send a tcp tickle ack
1861  */
1862 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
1863 {
1864         int ret;
1865         ctdb_sock_addr  src, dst;
1866
1867         if (argc < 2) {
1868                 usage();
1869         }
1870
1871         if (!parse_ip_port(argv[0], &src)) {
1872                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
1873                 return -1;
1874         }
1875
1876         if (!parse_ip_port(argv[1], &dst)) {
1877                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
1878                 return -1;
1879         }
1880
1881         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
1882         if (ret==0) {
1883                 return 0;
1884         }
1885         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
1886
1887         return -1;
1888 }
1889
1890
1891 /*
1892   display public ip status
1893  */
1894 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
1895 {
1896         int i, ret;
1897         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1898         struct ctdb_all_public_ips *ips;
1899
1900         if (options.pnn == CTDB_BROADCAST_ALL) {
1901                 /* read the list of public ips from all nodes */
1902                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
1903         } else {
1904                 /* read the public ip list from this node */
1905                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1906         }
1907         if (ret != 0) {
1908                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
1909                 talloc_free(tmp_ctx);
1910                 return ret;
1911         }
1912
1913         if (options.machinereadable){
1914                 printf(":Public IP:Node:");
1915                 if (options.verbose){
1916                         printf("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
1917                 }
1918                 printf("\n");
1919         } else {
1920                 if (options.pnn == CTDB_BROADCAST_ALL) {
1921                         printf("Public IPs on ALL nodes\n");
1922                 } else {
1923                         printf("Public IPs on node %u\n", options.pnn);
1924                 }
1925         }
1926
1927         for (i=1;i<=ips->num;i++) {
1928                 struct ctdb_control_public_ip_info *info = NULL;
1929                 int32_t pnn;
1930                 char *aciface = NULL;
1931                 char *avifaces = NULL;
1932                 char *cifaces = NULL;
1933
1934                 if (options.pnn == CTDB_BROADCAST_ALL) {
1935                         pnn = ips->ips[ips->num-i].pnn;
1936                 } else {
1937                         pnn = options.pnn;
1938                 }
1939
1940                 if (pnn != -1) {
1941                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
1942                                                    &ips->ips[ips->num-i].addr, &info);
1943                 } else {
1944                         ret = -1;
1945                 }
1946
1947                 if (ret == 0) {
1948                         int j;
1949                         for (j=0; j < info->num; j++) {
1950                                 if (cifaces == NULL) {
1951                                         cifaces = talloc_strdup(info,
1952                                                                 info->ifaces[j].name);
1953                                 } else {
1954                                         cifaces = talloc_asprintf_append(cifaces,
1955                                                                          ",%s",
1956                                                                          info->ifaces[j].name);
1957                                 }
1958
1959                                 if (info->active_idx == j) {
1960                                         aciface = info->ifaces[j].name;
1961                                 }
1962
1963                                 if (info->ifaces[j].link_state == 0) {
1964                                         continue;
1965                                 }
1966
1967                                 if (avifaces == NULL) {
1968                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
1969                                 } else {
1970                                         avifaces = talloc_asprintf_append(avifaces,
1971                                                                           ",%s",
1972                                                                           info->ifaces[j].name);
1973                                 }
1974                         }
1975                 }
1976
1977                 if (options.machinereadable){
1978                         printf(":%s:%d:",
1979                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1980                                 ips->ips[ips->num-i].pnn);
1981                         if (options.verbose){
1982                                 printf("%s:%s:%s:",
1983                                         aciface?aciface:"",
1984                                         avifaces?avifaces:"",
1985                                         cifaces?cifaces:"");
1986                         }
1987                         printf("\n");
1988                 } else {
1989                         if (options.verbose) {
1990                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
1991                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1992                                         ips->ips[ips->num-i].pnn,
1993                                         aciface?aciface:"",
1994                                         avifaces?avifaces:"",
1995                                         cifaces?cifaces:"");
1996                         } else {
1997                                 printf("%s %d\n",
1998                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
1999                                         ips->ips[ips->num-i].pnn);
2000                         }
2001                 }
2002                 talloc_free(info);
2003         }
2004
2005         talloc_free(tmp_ctx);
2006         return 0;
2007 }
2008
2009 /*
2010   public ip info
2011  */
2012 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
2013 {
2014         int i, ret;
2015         ctdb_sock_addr addr;
2016         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2017         struct ctdb_control_public_ip_info *info;
2018
2019         if (argc != 1) {
2020                 talloc_free(tmp_ctx);
2021                 usage();
2022         }
2023
2024         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2025                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2026                 return -1;
2027         }
2028
2029         /* read the public ip info from this node */
2030         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
2031                                            tmp_ctx, &addr, &info);
2032         if (ret != 0) {
2033                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
2034                                   argv[0], options.pnn));
2035                 talloc_free(tmp_ctx);
2036                 return ret;
2037         }
2038
2039         printf("Public IP[%s] info on node %u\n",
2040                ctdb_addr_to_str(&info->ip.addr),
2041                options.pnn);
2042
2043         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
2044                ctdb_addr_to_str(&info->ip.addr),
2045                info->ip.pnn, info->num);
2046
2047         for (i=0; i<info->num; i++) {
2048                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2049
2050                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
2051                        i+1, info->ifaces[i].name,
2052                        info->ifaces[i].link_state?"up":"down",
2053                        (unsigned int)info->ifaces[i].references,
2054                        (i==info->active_idx)?" (active)":"");
2055         }
2056
2057         talloc_free(tmp_ctx);
2058         return 0;
2059 }
2060
2061 /*
2062   display interfaces status
2063  */
2064 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
2065 {
2066         int i, ret;
2067         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2068         struct ctdb_control_get_ifaces *ifaces;
2069
2070         /* read the public ip list from this node */
2071         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn,
2072                                    tmp_ctx, &ifaces);
2073         if (ret != 0) {
2074                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
2075                                   options.pnn));
2076                 talloc_free(tmp_ctx);
2077                 return ret;
2078         }
2079
2080         if (options.machinereadable){
2081                 printf(":Name:LinkStatus:References:\n");
2082         } else {
2083                 printf("Interfaces on node %u\n", options.pnn);
2084         }
2085
2086         for (i=0; i<ifaces->num; i++) {
2087                 if (options.machinereadable){
2088                         printf(":%s:%s:%u\n",
2089                                ifaces->ifaces[i].name,
2090                                ifaces->ifaces[i].link_state?"1":"0",
2091                                (unsigned int)ifaces->ifaces[i].references);
2092                 } else {
2093                         printf("name:%s link:%s references:%u\n",
2094                                ifaces->ifaces[i].name,
2095                                ifaces->ifaces[i].link_state?"up":"down",
2096                                (unsigned int)ifaces->ifaces[i].references);
2097                 }
2098         }
2099
2100         talloc_free(tmp_ctx);
2101         return 0;
2102 }
2103
2104
2105 /*
2106   set link status of an interface
2107  */
2108 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
2109 {
2110         int ret;
2111         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2112         struct ctdb_control_iface_info info;
2113
2114         ZERO_STRUCT(info);
2115
2116         if (argc != 2) {
2117                 usage();
2118         }
2119
2120         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
2121                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
2122                                   argv[0]));
2123                 talloc_free(tmp_ctx);
2124                 return -1;
2125         }
2126         strcpy(info.name, argv[0]);
2127
2128         if (strcmp(argv[1], "up") == 0) {
2129                 info.link_state = 1;
2130         } else if (strcmp(argv[1], "down") == 0) {
2131                 info.link_state = 0;
2132         } else {
2133                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
2134                                   argv[1]));
2135                 talloc_free(tmp_ctx);
2136                 return -1;
2137         }
2138
2139         /* read the public ip list from this node */
2140         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
2141                                    tmp_ctx, &info);
2142         if (ret != 0) {
2143                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
2144                                   argv[0], options.pnn));
2145                 talloc_free(tmp_ctx);
2146                 return ret;
2147         }
2148
2149         talloc_free(tmp_ctx);
2150         return 0;
2151 }
2152
2153 /*
2154   display pid of a ctdb daemon
2155  */
2156 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
2157 {
2158         uint32_t pid;
2159         int ret;
2160
2161         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
2162         if (ret != 0) {
2163                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
2164                 return ret;
2165         }
2166         printf("Pid:%d\n", pid);
2167
2168         return 0;
2169 }
2170
2171 static uint32_t ipreallocate_finished;
2172
2173 /*
2174   handler for receiving the response to ipreallocate
2175 */
2176 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2177                              TDB_DATA data, void *private_data)
2178 {
2179         ipreallocate_finished = 1;
2180 }
2181
2182 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2183 {
2184         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2185
2186         event_add_timed(ctdb->ev, ctdb, 
2187                                 timeval_current_ofs(1, 0),
2188                                 ctdb_every_second, ctdb);
2189 }
2190
2191 /*
2192   ask the recovery daemon on the recovery master to perform a ip reallocation
2193  */
2194 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
2195 {
2196         int i, ret;
2197         TDB_DATA data;
2198         struct takeover_run_reply rd;
2199         uint32_t recmaster;
2200         struct ctdb_node_map *nodemap=NULL;
2201         int retries=0;
2202         struct timeval tv = timeval_current();
2203
2204         /* we need some events to trigger so we can timeout and restart
2205            the loop
2206         */
2207         event_add_timed(ctdb->ev, ctdb, 
2208                                 timeval_current_ofs(1, 0),
2209                                 ctdb_every_second, ctdb);
2210
2211         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
2212         if (rd.pnn == -1) {
2213                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
2214                 return -1;
2215         }
2216         rd.srvid = getpid();
2217
2218         /* register a message port for receiveing the reply so that we
2219            can receive the reply
2220         */
2221         ctdb_client_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
2222
2223         data.dptr = (uint8_t *)&rd;
2224         data.dsize = sizeof(rd);
2225
2226 again:
2227         /* check that there are valid nodes available */
2228         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
2229                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2230                 return -1;
2231         }
2232         for (i=0; i<nodemap->num;i++) {
2233                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
2234                         break;
2235                 }
2236         }
2237         if (i==nodemap->num) {
2238                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
2239                 return 0;
2240         }
2241
2242
2243         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
2244         if (ret != 0) {
2245                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2246                 return ret;
2247         }
2248
2249         /* verify the node exists */
2250         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
2251                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2252                 return -1;
2253         }
2254
2255
2256         /* check tha there are nodes available that can act as a recmaster */
2257         for (i=0; i<nodemap->num; i++) {
2258                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2259                         continue;
2260                 }
2261                 break;
2262         }
2263         if (i == nodemap->num) {
2264                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
2265                 return 0;
2266         }
2267
2268         /* verify the recovery master is not STOPPED, nor BANNED */
2269         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2270                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2271                 retries++;
2272                 sleep(1);
2273                 goto again;
2274         } 
2275
2276         
2277         /* verify the recovery master is not STOPPED, nor BANNED */
2278         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2279                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2280                 retries++;
2281                 sleep(1);
2282                 goto again;
2283         } 
2284
2285         ipreallocate_finished = 0;
2286         ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2287         if (ret != 0) {
2288                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
2289                 return -1;
2290         }
2291
2292         tv = timeval_current();
2293         /* this loop will terminate when we have received the reply */
2294         while (timeval_elapsed(&tv) < 3.0) {
2295                 event_loop_once(ctdb->ev);
2296         }
2297         if (ipreallocate_finished == 1) {
2298                 return 0;
2299         }
2300
2301         DEBUG(DEBUG_ERR,("Timed out waiting for recmaster ipreallocate. Trying again\n"));
2302         retries++;
2303         sleep(1);
2304         goto again;
2305
2306         return 0;
2307 }
2308
2309
2310 /*
2311   disable a remote node
2312  */
2313 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2314 {
2315         int ret;
2316         struct ctdb_node_map *nodemap=NULL;
2317
2318         /* check if the node is already disabled */
2319         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2320                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2321                 exit(10);
2322         }
2323         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2324                 DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
2325                 return 0;
2326         }
2327
2328         do {
2329                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2330                 if (ret != 0) {
2331                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2332                         return ret;
2333                 }
2334
2335                 sleep(1);
2336
2337                 /* read the nodemap and verify the change took effect */
2338                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2339                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2340                         exit(10);
2341                 }
2342
2343         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2344         ret = control_ipreallocate(ctdb, argc, argv);
2345         if (ret != 0) {
2346                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2347                 return ret;
2348         }
2349
2350         return 0;
2351 }
2352
2353 /*
2354   enable a disabled remote node
2355  */
2356 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2357 {
2358         int ret;
2359
2360         struct ctdb_node_map *nodemap=NULL;
2361
2362
2363         /* check if the node is already enabled */
2364         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2365                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2366                 exit(10);
2367         }
2368         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
2369                 DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
2370                 return 0;
2371         }
2372
2373         do {
2374                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2375                 if (ret != 0) {
2376                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2377                         return ret;
2378                 }
2379
2380                 sleep(1);
2381
2382                 /* read the nodemap and verify the change took effect */
2383                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2384                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2385                         exit(10);
2386                 }
2387
2388         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2389
2390         ret = control_ipreallocate(ctdb, argc, argv);
2391         if (ret != 0) {
2392                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2393                 return ret;
2394         }
2395
2396         return 0;
2397 }
2398
2399 /*
2400   stop a remote node
2401  */
2402 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2403 {
2404         int ret;
2405         struct ctdb_node_map *nodemap=NULL;
2406
2407         do {
2408                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2409                 if (ret != 0) {
2410                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2411                 }
2412         
2413                 sleep(1);
2414
2415                 /* read the nodemap and verify the change took effect */
2416                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2417                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2418                         exit(10);
2419                 }
2420
2421         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2422         ret = control_ipreallocate(ctdb, argc, argv);
2423         if (ret != 0) {
2424                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2425                 return ret;
2426         }
2427
2428         return 0;
2429 }
2430
2431 /*
2432   restart a stopped remote node
2433  */
2434 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2435 {
2436         int ret;
2437
2438         struct ctdb_node_map *nodemap=NULL;
2439
2440         do {
2441                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2442                 if (ret != 0) {
2443                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2444                         return ret;
2445                 }
2446         
2447                 sleep(1);
2448
2449                 /* read the nodemap and verify the change took effect */
2450                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2451                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2452                         exit(10);
2453                 }
2454
2455         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2456         ret = control_ipreallocate(ctdb, argc, argv);
2457         if (ret != 0) {
2458                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2459                 return ret;
2460         }
2461
2462         return 0;
2463 }
2464
2465 static uint32_t get_generation(struct ctdb_context *ctdb)
2466 {
2467         struct ctdb_vnn_map *vnnmap=NULL;
2468         int ret;
2469
2470         /* wait until the recmaster is not in recovery mode */
2471         while (1) {
2472                 uint32_t recmode, recmaster;
2473                 
2474                 if (vnnmap != NULL) {
2475                         talloc_free(vnnmap);
2476                         vnnmap = NULL;
2477                 }
2478
2479                 /* get the recmaster */
2480                 ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
2481                 if (ret != 0) {
2482                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2483                         exit(10);
2484                 }
2485
2486                 /* get recovery mode */
2487                 ret = ctdb_ctrl_getrecmode(ctdb, ctdb, TIMELIMIT(), recmaster, &recmode);
2488                 if (ret != 0) {
2489                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2490                         exit(10);
2491                 }
2492
2493                 /* get the current generation number */
2494                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2495                 if (ret != 0) {
2496                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2497                         exit(10);
2498                 }
2499
2500                 if ((recmode == CTDB_RECOVERY_NORMAL)
2501                 &&  (vnnmap->generation != 1)){
2502                         return vnnmap->generation;
2503                 }
2504                 sleep(1);
2505         }
2506 }
2507
2508 /*
2509   ban a node from the cluster
2510  */
2511 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2512 {
2513         int ret;
2514         struct ctdb_node_map *nodemap=NULL;
2515         struct ctdb_ban_time bantime;
2516
2517         if (argc < 1) {
2518                 usage();
2519         }
2520         
2521         /* verify the node exists */
2522         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2523         if (ret != 0) {
2524                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2525                 return ret;
2526         }
2527
2528         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2529                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2530                 return -1;
2531         }
2532
2533         bantime.pnn  = options.pnn;
2534         bantime.time = strtoul(argv[0], NULL, 0);
2535
2536         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2537         if (ret != 0) {
2538                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2539                 return -1;
2540         }       
2541
2542         ret = control_ipreallocate(ctdb, argc, argv);
2543         if (ret != 0) {
2544                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2545                 return ret;
2546         }
2547
2548         return 0;
2549 }
2550
2551
2552 /*
2553   unban a node from the cluster
2554  */
2555 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2556 {
2557         int ret;
2558         struct ctdb_node_map *nodemap=NULL;
2559         struct ctdb_ban_time bantime;
2560
2561         /* verify the node exists */
2562         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2563         if (ret != 0) {
2564                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2565                 return ret;
2566         }
2567
2568         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2569                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2570                 return -1;
2571         }
2572
2573         bantime.pnn  = options.pnn;
2574         bantime.time = 0;
2575
2576         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2577         if (ret != 0) {
2578                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2579                 return -1;
2580         }       
2581
2582         ret = control_ipreallocate(ctdb, argc, argv);
2583         if (ret != 0) {
2584                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2585                 return ret;
2586         }
2587
2588         return 0;
2589 }
2590
2591
2592 /*
2593   show ban information for a node
2594  */
2595 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2596 {
2597         int ret;
2598         struct ctdb_node_map *nodemap=NULL;
2599         struct ctdb_ban_time *bantime;
2600
2601         /* verify the node exists */
2602         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2603         if (ret != 0) {
2604                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2605                 return ret;
2606         }
2607
2608         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2609         if (ret != 0) {
2610                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2611                 return -1;
2612         }       
2613
2614         if (bantime->time == 0) {
2615                 printf("Node %u is not banned\n", bantime->pnn);
2616         } else {
2617                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2618         }
2619
2620         return 0;
2621 }
2622
2623 /*
2624   shutdown a daemon
2625  */
2626 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2627 {
2628         int ret;
2629
2630         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2631         if (ret != 0) {
2632                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2633                 return ret;
2634         }
2635
2636         return 0;
2637 }
2638
2639 /*
2640   trigger a recovery
2641  */
2642 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2643 {
2644         int ret;
2645         uint32_t generation, next_generation;
2646
2647         /* record the current generation number */
2648         generation = get_generation(ctdb);
2649
2650         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2651         if (ret != 0) {
2652                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2653                 return ret;
2654         }
2655
2656         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2657         if (ret != 0) {
2658                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2659                 return ret;
2660         }
2661
2662         /* wait until we are in a new generation */
2663         while (1) {
2664                 next_generation = get_generation(ctdb);
2665                 if (next_generation != generation) {
2666                         return 0;
2667                 }
2668                 sleep(1);
2669         }
2670
2671         return 0;
2672 }
2673
2674
2675 /*
2676   display monitoring mode of a remote node
2677  */
2678 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2679 {
2680         uint32_t monmode;
2681         int ret;
2682
2683         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2684         if (ret != 0) {
2685                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2686                 return ret;
2687         }
2688         if (!options.machinereadable){
2689                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2690         } else {
2691                 printf(":mode:\n");
2692                 printf(":%d:\n",monmode);
2693         }
2694         return 0;
2695 }
2696
2697
2698 /*
2699   display capabilities of a remote node
2700  */
2701 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2702 {
2703         uint32_t capabilities;
2704         int ret;
2705
2706         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2707         if (ret != 0) {
2708                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2709                 return ret;
2710         }
2711         
2712         if (!options.machinereadable){
2713                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2714                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2715                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2716                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2717         } else {
2718                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2719                 printf(":%d:%d:%d:%d:\n",
2720                         !!(capabilities&CTDB_CAP_RECMASTER),
2721                         !!(capabilities&CTDB_CAP_LMASTER),
2722                         !!(capabilities&CTDB_CAP_LVS),
2723                         !!(capabilities&CTDB_CAP_NATGW));
2724         }
2725         return 0;
2726 }
2727
2728 /*
2729   display lvs configuration
2730  */
2731 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2732 {
2733         uint32_t *capabilities;
2734         struct ctdb_node_map *nodemap=NULL;
2735         int i, ret;
2736         int healthy_count = 0;
2737
2738         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2739         if (ret != 0) {
2740                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2741                 return ret;
2742         }
2743
2744         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2745         CTDB_NO_MEMORY(ctdb, capabilities);
2746         
2747         /* collect capabilities for all connected nodes */
2748         for (i=0; i<nodemap->num; i++) {
2749                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2750                         continue;
2751                 }
2752                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2753                         continue;
2754                 }
2755         
2756                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2757                 if (ret != 0) {
2758                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2759                         return ret;
2760                 }
2761
2762                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2763                         continue;
2764                 }
2765
2766                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2767                         healthy_count++;
2768                 }
2769         }
2770
2771         /* Print all LVS nodes */
2772         for (i=0; i<nodemap->num; i++) {
2773                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2774                         continue;
2775                 }
2776                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2777                         continue;
2778                 }
2779                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2780                         continue;
2781                 }
2782
2783                 if (healthy_count != 0) {
2784                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2785                                 continue;
2786                         }
2787                 }
2788
2789                 printf("%d:%s\n", i, 
2790                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
2791         }
2792
2793         return 0;
2794 }
2795
2796 /*
2797   display who is the lvs master
2798  */
2799 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
2800 {
2801         uint32_t *capabilities;
2802         struct ctdb_node_map *nodemap=NULL;
2803         int i, ret;
2804         int healthy_count = 0;
2805
2806         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2807         if (ret != 0) {
2808                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2809                 return ret;
2810         }
2811
2812         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2813         CTDB_NO_MEMORY(ctdb, capabilities);
2814         
2815         /* collect capabilities for all connected nodes */
2816         for (i=0; i<nodemap->num; i++) {
2817                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2818                         continue;
2819                 }
2820                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2821                         continue;
2822                 }
2823         
2824                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
2825                 if (ret != 0) {
2826                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
2827                         return ret;
2828                 }
2829
2830                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2831                         continue;
2832                 }
2833
2834                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
2835                         healthy_count++;
2836                 }
2837         }
2838
2839         /* find and show the lvsmaster */
2840         for (i=0; i<nodemap->num; i++) {
2841                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2842                         continue;
2843                 }
2844                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2845                         continue;
2846                 }
2847                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
2848                         continue;
2849                 }
2850
2851                 if (healthy_count != 0) {
2852                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
2853                                 continue;
2854                         }
2855                 }
2856
2857                 if (options.machinereadable){
2858                         printf("%d\n", i);
2859                 } else {
2860                         printf("Node %d is LVS master\n", i);
2861                 }
2862                 return 0;
2863         }
2864
2865         printf("There is no LVS master\n");
2866         return -1;
2867 }
2868
2869 /*
2870   disable monitoring on a  node
2871  */
2872 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2873 {
2874         
2875         int ret;
2876
2877         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
2878         if (ret != 0) {
2879                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
2880                 return ret;
2881         }
2882         printf("Monitoring mode:%s\n","DISABLED");
2883
2884         return 0;
2885 }
2886
2887 /*
2888   enable monitoring on a  node
2889  */
2890 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
2891 {
2892         
2893         int ret;
2894
2895         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
2896         if (ret != 0) {
2897                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
2898                 return ret;
2899         }
2900         printf("Monitoring mode:%s\n","ACTIVE");
2901
2902         return 0;
2903 }
2904
2905 /*
2906   display remote list of keys/data for a db
2907  */
2908 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
2909 {
2910         const char *db_name;
2911         struct ctdb_db_context *ctdb_db;
2912         int ret;
2913         bool persistent;
2914
2915         if (argc < 1) {
2916                 usage();
2917         }
2918
2919         db_name = argv[0];
2920
2921
2922         if (db_exists(ctdb, db_name, &persistent)) {
2923                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2924                 return -1;
2925         }
2926
2927         ctdb_db = ctdb_attach(ctdb, db_name, persistent, 0);
2928
2929         if (ctdb_db == NULL) {
2930                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2931                 return -1;
2932         }
2933
2934         /* traverse and dump the cluster tdb */
2935         ret = ctdb_dump_db(ctdb_db, stdout);
2936         if (ret == -1) {
2937                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
2938                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
2939                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
2940                                   db_name));
2941                 return -1;
2942         }
2943         talloc_free(ctdb_db);
2944
2945         printf("Dumped %d records\n", ret);
2946         return 0;
2947 }
2948
2949
2950 /*
2951   fetch a record from a persistent database
2952  */
2953 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
2954 {
2955         const char *db_name;
2956         struct ctdb_db_context *ctdb_db;
2957         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2958         struct ctdb_transaction_handle *h;
2959         TDB_DATA key, data;
2960         int fd, ret;
2961         bool persistent;
2962
2963         if (argc < 2) {
2964                 talloc_free(tmp_ctx);
2965                 usage();
2966         }
2967
2968         db_name = argv[0];
2969
2970
2971         if (db_exists(ctdb, db_name, &persistent)) {
2972                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
2973                 talloc_free(tmp_ctx);
2974                 return -1;
2975         }
2976
2977         if (!persistent) {
2978                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
2979                 talloc_free(tmp_ctx);
2980                 return -1;
2981         }
2982
2983         ctdb_db = ctdb_attach(ctdb, db_name, persistent, 0);
2984
2985         if (ctdb_db == NULL) {
2986                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
2987                 talloc_free(tmp_ctx);
2988                 return -1;
2989         }
2990
2991         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
2992         if (h == NULL) {
2993                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
2994                 talloc_free(tmp_ctx);
2995                 return -1;
2996         }
2997
2998         key.dptr  = discard_const(argv[1]);
2999         key.dsize = strlen(argv[1]);
3000         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
3001         if (ret != 0) {
3002                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
3003                 talloc_free(tmp_ctx);
3004                 return -1;
3005         }
3006
3007         if (data.dsize == 0 || data.dptr == NULL) {
3008                 DEBUG(DEBUG_ERR,("Record is empty\n"));
3009                 talloc_free(tmp_ctx);
3010                 return -1;
3011         }
3012
3013         if (argc == 3) {
3014           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3015                 if (fd == -1) {
3016                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
3017                         talloc_free(tmp_ctx);
3018                         return -1;
3019                 }
3020                 write(fd, data.dptr, data.dsize);
3021                 close(fd);
3022         } else {
3023                 write(1, data.dptr, data.dsize);
3024         }
3025
3026         /* abort the transaction */
3027         talloc_free(h);
3028
3029
3030         talloc_free(tmp_ctx);
3031         return 0;
3032 }
3033
3034 /*
3035   fetch a record from a tdb-file
3036  */
3037 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3038 {
3039         const char *tdb_file;
3040         TDB_CONTEXT *tdb;
3041         TDB_DATA key, data;
3042         int fd;
3043
3044         if (argc < 2) {
3045                 usage();
3046         }
3047
3048         tdb_file = argv[0];
3049
3050         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
3051         if (tdb == NULL) {
3052                 DEBUG(DEBUG_ERR,("Failed to open TDB file %s\n", tdb_file));
3053                 return -1;
3054         }
3055
3056         key.dptr  = discard_const(argv[1]);
3057         key.dsize = strlen(argv[1]);
3058         data = tdb_fetch(tdb, key);
3059         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
3060                 DEBUG(DEBUG_ERR,("Failed to read record %s from tdb %s\n", argv[1], tdb_file));
3061                 tdb_close(tdb);
3062                 return -1;
3063         }
3064
3065         tdb_close(tdb);
3066
3067         if (argc == 3) {
3068           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3069                 if (fd == -1) {
3070                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
3071                         return -1;
3072                 }
3073                 write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3074                 close(fd);
3075         } else {
3076                 write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3077         }
3078
3079         return 0;
3080 }
3081
3082 /*
3083   write a record to a persistent database
3084  */
3085 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
3086 {
3087         const char *db_name;
3088         struct ctdb_db_context *ctdb_db;
3089         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3090         struct ctdb_transaction_handle *h;
3091         struct stat st;
3092         TDB_DATA key, data;
3093         int fd, ret;
3094
3095         if (argc < 3) {
3096                 talloc_free(tmp_ctx);
3097                 usage();
3098         }
3099
3100         fd = open(argv[2], O_RDONLY);
3101         if (fd == -1) {
3102                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
3103                 talloc_free(tmp_ctx);
3104                 return -1;
3105         }
3106         
3107         ret = fstat(fd, &st);
3108         if (ret == -1) {
3109                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
3110                 close(fd);
3111                 talloc_free(tmp_ctx);
3112                 return -1;
3113         }
3114
3115         if (!S_ISREG(st.st_mode)) {
3116                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
3117                 close(fd);
3118                 talloc_free(tmp_ctx);
3119                 return -1;
3120         }
3121
3122         data.dsize = st.st_size;
3123         if (data.dsize == 0) {
3124                 data.dptr  = NULL;
3125         } else {
3126                 data.dptr = talloc_size(tmp_ctx, data.dsize);
3127                 if (data.dptr == NULL) {
3128                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
3129                         close(fd);
3130                         talloc_free(tmp_ctx);
3131                         return -1;
3132                 }
3133                 ret = read(fd, data.dptr, data.dsize);
3134                 if (ret != data.dsize) {
3135                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
3136                         close(fd);
3137                         talloc_free(tmp_ctx);
3138                         return -1;
3139                 }
3140         }
3141         close(fd);
3142
3143
3144         db_name = argv[0];
3145
3146         ctdb_db = ctdb_attach(ctdb, db_name, true, 0);
3147
3148         if (ctdb_db == NULL) {
3149                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3150                 talloc_free(tmp_ctx);
3151                 return -1;
3152         }
3153
3154         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3155         if (h == NULL) {
3156                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3157                 talloc_free(tmp_ctx);
3158                 return -1;
3159         }
3160
3161         key.dptr  = discard_const(argv[1]);
3162         key.dsize = strlen(argv[1]);
3163         ret = ctdb_transaction_store(h, key, data);
3164         if (ret != 0) {
3165                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
3166                 talloc_free(tmp_ctx);
3167                 return -1;
3168         }
3169
3170         ret = ctdb_transaction_commit(h);
3171         if (ret != 0) {
3172                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
3173                 talloc_free(tmp_ctx);
3174                 return -1;
3175         }
3176
3177
3178         talloc_free(tmp_ctx);
3179         return 0;
3180 }
3181
3182 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
3183                              TDB_DATA data, void *private_data)
3184 {
3185         DEBUG(DEBUG_ERR,("Log data received\n"));
3186         if (data.dsize > 0) {
3187                 printf("%s", data.dptr);
3188         }
3189
3190         exit(0);
3191 }
3192
3193 /*
3194   display a list of log messages from the in memory ringbuffer
3195  */
3196 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
3197 {
3198         int ret;
3199         int32_t res;
3200         struct ctdb_get_log_addr log_addr;
3201         TDB_DATA data;
3202         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3203         char *errmsg;
3204         struct timeval tv;
3205
3206         if (argc != 1) {
3207                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3208                 talloc_free(tmp_ctx);
3209                 return -1;
3210         }
3211
3212         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3213         log_addr.srvid = getpid();
3214         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3215                 log_addr.level = get_debug_by_desc(argv[0]);
3216         } else {
3217                 log_addr.level = strtol(argv[0], NULL, 0);
3218         }
3219
3220
3221         data.dptr = (unsigned char *)&log_addr;
3222         data.dsize = sizeof(log_addr);
3223
3224         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
3225
3226         ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
3227         sleep(1);
3228
3229         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
3230
3231         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
3232                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3233         if (ret != 0 || res != 0) {
3234                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
3235                 talloc_free(tmp_ctx);
3236                 return -1;
3237         }
3238
3239
3240         tv = timeval_current();
3241         /* this loop will terminate when we have received the reply */
3242         while (timeval_elapsed(&tv) < 3.0) {    
3243                 event_loop_once(ctdb->ev);
3244         }
3245
3246         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
3247
3248         talloc_free(tmp_ctx);
3249         return 0;
3250 }
3251
3252 /*
3253   clear the in memory log area
3254  */
3255 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
3256 {
3257         int ret;
3258         int32_t res;
3259         char *errmsg;
3260         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3261
3262         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
3263                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
3264         if (ret != 0 || res != 0) {
3265                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
3266                 talloc_free(tmp_ctx);
3267                 return -1;
3268         }
3269
3270         talloc_free(tmp_ctx);
3271         return 0;
3272 }
3273
3274
3275
3276 /*
3277   display a list of the databases on a remote ctdb
3278  */
3279 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
3280 {
3281         int i, ret;
3282         struct ctdb_dbid_map *dbmap=NULL;
3283
3284         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
3285         if (ret != 0) {
3286                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3287                 return ret;
3288         }
3289
3290         if(options.machinereadable){
3291                 printf(":ID:Name:Path:Persistent:Unhealthy:\n");
3292                 for(i=0;i<dbmap->num;i++){
3293                         const char *path;
3294                         const char *name;
3295                         const char *health;
3296                         bool persistent;
3297
3298                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
3299                                             dbmap->dbs[i].dbid, ctdb, &path);
3300                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3301                                             dbmap->dbs[i].dbid, ctdb, &name);
3302                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3303                                               dbmap->dbs[i].dbid, ctdb, &health);
3304                         persistent = dbmap->dbs[i].persistent;
3305                         printf(":0x%08X:%s:%s:%d:%d:\n",
3306                                dbmap->dbs[i].dbid, name, path,
3307                                !!(persistent), !!(health));
3308                 }
3309                 return 0;
3310         }
3311
3312         printf("Number of databases:%d\n", dbmap->num);
3313         for(i=0;i<dbmap->num;i++){
3314                 const char *path;
3315                 const char *name;
3316                 const char *health;
3317                 bool persistent;
3318
3319                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
3320                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
3321                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
3322                 persistent = dbmap->dbs[i].persistent;
3323                 printf("dbid:0x%08x name:%s path:%s%s%s\n",
3324                        dbmap->dbs[i].dbid, name, path,
3325                        persistent?" PERSISTENT":"",
3326                        health?" UNHEALTHY":"");
3327         }
3328
3329         return 0;
3330 }
3331
3332 /*
3333   display the status of a database on a remote ctdb
3334  */
3335 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
3336 {
3337         int i, ret;
3338         struct ctdb_dbid_map *dbmap=NULL;
3339         const char *db_name;
3340
3341         if (argc < 1) {
3342                 usage();
3343         }
3344
3345         db_name = argv[0];
3346
3347         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
3348         if (ret != 0) {
3349                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3350                 return ret;
3351         }
3352
3353         for(i=0;i<dbmap->num;i++){
3354                 const char *path;
3355                 const char *name;
3356                 const char *health;
3357                 bool persistent;
3358
3359                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
3360                 if (strcmp(name, db_name) != 0) {
3361                         continue;
3362                 }
3363
3364                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
3365                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
3366                 persistent = dbmap->dbs[i].persistent;
3367                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
3368                        dbmap->dbs[i].dbid, name, path,
3369                        persistent?"yes":"no",
3370                        health?health:"OK");
3371                 return 0;
3372         }
3373
3374         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
3375         return 0;
3376 }
3377
3378 /*
3379   check if the local node is recmaster or not
3380   it will return 1 if this node is the recmaster and 0 if it is not
3381   or if the local ctdb daemon could not be contacted
3382  */
3383 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3384 {
3385         uint32_t mypnn, recmaster;
3386         int ret;
3387
3388         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
3389         if (mypnn == -1) {
3390                 printf("Failed to get pnn of node\n");
3391                 return 1;
3392         }
3393
3394         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
3395         if (ret != 0) {
3396                 printf("Failed to get the recmaster\n");
3397                 return 1;
3398         }
3399
3400         if (recmaster != mypnn) {
3401                 printf("this node is not the recmaster\n");
3402                 return 1;
3403         }
3404
3405         printf("this node is the recmaster\n");
3406         return 0;
3407 }
3408
3409 /*
3410   ping a node
3411  */
3412 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
3413 {
3414         int ret;
3415         struct timeval tv = timeval_current();
3416         ret = ctdb_ctrl_ping(ctdb, options.pnn);
3417         if (ret == -1) {
3418                 printf("Unable to get ping response from node %u\n", options.pnn);
3419                 return -1;
3420         } else {
3421                 printf("response from %u time=%.6f sec  (%d clients)\n", 
3422                        options.pnn, timeval_elapsed(&tv), ret);
3423         }
3424         return 0;
3425 }
3426
3427
3428 /*
3429   get a tunable
3430  */
3431 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
3432 {
3433         const char *name;
3434         uint32_t value;
3435         int ret;
3436
3437         if (argc < 1) {
3438                 usage();
3439         }
3440
3441         name = argv[0];
3442         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
3443         if (ret == -1) {
3444                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
3445                 return -1;
3446         }
3447
3448         printf("%-19s = %u\n", name, value);
3449         return 0;
3450 }
3451
3452 /*
3453   set a tunable
3454  */
3455 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
3456 {
3457         const char *name;
3458         uint32_t value;
3459         int ret;
3460
3461         if (argc < 2) {
3462                 usage();
3463         }
3464
3465         name = argv[0];
3466         value = strtoul(argv[1], NULL, 0);
3467
3468         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
3469         if (ret == -1) {
3470                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
3471                 return -1;
3472         }
3473         return 0;
3474 }
3475
3476 /*
3477   list all tunables
3478  */
3479 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
3480 {
3481         uint32_t count;
3482         const char **list;
3483         int ret, i;
3484
3485         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
3486         if (ret == -1) {
3487                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
3488                 return -1;
3489         }
3490
3491         for (i=0;i<count;i++) {
3492                 control_getvar(ctdb, 1, &list[i]);
3493         }
3494
3495         talloc_free(list);
3496         
3497         return 0;
3498 }
3499
3500 /*
3501   display debug level on a node
3502  */
3503 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3504 {
3505         int ret;
3506         int32_t level;
3507
3508         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
3509         if (ret != 0) {
3510                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
3511                 return ret;
3512         } else {
3513                 if (options.machinereadable){
3514                         printf(":Name:Level:\n");
3515                         printf(":%s:%d:\n",get_debug_by_level(level),level);
3516                 } else {
3517                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
3518                 }
3519         }
3520         return 0;
3521 }
3522
3523 /*
3524   display reclock file of a node
3525  */
3526 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3527 {
3528         int ret;
3529         const char *reclock;
3530
3531         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
3532         if (ret != 0) {
3533                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3534                 return ret;
3535         } else {
3536                 if (options.machinereadable){
3537                         if (reclock != NULL) {
3538                                 printf("%s", reclock);
3539                         }
3540                 } else {
3541                         if (reclock == NULL) {
3542                                 printf("No reclock file used.\n");
3543                         } else {
3544                                 printf("Reclock file:%s\n", reclock);
3545                         }
3546                 }
3547         }
3548         return 0;
3549 }
3550
3551 /*
3552   set the reclock file of a node
3553  */
3554 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
3555 {
3556         int ret;
3557         const char *reclock;
3558
3559         if (argc == 0) {
3560                 reclock = NULL;
3561         } else if (argc == 1) {
3562                 reclock = argv[0];
3563         } else {
3564                 usage();
3565         }
3566
3567         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
3568         if (ret != 0) {
3569                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
3570                 return ret;
3571         }
3572         return 0;
3573 }
3574
3575 /*
3576   set the natgw state on/off
3577  */
3578 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
3579 {
3580         int ret;
3581         uint32_t natgwstate;
3582
3583         if (argc == 0) {
3584                 usage();
3585         }
3586
3587         if (!strcmp(argv[0], "on")) {
3588                 natgwstate = 1;
3589         } else if (!strcmp(argv[0], "off")) {
3590                 natgwstate = 0;
3591         } else {
3592                 usage();
3593         }
3594
3595         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
3596         if (ret != 0) {
3597                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
3598                 return ret;
3599         }
3600
3601         return 0;
3602 }
3603
3604 /*
3605   set the lmaster role on/off
3606  */
3607 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3608 {
3609         int ret;
3610         uint32_t lmasterrole;
3611
3612         if (argc == 0) {
3613                 usage();
3614         }
3615
3616         if (!strcmp(argv[0], "on")) {
3617                 lmasterrole = 1;
3618         } else if (!strcmp(argv[0], "off")) {
3619                 lmasterrole = 0;
3620         } else {
3621                 usage();
3622         }
3623
3624         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
3625         if (ret != 0) {
3626                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
3627                 return ret;
3628         }
3629
3630         return 0;
3631 }
3632
3633 /*
3634   set the recmaster role on/off
3635  */
3636 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
3637 {
3638         int ret;
3639         uint32_t recmasterrole;
3640
3641         if (argc == 0) {
3642                 usage();
3643         }
3644
3645         if (!strcmp(argv[0], "on")) {
3646                 recmasterrole = 1;
3647         } else if (!strcmp(argv[0], "off")) {
3648                 recmasterrole = 0;
3649         } else {
3650                 usage();
3651         }
3652
3653         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
3654         if (ret != 0) {
3655                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
3656                 return ret;
3657         }
3658
3659         return 0;
3660 }
3661
3662 /*
3663   set debug level on a node or all nodes
3664  */
3665 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
3666 {
3667         int i, ret;
3668         int32_t level;
3669
3670         if (argc == 0) {
3671                 printf("You must specify the debug level. Valid levels are:\n");
3672                 for (i=0; debug_levels[i].description != NULL; i++) {
3673                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3674                 }
3675
3676                 return 0;
3677         }
3678
3679         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3680                 level = get_debug_by_desc(argv[0]);
3681         } else {
3682                 level = strtol(argv[0], NULL, 0);
3683         }
3684
3685         for (i=0; debug_levels[i].description != NULL; i++) {
3686                 if (level == debug_levels[i].level) {
3687                         break;
3688                 }
3689         }
3690         if (debug_levels[i].description == NULL) {
3691                 printf("Invalid debug level, must be one of\n");
3692                 for (i=0; debug_levels[i].description != NULL; i++) {
3693                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
3694                 }
3695                 return -1;
3696         }
3697
3698         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
3699         if (ret != 0) {
3700                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
3701         }
3702         return 0;
3703 }
3704
3705
3706 /*
3707   thaw a node
3708  */
3709 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
3710 {
3711         int ret;
3712         uint32_t priority;
3713         
3714         if (argc == 1) {
3715                 priority = strtol(argv[0], NULL, 0);
3716         } else {
3717                 priority = 0;
3718         }
3719         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
3720
3721         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
3722         if (ret != 0) {
3723                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
3724         }               
3725         return 0;
3726 }
3727
3728
3729 /*
3730   attach to a database
3731  */
3732 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
3733 {
3734         const char *db_name;
3735         struct ctdb_db_context *ctdb_db;
3736         bool persistent = false;
3737
3738         if (argc < 1) {
3739                 usage();
3740         }
3741         db_name = argv[0];
3742         if (argc > 2) {
3743                 usage();
3744         }
3745         if (argc == 2) {
3746                 if (strcmp(argv[1], "persistent") != 0) {
3747                         usage();
3748                 }
3749                 persistent = true;
3750         }
3751
3752         ctdb_db = ctdb_attach(ctdb, db_name, persistent, 0);
3753         if (ctdb_db == NULL) {
3754                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3755                 return -1;
3756         }
3757
3758         return 0;
3759 }
3760
3761 /*
3762   set db priority
3763  */
3764 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3765 {
3766         struct ctdb_db_priority db_prio;
3767         int ret;
3768
3769         if (argc < 2) {
3770                 usage();
3771         }
3772
3773         db_prio.db_id    = strtoul(argv[0], NULL, 0);
3774         db_prio.priority = strtoul(argv[1], NULL, 0);
3775
3776         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
3777         if (ret != 0) {
3778                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
3779                 return -1;
3780         }
3781
3782         return 0;
3783 }
3784
3785 /*
3786   get db priority
3787  */
3788 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
3789 {
3790         uint32_t db_id, priority;
3791         int ret;
3792
3793         if (argc < 1) {
3794                 usage();
3795         }
3796
3797         db_id = strtoul(argv[0], NULL, 0);
3798
3799         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
3800         if (ret != 0) {
3801                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
3802                 return -1;
3803         }
3804
3805         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
3806
3807         return 0;
3808 }
3809
3810 /*
3811   run an eventscript on a node
3812  */
3813 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
3814 {
3815         TDB_DATA data;
3816         int ret;
3817         int32_t res;
3818         char *errmsg;
3819         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3820
3821         if (argc != 1) {
3822                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3823                 return -1;
3824         }
3825
3826         data.dptr = (unsigned char *)discard_const(argv[0]);
3827         data.dsize = strlen((char *)data.dptr) + 1;
3828
3829         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
3830
3831         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
3832                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3833         if (ret != 0 || res != 0) {
3834                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
3835                 talloc_free(tmp_ctx);
3836                 return -1;
3837         }
3838         talloc_free(tmp_ctx);
3839         return 0;
3840 }
3841
3842 #define DB_VERSION 1
3843 #define MAX_DB_NAME 64
3844 struct db_file_header {
3845         unsigned long version;
3846         time_t timestamp;
3847         unsigned long persistent;
3848         unsigned long size;
3849         const char name[MAX_DB_NAME];
3850 };
3851
3852 struct backup_data {
3853         struct ctdb_marshall_buffer *records;
3854         uint32_t len;
3855         uint32_t total;
3856         bool traverse_error;
3857 };
3858
3859 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
3860 {
3861         struct backup_data *bd = talloc_get_type(private, struct backup_data);
3862         struct ctdb_rec_data *rec;
3863
3864         /* add the record */
3865         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
3866         if (rec == NULL) {
3867                 bd->traverse_error = true;
3868                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
3869                 return -1;
3870         }
3871         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
3872         if (bd->records == NULL) {
3873                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
3874                 bd->traverse_error = true;
3875                 return -1;
3876         }
3877         bd->records->count++;
3878         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
3879         bd->len += rec->length;
3880         talloc_free(rec);
3881
3882         bd->total++;
3883         return 0;
3884 }
3885
3886 /*
3887  * backup a database to a file 
3888  */
3889 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
3890 {
3891         int i, ret;
3892         struct ctdb_dbid_map *dbmap=NULL;
3893         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3894         struct db_file_header dbhdr;
3895         struct ctdb_db_context *ctdb_db;
3896         struct backup_data *bd;
3897         int fh = -1;
3898         int status = -1;
3899         const char *reason = NULL;
3900
3901         if (argc != 2) {
3902                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3903                 return -1;
3904         }
3905
3906         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
3907         if (ret != 0) {
3908                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3909                 return ret;
3910         }
3911
3912         for(i=0;i<dbmap->num;i++){
3913                 const char *name;
3914
3915                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
3916                 if(!strcmp(argv[0], name)){
3917                         talloc_free(discard_const(name));
3918                         break;
3919                 }
3920                 talloc_free(discard_const(name));
3921         }
3922         if (i == dbmap->num) {
3923                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
3924                 talloc_free(tmp_ctx);
3925                 return -1;
3926         }
3927
3928         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3929                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
3930         if (ret != 0) {
3931                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
3932                                  argv[0]));
3933                 talloc_free(tmp_ctx);
3934                 return -1;
3935         }
3936         if (reason) {
3937                 uint32_t allow_unhealthy = 0;
3938
3939                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
3940                                       "AllowUnhealthyDBRead",
3941                                       &allow_unhealthy);
3942
3943                 if (allow_unhealthy != 1) {
3944                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
3945                                          argv[0], reason));
3946
3947                         DEBUG(DEBUG_ERR,("disallow backup : tunnable AllowUnhealthyDBRead = %u\n",
3948                                          allow_unhealthy));
3949                         talloc_free(tmp_ctx);
3950                         return -1;
3951                 }
3952
3953                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
3954                                      argv[0], argv[0]));
3955                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
3956                                      "tunnable AllowUnhealthyDBRead = %u\n",
3957                                      allow_unhealthy));
3958         }
3959
3960         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
3961         if (ctdb_db == NULL) {
3962                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
3963                 talloc_free(tmp_ctx);
3964                 return -1;
3965         }
3966
3967
3968         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3969         if (ret == -1) {
3970                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
3971                 talloc_free(tmp_ctx);
3972                 return -1;
3973         }
3974
3975
3976         bd = talloc_zero(tmp_ctx, struct backup_data);
3977         if (bd == NULL) {
3978                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
3979                 talloc_free(tmp_ctx);
3980                 return -1;
3981         }
3982
3983         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
3984         if (bd->records == NULL) {
3985                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
3986                 talloc_free(tmp_ctx);
3987                 return -1;
3988         }
3989
3990         bd->len = offsetof(struct ctdb_marshall_buffer, data);
3991         bd->records->db_id = ctdb_db->db_id;
3992         /* traverse the database collecting all records */
3993         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
3994             bd->traverse_error) {
3995                 DEBUG(DEBUG_ERR,("Traverse error\n"));
3996                 talloc_free(tmp_ctx);
3997                 return -1;              
3998         }
3999
4000         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
4001
4002
4003         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
4004         if (fh == -1) {
4005                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
4006                 talloc_free(tmp_ctx);
4007                 return -1;
4008         }
4009
4010         dbhdr.version = DB_VERSION;
4011         dbhdr.timestamp = time(NULL);
4012         dbhdr.persistent = dbmap->dbs[i].persistent;
4013         dbhdr.size = bd->len;
4014         if (strlen(argv[0]) >= MAX_DB_NAME) {
4015                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
4016                 goto done;
4017         }
4018         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
4019         ret = write(fh, &dbhdr, sizeof(dbhdr));
4020         if (ret == -1) {
4021                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
4022                 goto done;
4023         }
4024         ret = write(fh, bd->records, bd->len);
4025         if (ret == -1) {
4026                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
4027                 goto done;
4028         }
4029
4030         status = 0;
4031 done:
4032         if (fh != -1) {
4033                 ret = close(fh);
4034                 if (ret == -1) {
4035                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
4036                 }
4037         }
4038         talloc_free(tmp_ctx);
4039         return status;
4040 }
4041
4042 /*
4043  * restore a database from a file 
4044  */
4045 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
4046 {
4047         int ret;
4048         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4049         TDB_DATA outdata;
4050         TDB_DATA data;
4051         struct db_file_header dbhdr;
4052         struct ctdb_db_context *ctdb_db;
4053         struct ctdb_node_map *nodemap=NULL;
4054         struct ctdb_vnn_map *vnnmap=NULL;
4055         int i, fh;
4056         struct ctdb_control_wipe_database w;
4057         uint32_t *nodes;
4058         uint32_t generation;
4059         struct tm *tm;
4060         char tbuf[100];
4061         char *dbname;
4062
4063         if (argc < 1 || argc > 2) {
4064                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4065                 return -1;
4066         }
4067
4068         fh = open(argv[0], O_RDONLY);
4069         if (fh == -1) {
4070                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
4071                 talloc_free(tmp_ctx);
4072                 return -1;
4073         }
4074
4075         read(fh, &dbhdr, sizeof(dbhdr));
4076         if (dbhdr.version != DB_VERSION) {
4077                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
4078                 talloc_free(tmp_ctx);
4079                 return -1;
4080         }
4081
4082         dbname = discard_const(dbhdr.name);
4083         if (argc == 2) {
4084                 dbname = discard_const(argv[1]);
4085         }
4086
4087         outdata.dsize = dbhdr.size;
4088         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
4089         if (outdata.dptr == NULL) {
4090                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
4091                 close(fh);
4092                 talloc_free(tmp_ctx);
4093                 return -1;
4094         }               
4095         read(fh, outdata.dptr, outdata.dsize);
4096         close(fh);
4097
4098         tm = localtime(&dbhdr.timestamp);
4099         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
4100         printf("Restoring database '%s' from backup @ %s\n",
4101                 dbname, tbuf);
4102
4103
4104         ctdb_db = ctdb_attach(ctdb, dbname, dbhdr.persistent, 0);
4105         if (ctdb_db == NULL) {
4106                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
4107                 talloc_free(tmp_ctx);
4108                 return -1;
4109         }
4110
4111         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
4112         if (ret != 0) {
4113                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
4114                 talloc_free(tmp_ctx);
4115                 return ret;
4116         }
4117
4118
4119         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
4120         if (ret != 0) {
4121                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
4122                 talloc_free(tmp_ctx);
4123                 return ret;
4124         }
4125
4126         /* freeze all nodes */
4127         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4128         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
4129                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
4130                                         nodes, i,
4131                                         TIMELIMIT(),
4132                                         false, tdb_null,
4133                                         NULL, NULL,
4134                                         NULL) != 0) {
4135                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
4136                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4137                         talloc_free(tmp_ctx);
4138                         return -1;
4139                 }
4140         }
4141
4142         generation = vnnmap->generation;
4143         data.dptr = (void *)&generation;
4144         data.dsize = sizeof(generation);
4145
4146         /* start a cluster wide transaction */
4147         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4148         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
4149                                         nodes, 0,
4150                                         TIMELIMIT(), false, data,
4151                                         NULL, NULL,
4152                                         NULL) != 0) {
4153                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
4154                 return -1;
4155         }
4156
4157
4158         w.db_id = ctdb_db->db_id;
4159         w.transaction_id = generation;
4160
4161         data.dptr = (void *)&w;
4162         data.dsize = sizeof(w);
4163
4164         /* wipe all the remote databases. */
4165         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4166         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
4167                                         nodes, 0,
4168                                         TIMELIMIT(), false, data,
4169                                         NULL, NULL,
4170                                         NULL) != 0) {
4171                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
4172                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4173                 talloc_free(tmp_ctx);
4174                 return -1;
4175         }
4176         
4177         /* push the database */
4178         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4179         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
4180                                         nodes, 0,
4181                                         TIMELIMIT(), false, outdata,
4182                                         NULL, NULL,
4183                                         NULL) != 0) {
4184                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
4185                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4186                 talloc_free(tmp_ctx);
4187                 return -1;
4188         }
4189
4190         data.dptr = (void *)&ctdb_db->db_id;
4191         data.dsize = sizeof(ctdb_db->db_id);
4192
4193         /* mark the database as healthy */
4194         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4195         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
4196                                         nodes, 0,
4197                                         TIMELIMIT(), false, data,
4198                                         NULL, NULL,
4199                                         NULL) != 0) {
4200                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
4201                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4202                 talloc_free(tmp_ctx);
4203                 return -1;
4204         }
4205
4206         data.dptr = (void *)&generation;
4207         data.dsize = sizeof(generation);
4208
4209         /* commit all the changes */
4210         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
4211                                         nodes, 0,
4212                                         TIMELIMIT(), false, data,
4213                                         NULL, NULL,
4214                                         NULL) != 0) {
4215                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
4216                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4217                 talloc_free(tmp_ctx);
4218                 return -1;
4219         }
4220
4221
4222         /* thaw all nodes */
4223         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4224         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
4225                                         nodes, 0,
4226                                         TIMELIMIT(),
4227                                         false, tdb_null,
4228                                         NULL, NULL,
4229                                         NULL) != 0) {
4230                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
4231                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4232                 talloc_free(tmp_ctx);
4233                 return -1;
4234         }
4235
4236
4237         talloc_free(tmp_ctx);
4238         return 0;
4239 }
4240
4241 /*
4242  * dump a database backup from a file
4243  */
4244 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
4245 {
4246         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4247         TDB_DATA outdata;
4248         struct db_file_header dbhdr;
4249         int i, fh;
4250         struct tm *tm;
4251         char tbuf[100];
4252         struct ctdb_rec_data *rec = NULL;
4253         struct ctdb_marshall_buffer *m;
4254
4255         if (argc != 1) {
4256                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4257                 return -1;
4258         }
4259
4260         fh = open(argv[0], O_RDONLY);
4261         if (fh == -1) {
4262                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
4263                 talloc_free(tmp_ctx);
4264                 return -1;
4265         }
4266
4267         read(fh, &dbhdr, sizeof(dbhdr));
4268         if (dbhdr.version != DB_VERSION) {
4269                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
4270                 talloc_free(tmp_ctx);
4271                 return -1;
4272         }
4273
4274         outdata.dsize = dbhdr.size;
4275         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
4276         if (outdata.dptr == NULL) {
4277                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
4278                 close(fh);
4279                 talloc_free(tmp_ctx);
4280                 return -1;
4281         }
4282         read(fh, outdata.dptr, outdata.dsize);
4283         close(fh);
4284         m = (struct ctdb_marshall_buffer *)outdata.dptr;
4285
4286         tm = localtime(&dbhdr.timestamp);
4287         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
4288         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
4289                 dbhdr.name, m->db_id, tbuf);
4290
4291         for (i=0; i < m->count; i++) {
4292                 uint32_t reqid = 0;
4293                 TDB_DATA key, data;
4294
4295                 /* we do not want the header splitted, so we pass NULL*/
4296                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
4297                                               NULL, &key, &data);
4298
4299                 ctdb_dumpdb_record(ctdb, key, data, stdout);
4300         }
4301
4302         printf("Dumped %d records\n", i);
4303         talloc_free(tmp_ctx);
4304         return 0;
4305 }
4306
4307 /*
4308  * wipe a database from a file
4309  */
4310 static int control_wipedb(struct ctdb_context *ctdb, int argc,
4311                           const char **argv)
4312 {
4313         int ret;
4314         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4315         TDB_DATA data;
4316         struct ctdb_db_context *ctdb_db;
4317         struct ctdb_node_map *nodemap = NULL;
4318         struct ctdb_vnn_map *vnnmap = NULL;
4319         int i;
4320         struct ctdb_control_wipe_database w;
4321         uint32_t *nodes;
4322         uint32_t generation;
4323         struct ctdb_dbid_map *dbmap = NULL;
4324
4325         if (argc != 1) {
4326                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4327                 return -1;
4328         }
4329
4330         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
4331                                  &dbmap);
4332         if (ret != 0) {
4333                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
4334                                   options.pnn));
4335                 return ret;
4336         }
4337
4338         for(i=0;i<dbmap->num;i++){
4339                 const char *name;
4340
4341                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4342                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
4343                 if(!strcmp(argv[0], name)){
4344                         talloc_free(discard_const(name));
4345                         break;
4346                 }
4347                 talloc_free(discard_const(name));
4348         }
4349         if (i == dbmap->num) {
4350                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
4351                                   argv[0]));
4352                 talloc_free(tmp_ctx);
4353                 return -1;
4354         }
4355
4356         ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
4357         if (ctdb_db == NULL) {
4358                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
4359                                   argv[0]));
4360                 talloc_free(tmp_ctx);
4361                 return -1;
4362         }
4363
4364         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
4365                                    &nodemap);
4366         if (ret != 0) {
4367                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
4368                                   options.pnn));
4369                 talloc_free(tmp_ctx);
4370                 return ret;
4371         }
4372
4373         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
4374                                   &vnnmap);
4375         if (ret != 0) {
4376                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
4377                                   options.pnn));
4378                 talloc_free(tmp_ctx);
4379                 return ret;
4380         }
4381
4382         /* freeze all nodes */
4383         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4384         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
4385                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
4386                                                 nodes, i,
4387                                                 TIMELIMIT(),
4388                                                 false, tdb_null,
4389                                                 NULL, NULL,
4390                                                 NULL);
4391                 if (ret != 0) {
4392                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
4393                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
4394                                              CTDB_RECOVERY_ACTIVE);
4395                         talloc_free(tmp_ctx);
4396                         return -1;
4397                 }
4398         }
4399
4400         generation = vnnmap->generation;
4401         data.dptr = (void *)&generation;
4402         data.dsize = sizeof(generation);
4403
4404         /* start a cluster wide transaction */
4405         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4406         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
4407                                         nodes, 0,
4408                                         TIMELIMIT(), false, data,
4409                                         NULL, NULL,
4410                                         NULL);
4411         if (ret!= 0) {
4412                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
4413                                   "transactions.\n"));
4414                 return -1;
4415         }
4416
4417         w.db_id = ctdb_db->db_id;
4418         w.transaction_id = generation;
4419
4420         data.dptr = (void *)&w;
4421         data.dsize = sizeof(w);
4422
4423         /* wipe all the remote databases. */
4424         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4425         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
4426                                         nodes, 0,
4427                                         TIMELIMIT(), false, data,
4428                                         NULL, NULL,
4429                                         NULL) != 0) {
4430                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
4431                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4432                 talloc_free(tmp_ctx);
4433                 return -1;
4434         }
4435
4436         data.dptr = (void *)&ctdb_db->db_id;
4437         data.dsize = sizeof(ctdb_db->db_id);
4438
4439         /* mark the database as healthy */
4440         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4441         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
4442                                         nodes, 0,
4443                                         TIMELIMIT(), false, data,
4444                                         NULL, NULL,
4445                                         NULL) != 0) {
4446                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
4447                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4448                 talloc_free(tmp_ctx);
4449                 return -1;
4450         }
4451
4452         data.dptr = (void *)&generation;
4453         data.dsize = sizeof(generation);
4454
4455         /* commit all the changes */
4456         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
4457                                         nodes, 0,
4458                                         TIMELIMIT(), false, data,
4459                                         NULL, NULL,
4460                                         NULL) != 0) {
4461                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
4462                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4463                 talloc_free(tmp_ctx);
4464                 return -1;
4465         }
4466
4467         /* thaw all nodes */
4468         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4469         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
4470                                         nodes, 0,
4471                                         TIMELIMIT(),
4472                                         false, tdb_null,
4473                                         NULL, NULL,
4474                                         NULL) != 0) {
4475                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
4476                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4477                 talloc_free(tmp_ctx);
4478                 return -1;
4479         }
4480
4481         talloc_free(tmp_ctx);
4482         return 0;
4483 }
4484
4485 /*
4486   dump memory usage
4487  */
4488 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4489 {
4490         TDB_DATA data;
4491         int ret;
4492         int32_t res;
4493         char *errmsg;
4494         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4495         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
4496                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
4497         if (ret != 0 || res != 0) {
4498                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
4499                 talloc_free(tmp_ctx);
4500                 return -1;
4501         }
4502         write(1, data.dptr, data.dsize);
4503         talloc_free(tmp_ctx);
4504         return 0;
4505 }
4506
4507 /*
4508   handler for memory dumps
4509 */
4510 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4511                              TDB_DATA data, void *private_data)
4512 {
4513         write(1, data.dptr, data.dsize);
4514         exit(0);
4515 }
4516
4517 /*
4518   dump memory usage on the recovery daemon
4519  */
4520 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
4521 {
4522         int ret;
4523         TDB_DATA data;
4524         struct rd_memdump_reply rd;
4525
4526         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4527         if (rd.pnn == -1) {
4528                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4529                 return -1;
4530         }
4531         rd.srvid = getpid();
4532
4533         /* register a message port for receiveing the reply so that we
4534            can receive the reply
4535         */
4536         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
4537
4538
4539         data.dptr = (uint8_t *)&rd;
4540         data.dsize = sizeof(rd);
4541
4542         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
4543         if (ret != 0) {
4544                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4545                 return -1;
4546         }
4547
4548         /* this loop will terminate when we have received the reply */
4549         while (1) {     
4550                 event_loop_once(ctdb->ev);
4551         }
4552
4553         return 0;
4554 }
4555
4556 /*
4557   send a message to a srvid
4558  */
4559 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
4560 {
4561         unsigned long srvid;
4562         int ret;
4563         TDB_DATA data;
4564
4565         if (argc < 2) {
4566                 usage();
4567         }
4568
4569         srvid      = strtoul(argv[0], NULL, 0);
4570
4571         data.dptr = (uint8_t *)discard_const(argv[1]);
4572         data.dsize= strlen(argv[1]);
4573
4574         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
4575         if (ret != 0) {
4576                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
4577                 return -1;
4578         }
4579
4580         return 0;
4581 }
4582
4583 /*
4584   handler for msglisten
4585 */
4586 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4587                              TDB_DATA data, void *private_data)
4588 {
4589         int i;
4590
4591         printf("Message received: ");
4592         for (i=0;i<data.dsize;i++) {
4593                 printf("%c", data.dptr[i]);
4594         }
4595         printf("\n");
4596 }
4597
4598 /*
4599   listen for messages on a messageport
4600  */
4601 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
4602 {
4603         uint64_t srvid;
4604
4605         srvid = getpid();
4606
4607         /* register a message port and listen for messages
4608         */
4609         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
4610         printf("Listening for messages on srvid:%d\n", (int)srvid);
4611
4612         while (1) {     
4613                 event_loop_once(ctdb->ev);
4614         }
4615
4616         return 0;
4617 }
4618
4619 /*
4620   list all nodes in the cluster
4621   we parse the nodes file directly
4622  */
4623 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
4624 {
4625         TALLOC_CTX *mem_ctx = talloc_new(NULL);
4626         struct pnn_node *pnn_nodes;
4627         struct pnn_node *pnn_node;
4628
4629         pnn_nodes = read_nodes_file(mem_ctx);
4630         if (pnn_nodes == NULL) {
4631                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
4632                 talloc_free(mem_ctx);
4633                 return -1;
4634         }
4635
4636         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
4637                 ctdb_sock_addr addr;
4638                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
4639                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
4640                         talloc_free(mem_ctx);
4641                         return -1;
4642                 }
4643                 if (options.machinereadable){
4644                         printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
4645                 } else {
4646                         printf("%s\n", pnn_node->addr);
4647                 }
4648         }
4649         talloc_free(mem_ctx);
4650
4651         return 0;
4652 }
4653
4654 /*
4655   reload the nodes file on the local node
4656  */
4657 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
4658 {
4659         int i, ret;
4660         int mypnn;
4661         struct ctdb_node_map *nodemap=NULL;
4662
4663         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4664         if (mypnn == -1) {
4665                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
4666                 return -1;
4667         }
4668
4669         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
4670         if (ret != 0) {
4671                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4672                 return ret;
4673         }
4674
4675         /* reload the nodes file on all remote nodes */
4676         for (i=0;i<nodemap->num;i++) {
4677                 if (nodemap->nodes[i].pnn == mypnn) {
4678                         continue;
4679                 }
4680                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
4681                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
4682                         nodemap->nodes[i].pnn);
4683                 if (ret != 0) {
4684                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
4685                 }
4686         }
4687
4688         /* reload the nodes file on the local node */
4689         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
4690         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
4691         if (ret != 0) {
4692                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
4693         }
4694
4695         /* initiate a recovery */
4696         control_recover(ctdb, argc, argv);
4697
4698         return 0;
4699 }
4700
4701
4702 static const struct {
4703         const char *name;
4704         int (*fn)(struct ctdb_context *, int, const char **);
4705         bool auto_all;
4706         bool without_daemon; /* can be run without daemon running ? */
4707         const char *msg;
4708         const char *args;
4709 } ctdb_commands[] = {
4710 #ifdef CTDB_VERS
4711         { "version",         control_version,           true,   false,  "show version of ctdb" },
4712 #endif
4713         { "status",          control_status,            true,   false,  "show node status" },
4714         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
4715         { "ping",            control_ping,              true,   false,  "ping all nodes" },
4716         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
4717         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
4718         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
4719         { "statistics",      control_statistics,        false,  false, "show statistics" },
4720         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
4721         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
4722         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
4723         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
4724         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
4725         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
4726         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
4727         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
4728         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
4729         { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
4730         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
4731         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
4732         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
4733         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
4734         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
4735         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
4736         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
4737         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
4738         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
4739         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
4740         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
4741         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
4742         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
4743         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
4744         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
4745         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
4746         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
4747         { "stop",            control_stop,              true,   false,  "stop a node" },
4748         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
4749         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
4750         { "unban",           control_unban,             true,   false,  "unban a node" },
4751         { "showban",         control_showban,           true,   false,  "show ban information"},
4752         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
4753         { "recover",         control_recover,           true,   false,  "force recovery" },
4754         { "sync",            control_ipreallocate,      true,   false,  "wait until ctdbd has synced all state changes" },
4755         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
4756         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
4757         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
4758         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
4759         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
4760         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
4761         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
4762         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
4763
4764         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
4765
4766         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
4767         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
4768         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
4769         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
4770         { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
4771         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
4772         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
4773         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
4774         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
4775         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
4776         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
4777         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
4778         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
4779         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
4780         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
4781         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
4782         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
4783         { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
4784         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
4785         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
4786         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
4787         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
4788         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
4789         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
4790         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
4791         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
4792         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
4793         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
4794         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
4795         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
4796         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
4797         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
4798         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<db> <key> [<file>]" },
4799         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<db> <key> <file containing record>" },
4800         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file", "<tdb-file> <key> [<file>]" },
4801 };
4802
4803 /*
4804   show usage message
4805  */
4806 static void usage(void)
4807 {
4808         int i;
4809         printf(
4810 "Usage: ctdb [options] <control>\n" \
4811 "Options:\n" \
4812 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
4813 "   -Y                 generate machinereadable output\n"
4814 "   -v                 generate verbose output\n"
4815 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
4816         printf("Controls:\n");
4817         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4818                 printf("  %-15s %-27s  %s\n", 
4819                        ctdb_commands[i].name, 
4820                        ctdb_commands[i].args?ctdb_commands[i].args:"",
4821                        ctdb_commands[i].msg);
4822         }
4823         exit(1);
4824 }
4825
4826
4827 static void ctdb_alarm(int sig)
4828 {
4829         printf("Maximum runtime exceeded - exiting\n");
4830         _exit(ERR_TIMEOUT);
4831 }
4832
4833 /*
4834   main program
4835 */
4836 int main(int argc, const char *argv[])
4837 {
4838         struct ctdb_context *ctdb;
4839         char *nodestring = NULL;
4840         struct poptOption popt_options[] = {
4841                 POPT_AUTOHELP
4842                 POPT_CTDB_CMDLINE
4843                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
4844                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
4845                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
4846                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
4847                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
4848                 POPT_TABLEEND
4849         };
4850         int opt;
4851         const char **extra_argv;
4852         int extra_argc = 0;
4853         int ret=-1, i;
4854         poptContext pc;
4855         struct event_context *ev;
4856         const char *control;
4857
4858         setlinebuf(stdout);
4859         
4860         /* set some defaults */
4861         options.maxruntime = 0;
4862         options.timelimit = 3;
4863         options.pnn = CTDB_CURRENT_NODE;
4864
4865         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
4866
4867         while ((opt = poptGetNextOpt(pc)) != -1) {
4868                 switch (opt) {
4869                 default:
4870                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
4871                                 poptBadOption(pc, 0), poptStrerror(opt)));
4872                         exit(1);
4873                 }
4874         }
4875
4876         /* setup the remaining options for the main program to use */
4877         extra_argv = poptGetArgs(pc);
4878         if (extra_argv) {
4879                 extra_argv++;
4880                 while (extra_argv[extra_argc]) extra_argc++;
4881         }
4882
4883         if (extra_argc < 1) {
4884                 usage();
4885         }
4886
4887         if (options.maxruntime == 0) {
4888                 const char *ctdb_timeout;
4889                 ctdb_timeout = getenv("CTDB_TIMEOUT");
4890                 if (ctdb_timeout != NULL) {
4891                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
4892                 } else {
4893                         /* default timeout is 120 seconds */
4894                         options.maxruntime = 120;
4895                 }
4896         }
4897
4898         signal(SIGALRM, ctdb_alarm);
4899         alarm(options.maxruntime);
4900
4901         /* setup the node number to contact */
4902         if (nodestring != NULL) {
4903                 if (strcmp(nodestring, "all") == 0) {
4904                         options.pnn = CTDB_BROADCAST_ALL;
4905                 } else {
4906                         options.pnn = strtoul(nodestring, NULL, 0);
4907                 }
4908         }
4909
4910         control = extra_argv[0];
4911
4912         ev = event_context_init(NULL);
4913         if (!ev) {
4914                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
4915                 exit(1);
4916         }
4917         tevent_loop_allow_nesting(ev);
4918
4919         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
4920                 if (strcmp(control, ctdb_commands[i].name) == 0) {
4921                         int j;
4922
4923                         if (ctdb_commands[i].without_daemon == true) {
4924                                 close(2);
4925                         }
4926
4927                         /* initialise ctdb */
4928                         ctdb = ctdb_cmdline_client(ev);
4929
4930                         if (ctdb_commands[i].without_daemon == false) {
4931                                 const char *socket_name;
4932
4933                                 if (ctdb == NULL) {
4934                                         DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
4935                                         exit(1);
4936                                 }
4937
4938                                 /* initialize a libctdb connection as well */
4939                                 socket_name = ctdb_get_socketname(ctdb);
4940                                 ctdb_connection = ctdb_connect(socket_name,
4941                                                        ctdb_log_file, stderr);
4942                                 if (ctdb_connection == NULL) {
4943                                         fprintf(stderr, "Failed to connect to daemon from libctdb\n");
4944                                         exit(1);
4945                                 }                               
4946                         
4947                                 /* verify the node exists */
4948                                 verify_node(ctdb);
4949
4950                                 if (options.pnn == CTDB_CURRENT_NODE) {
4951                                         int pnn;
4952                                         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
4953                                         if (pnn == -1) {
4954                                                 return -1;
4955                                         }
4956                                         options.pnn = pnn;
4957                                 }
4958                         }
4959
4960                         if (ctdb_commands[i].auto_all && 
4961                             options.pnn == CTDB_BROADCAST_ALL) {
4962                                 uint32_t *nodes;
4963                                 uint32_t num_nodes;
4964                                 ret = 0;
4965
4966                                 nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
4967                                 CTDB_NO_MEMORY(ctdb, nodes);
4968         
4969                                 for (j=0;j<num_nodes;j++) {
4970                                         options.pnn = nodes[j];
4971                                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4972                                 }
4973                                 talloc_free(nodes);
4974                         } else {
4975                                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
4976                         }
4977                         break;
4978                 }
4979         }
4980
4981         if (i == ARRAY_SIZE(ctdb_commands)) {
4982                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
4983                 exit(1);
4984         }
4985
4986         return ret;
4987 }