ctdb-tools: Add -x option to specify delimiter for machine readable output
[ambi/samba-autobuild/.git] / ctdb / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24 #include "system/network.h"
25 #include "system/locale.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_version.h"
29 #include "../include/ctdb_client.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34
/*
 * Process exit statuses for node-related failures.  parse_nodestring()
 * passes ERR_NONODE and ERR_DISNODE directly to exit().
 */
#define ERR_TIMEOUT     20      /* timed out trying to reach node */
#define ERR_NONODE      21      /* node does not exist */
#define ERR_DISNODE     22      /* node is disconnected */
38
39 static void usage(void);
40
/*
 * File-scope settings shared by the control functions.
 * NOTE(review): presumably filled in from the command line; the code that
 * populates it is not part of this section.
 */
static struct {
        int timelimit;          /* offset handed to timeval_current_ofs() by TIMELIMIT() */
        uint32_t pnn;           /* target node; may be CTDB_CURRENT_NODE,
                                 * CTDB_BROADCAST_ALL or CTDB_MULTICAST */
        uint32_t *nodes;
        int machinereadable;    /* non-zero selects the printm() based output */
        const char *machineseparator; /* substituted for ':' by printm() */
        int verbose;
        int maxruntime;
        int printemptyrecords;
        int printdatasize;
        int printlmaster;
        int printhash;
        int printrecordflags;
} options;
55
/*
 * Extended limit for slow operations: ten times options.timelimit.
 * The expansion is parenthesized so that it behaves as a single value
 * when used inside a larger expression (e.g. "x / LONGTIMEOUT").
 */
#define LONGTIMEOUT (options.timelimit * 10)

/* struct timeval values built from the normal and the extended limit */
#define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
#define LONGTIMELIMIT() timeval_current_ofs(LONGTIMEOUT, 0)
60
/*
 * Return the difference tv2 - tv as a floating point number of seconds.
 */
static double timeval_delta(struct timeval *tv2, struct timeval *tv)
{
        const time_t whole_seconds = tv2->tv_sec - tv->tv_sec;
        const suseconds_t micro_seconds = tv2->tv_usec - tv->tv_usec;

        return whole_seconds + micro_seconds * 1.0e-6;
}
66
67 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
68 {
69         printf("CTDB version: %s\n", CTDB_VERSION_STRING);
70         return 0;
71 }
72
73 /* Like printf(3) but substitute for separator in format */
74 static int printm(const char *format, ...) PRINTF_ATTRIBUTE(1,2);
75 static int printm(const char *format, ...)
76 {
77         va_list ap;
78         int ret;
79         size_t len = strlen(format);
80         char new_format[len+1];
81
82         strcpy(new_format, format);
83
84         if (options.machineseparator[0] != ':') {
85                 all_string_sub(new_format,
86                                ":", options.machineseparator, len + 1);
87         }
88
89         va_start(ap, format);
90         ret = vprintf(new_format, ap);
91         va_end(ap);
92
93         return ret;
94 }
95
/*
 * Abort the process if p evaluates to false (e.g. a NULL pointer returned
 * by an allocation).  An out-of-memory message containing the source
 * location is logged at DEBUG_ALERT before abort() is called.
 */
#define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                            \
                DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
                                   "Out of memory in " __location__ )); \
                abort();                                                \
        }} while (0)
101
102 static uint32_t getpnn(struct ctdb_context *ctdb)
103 {
104         if ((options.pnn == CTDB_BROADCAST_ALL) ||
105             (options.pnn == CTDB_MULTICAST)) {
106                 DEBUG(DEBUG_ERR,
107                       ("Cannot get PNN for node %u\n", options.pnn));
108                 exit(1);
109         }
110
111         if (options.pnn == CTDB_CURRENT_NODE) {
112                 return ctdb_get_pnn(ctdb);
113         } else {
114                 return options.pnn;
115         }
116 }
117
118 static void assert_single_node_only(void)
119 {
120         if ((options.pnn == CTDB_BROADCAST_ALL) ||
121             (options.pnn == CTDB_MULTICAST)) {
122                 DEBUG(DEBUG_ERR,
123                       ("This control can not be applied to multiple PNNs\n"));
124                 exit(1);
125         }
126 }
127
128 /* Pretty print the flags to a static buffer in human-readable format.
129  * This never returns NULL!
130  */
131 static const char *pretty_print_flags(uint32_t flags)
132 {
133         int j;
134         static const struct {
135                 uint32_t flag;
136                 const char *name;
137         } flag_names[] = {
138                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
139                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
140                 { NODE_FLAGS_BANNED,                "BANNED" },
141                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
142                 { NODE_FLAGS_DELETED,               "DELETED" },
143                 { NODE_FLAGS_STOPPED,               "STOPPED" },
144                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
145         };
146         static char flags_str[512]; /* Big enough to contain all flag names */
147
148         flags_str[0] = '\0';
149         for (j=0;j<ARRAY_SIZE(flag_names);j++) {
150                 if (flags & flag_names[j].flag) {
151                         if (flags_str[0] == '\0') {
152                                 (void) strcpy(flags_str, flag_names[j].name);
153                         } else {
154                                 (void) strncat(flags_str, "|", sizeof(flags_str)-1);
155                                 (void) strncat(flags_str, flag_names[j].name,
156                                                sizeof(flags_str)-1);
157                         }
158                 }
159         }
160         if (flags_str[0] == '\0') {
161                 (void) strcpy(flags_str, "OK");
162         }
163
164         return flags_str;
165 }
166
/*
 * Convert one hexadecimal digit to its numeric value (0-15).
 *
 * Both lower and upper case letters are accepted.  Characters that are
 * not hexadecimal digits are not rejected; for those the result is simply
 * the distance from '0'.
 */
static int h2i(char h)
{
        if (h >= 'a' && h <= 'f') {
                return h - 'a' + 10;
        }
        if (h >= 'A' && h <= 'F') {
                /* offset from 'A', not 'f', so that 'A' maps to 10 */
                return h - 'A' + 10;
        }
        return h - '0';
}
173
174 static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str)
175 {
176         int i, len;
177         TDB_DATA key = {NULL, 0};
178
179         len = strlen(str);
180         if (len & 0x01) {
181                 DEBUG(DEBUG_ERR,("Key specified with odd number of hexadecimal digits\n"));
182                 return key;
183         }
184
185         key.dsize = len>>1;
186         key.dptr  = talloc_size(mem_ctx, key.dsize);
187
188         for (i=0; i < len/2; i++) {
189                 key.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
190         }
191         return key;
192 }
193
194 /* Parse a nodestring.  Parameter dd_ok controls what happens to nodes
195  * that are disconnected or deleted.  If dd_ok is true those nodes are
196  * included in the output list of nodes.  If dd_ok is false, those
197  * nodes are filtered from the "all" case and cause an error if
198  * explicitly specified.
199  */
200 static bool parse_nodestring(struct ctdb_context *ctdb,
201                              TALLOC_CTX *mem_ctx,
202                              const char * nodestring,
203                              uint32_t current_pnn,
204                              bool dd_ok,
205                              uint32_t **nodes,
206                              uint32_t *pnn_mode)
207 {
208         TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
209         int n;
210         uint32_t i;
211         struct ctdb_node_map *nodemap;
212         int ret;
213
214         *nodes = NULL;
215
216         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
217         if (ret != 0) {
218                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
219                 talloc_free(tmp_ctx);
220                 exit(10);
221         }
222
223         if (nodestring != NULL) {
224                 *nodes = talloc_array(mem_ctx, uint32_t, 0);
225                 if (*nodes == NULL) {
226                         goto failed;
227                 }
228
229                 n = 0;
230
231                 if (strcmp(nodestring, "all") == 0) {
232                         *pnn_mode = CTDB_BROADCAST_ALL;
233
234                         /* all */
235                         for (i = 0; i < nodemap->num; i++) {
236                                 if ((nodemap->nodes[i].flags &
237                                      (NODE_FLAGS_DISCONNECTED |
238                                       NODE_FLAGS_DELETED)) && !dd_ok) {
239                                         continue;
240                                 }
241                                 *nodes = talloc_realloc(mem_ctx, *nodes,
242                                                         uint32_t, n+1);
243                                 if (*nodes == NULL) {
244                                         goto failed;
245                                 }
246                                 (*nodes)[n] = i;
247                                 n++;
248                         }
249                 } else {
250                         /* x{,y...} */
251                         char *ns, *tok;
252
253                         ns = talloc_strdup(tmp_ctx, nodestring);
254                         tok = strtok(ns, ",");
255                         while (tok != NULL) {
256                                 uint32_t pnn;
257                                 char *endptr;
258                                 i = (uint32_t)strtoul(tok, &endptr, 0);
259                                 if (i == 0 && tok == endptr) {
260                                         DEBUG(DEBUG_ERR,
261                                               ("Invalid node %s\n", tok));
262                                         talloc_free(tmp_ctx);
263                                         exit(ERR_NONODE);
264                                 }
265                                 if (i >= nodemap->num) {
266                                         DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
267                                         talloc_free(tmp_ctx);
268                                         exit(ERR_NONODE);
269                                 }
270                                 if ((nodemap->nodes[i].flags & 
271                                      (NODE_FLAGS_DISCONNECTED |
272                                       NODE_FLAGS_DELETED)) && !dd_ok) {
273                                         DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
274                                         talloc_free(tmp_ctx);
275                                         exit(ERR_DISNODE);
276                                 }
277                                 if ((pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), i)) < 0) {
278                                         DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
279                                         talloc_free(tmp_ctx);
280                                         exit(10);
281                                 }
282
283                                 *nodes = talloc_realloc(mem_ctx, *nodes,
284                                                         uint32_t, n+1);
285                                 if (*nodes == NULL) {
286                                         goto failed;
287                                 }
288
289                                 (*nodes)[n] = i;
290                                 n++;
291
292                                 tok = strtok(NULL, ",");
293                         }
294                         talloc_free(ns);
295
296                         if (n == 1) {
297                                 *pnn_mode = (*nodes)[0];
298                         } else {
299                                 *pnn_mode = CTDB_MULTICAST;
300                         }
301                 }
302         } else {
303                 /* default - no nodes specified */
304                 *nodes = talloc_array(mem_ctx, uint32_t, 1);
305                 if (*nodes == NULL) {
306                         goto failed;
307                 }
308                 *pnn_mode = CTDB_CURRENT_NODE;
309
310                 if (((*nodes)[0] = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), current_pnn)) < 0) {
311                         goto failed;
312                 }
313         }
314
315         talloc_free(tmp_ctx);
316         return true;
317
318 failed:
319         talloc_free(tmp_ctx);
320         return false;
321 }
322
323 /*
324  check if a database exists
325 */
326 static bool db_exists(struct ctdb_context *ctdb, const char *dbarg,
327                       uint32_t *dbid, const char **dbname, uint8_t *flags)
328 {
329         int i, ret;
330         struct ctdb_dbid_map *dbmap=NULL;
331         bool dbid_given = false, found = false;
332         uint32_t id;
333         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
334         const char *name;
335
336         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
337         if (ret != 0) {
338                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
339                 goto fail;
340         }
341
342         if (strncmp(dbarg, "0x", 2) == 0) {
343                 id = strtoul(dbarg, NULL, 0);
344                 dbid_given = true;
345         }
346
347         for(i=0; i<dbmap->num; i++) {
348                 if (dbid_given) {
349                         if (id == dbmap->dbs[i].dbid) {
350                                 found = true;
351                                 break;
352                         }
353                 } else {
354                         ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
355                         if (ret != 0) {
356                                 DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
357                                 goto fail;
358                         }
359
360                         if (strcmp(name, dbarg) == 0) {
361                                 id = dbmap->dbs[i].dbid;
362                                 found = true;
363                                 break;
364                         }
365                 }
366         }
367
368         if (found && dbid_given && dbname != NULL) {
369                 ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
370                 if (ret != 0) {
371                         DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
372                         found = false;
373                         goto fail;
374                 }
375         }
376
377         if (found) {
378                 if (dbid) *dbid = id;
379                 if (dbname) *dbname = talloc_strdup(ctdb, name);
380                 if (flags) *flags = dbmap->dbs[i].flags;
381         } else {
382                 DEBUG(DEBUG_ERR,("No database matching '%s' found\n", dbarg));
383         }
384
385 fail:
386         talloc_free(tmp_ctx);
387         return found;
388 }
389
390 /*
391   see if a process exists
392  */
393 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
394 {
395         uint32_t pnn, pid;
396         int ret;
397         if (argc < 1) {
398                 usage();
399         }
400
401         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
402                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
403                 return -1;
404         }
405
406         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
407         if (ret == 0) {
408                 printf("%u:%u exists\n", pnn, pid);
409         } else {
410                 printf("%u:%u does not exist\n", pnn, pid);
411         }
412         return ret;
413 }
414
415 /*
416   display statistics structure
417  */
418 static void show_statistics(struct ctdb_statistics *s, int show_header)
419 {
420         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
421         int i;
422         const char *prefix=NULL;
423         int preflen=0;
424         int tmp, days, hours, minutes, seconds;
425         const struct {
426                 const char *name;
427                 uint32_t offset;
428         } fields[] = {
429 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
430                 STATISTICS_FIELD(num_clients),
431                 STATISTICS_FIELD(frozen),
432                 STATISTICS_FIELD(recovering),
433                 STATISTICS_FIELD(num_recoveries),
434                 STATISTICS_FIELD(client_packets_sent),
435                 STATISTICS_FIELD(client_packets_recv),
436                 STATISTICS_FIELD(node_packets_sent),
437                 STATISTICS_FIELD(node_packets_recv),
438                 STATISTICS_FIELD(keepalive_packets_sent),
439                 STATISTICS_FIELD(keepalive_packets_recv),
440                 STATISTICS_FIELD(node.req_call),
441                 STATISTICS_FIELD(node.reply_call),
442                 STATISTICS_FIELD(node.req_dmaster),
443                 STATISTICS_FIELD(node.reply_dmaster),
444                 STATISTICS_FIELD(node.reply_error),
445                 STATISTICS_FIELD(node.req_message),
446                 STATISTICS_FIELD(node.req_control),
447                 STATISTICS_FIELD(node.reply_control),
448                 STATISTICS_FIELD(client.req_call),
449                 STATISTICS_FIELD(client.req_message),
450                 STATISTICS_FIELD(client.req_control),
451                 STATISTICS_FIELD(timeouts.call),
452                 STATISTICS_FIELD(timeouts.control),
453                 STATISTICS_FIELD(timeouts.traverse),
454                 STATISTICS_FIELD(locks.num_calls),
455                 STATISTICS_FIELD(locks.num_current),
456                 STATISTICS_FIELD(locks.num_pending),
457                 STATISTICS_FIELD(locks.num_failed),
458                 STATISTICS_FIELD(total_calls),
459                 STATISTICS_FIELD(pending_calls),
460                 STATISTICS_FIELD(childwrite_calls),
461                 STATISTICS_FIELD(pending_childwrite_calls),
462                 STATISTICS_FIELD(memory_used),
463                 STATISTICS_FIELD(max_hop_count),
464                 STATISTICS_FIELD(total_ro_delegations),
465                 STATISTICS_FIELD(total_ro_revokes),
466         };
467         
468         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
469         seconds = tmp%60;
470         tmp    /= 60;
471         minutes = tmp%60;
472         tmp    /= 60;
473         hours   = tmp%24;
474         tmp    /= 24;
475         days    = tmp;
476
477         if (options.machinereadable){
478                 if (show_header) {
479                         printm("CTDB version:");
480                         printm("Current time of statistics:");
481                         printm("Statistics collected since:");
482                         for (i=0;i<ARRAY_SIZE(fields);i++) {
483                                 printm("%s:", fields[i].name);
484                         }
485                         printm("num_reclock_ctdbd_latency:");
486                         printm("min_reclock_ctdbd_latency:");
487                         printm("avg_reclock_ctdbd_latency:");
488                         printm("max_reclock_ctdbd_latency:");
489
490                         printm("num_reclock_recd_latency:");
491                         printm("min_reclock_recd_latency:");
492                         printm("avg_reclock_recd_latency:");
493                         printm("max_reclock_recd_latency:");
494
495                         printm("num_call_latency:");
496                         printm("min_call_latency:");
497                         printm("avg_call_latency:");
498                         printm("max_call_latency:");
499
500                         printm("num_lockwait_latency:");
501                         printm("min_lockwait_latency:");
502                         printm("avg_lockwait_latency:");
503                         printm("max_lockwait_latency:");
504
505                         printm("num_childwrite_latency:");
506                         printm("min_childwrite_latency:");
507                         printm("avg_childwrite_latency:");
508                         printm("max_childwrite_latency:");
509                         printm("\n");
510                 }
511                 printm("%d:", CTDB_PROTOCOL);
512                 printm("%d:", (int)s->statistics_current_time.tv_sec);
513                 printm("%d:", (int)s->statistics_start_time.tv_sec);
514                 for (i=0;i<ARRAY_SIZE(fields);i++) {
515                         printm("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
516                 }
517                 printm("%d:", s->reclock.ctdbd.num);
518                 printm("%.6f:", s->reclock.ctdbd.min);
519                 printm("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
520                 printm("%.6f:", s->reclock.ctdbd.max);
521
522                 printm("%d:", s->reclock.recd.num);
523                 printm("%.6f:", s->reclock.recd.min);
524                 printm("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
525                 printm("%.6f:", s->reclock.recd.max);
526
527                 printm("%d:", s->call_latency.num);
528                 printm("%.6f:", s->call_latency.min);
529                 printm("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
530                 printm("%.6f:", s->call_latency.max);
531
532                 printm("%d:", s->childwrite_latency.num);
533                 printm("%.6f:", s->childwrite_latency.min);
534                 printm("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
535                 printm("%.6f:", s->childwrite_latency.max);
536                 printm("\n");
537         } else {
538                 printf("CTDB version %u\n", CTDB_PROTOCOL);
539                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
540                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
541
542                 for (i=0;i<ARRAY_SIZE(fields);i++) {
543                         if (strchr(fields[i].name, '.')) {
544                                 preflen = strcspn(fields[i].name, ".")+1;
545                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
546                                         prefix = fields[i].name;
547                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
548                                 }
549                         } else {
550                                 preflen = 0;
551                         }
552                         printf(" %*s%-22s%*s%10u\n", 
553                                preflen?4:0, "",
554                                fields[i].name+preflen, 
555                                preflen?0:4, "",
556                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
557                 }
558                 printf(" hop_count_buckets:");
559                 for (i=0;i<MAX_COUNT_BUCKETS;i++) {
560                         printf(" %d", s->hop_count_bucket[i]);
561                 }
562                 printf("\n");
563                 printf(" lock_buckets:");
564                 for (i=0; i<MAX_COUNT_BUCKETS; i++) {
565                         printf(" %d", s->locks.buckets[i]);
566                 }
567                 printf("\n");
568                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "locks_latency      MIN/AVG/MAX", s->locks.latency.min, s->locks.latency.num?s->locks.latency.total/s->locks.latency.num:0.0, s->locks.latency.max, s->locks.latency.num);
569
570                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd      MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
571
572                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
573
574                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
575                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
576         }
577
578         talloc_free(tmp_ctx);
579 }
580
581 /*
582   display remote ctdb statistics combined from all nodes
583  */
584 static int control_statistics_all(struct ctdb_context *ctdb)
585 {
586         int ret, i;
587         struct ctdb_statistics statistics;
588         uint32_t *nodes;
589         uint32_t num_nodes;
590
591         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
592         CTDB_NO_MEMORY(ctdb, nodes);
593         
594         ZERO_STRUCT(statistics);
595
596         for (i=0;i<num_nodes;i++) {
597                 struct ctdb_statistics s1;
598                 int j;
599                 uint32_t *v1 = (uint32_t *)&s1;
600                 uint32_t *v2 = (uint32_t *)&statistics;
601                 uint32_t num_ints = 
602                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
603                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
604                 if (ret != 0) {
605                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
606                         return ret;
607                 }
608                 for (j=0;j<num_ints;j++) {
609                         v2[j] += v1[j];
610                 }
611                 statistics.max_hop_count = 
612                         MAX(statistics.max_hop_count, s1.max_hop_count);
613                 statistics.call_latency.max = 
614                         MAX(statistics.call_latency.max, s1.call_latency.max);
615         }
616         talloc_free(nodes);
617         printf("Gathered statistics for %u nodes\n", num_nodes);
618         show_statistics(&statistics, 1);
619         return 0;
620 }
621
622 /*
623   display remote ctdb statistics
624  */
625 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
626 {
627         int ret;
628         struct ctdb_statistics statistics;
629
630         if (options.pnn == CTDB_BROADCAST_ALL) {
631                 return control_statistics_all(ctdb);
632         }
633
634         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
635         if (ret != 0) {
636                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
637                 return ret;
638         }
639         show_statistics(&statistics, 1);
640         return 0;
641 }
642
643
644 /*
645   reset remote ctdb statistics
646  */
647 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
648 {
649         int ret;
650
651         ret = ctdb_statistics_reset(ctdb, options.pnn);
652         if (ret != 0) {
653                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
654                 return ret;
655         }
656         return 0;
657 }
658
659
660 /*
661   display remote ctdb rolling statistics
662  */
663 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
664 {
665         int ret;
666         struct ctdb_statistics_wire *stats;
667         int i, num_records = -1;
668
669         assert_single_node_only();
670
671         if (argc ==1) {
672                 num_records = atoi(argv[0]) - 1;
673         }
674
675         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
676         if (ret != 0) {
677                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
678                 return ret;
679         }
680         for (i=0;i<stats->num;i++) {
681                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
682                         continue;
683                 }
684                 show_statistics(&stats->stats[i], i==0);
685                 if (i == num_records) {
686                         break;
687                 }
688         }
689         return 0;
690 }
691
692
693 /*
694   display remote ctdb db statistics
695  */
696 static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
697 {
698         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
699         struct ctdb_db_statistics *dbstat;
700         int i;
701         uint32_t db_id;
702         int num_hot_keys;
703         int ret;
704
705         if (argc < 1) {
706                 usage();
707         }
708
709         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
710                 return -1;
711         }
712
713         ret = ctdb_ctrl_dbstatistics(ctdb, options.pnn, db_id, tmp_ctx, &dbstat);
714         if (ret != 0) {
715                 DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
716                 talloc_free(tmp_ctx);
717                 return -1;
718         }
719
720         printf("DB Statistics: %s\n", argv[0]);
721         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_delegations", 4, "",
722                 dbstat->db_ro_delegations);
723         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_revokes", 4, "",
724                 dbstat->db_ro_delegations);
725         printf(" %s\n", "locks");
726         printf(" %*s%-22s%*s%10u\n", 4, "", "total", 0, "",
727                 dbstat->locks.num_calls);
728         printf(" %*s%-22s%*s%10u\n", 4, "", "failed", 0, "",
729                 dbstat->locks.num_failed);
730         printf(" %*s%-22s%*s%10u\n", 4, "", "current", 0, "",
731                 dbstat->locks.num_current);
732         printf(" %*s%-22s%*s%10u\n", 4, "", "pending", 0, "",
733                 dbstat->locks.num_pending);
734         printf(" %s", "hop_count_buckets:");
735         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
736                 printf(" %d", dbstat->hop_count_bucket[i]);
737         }
738         printf("\n");
739         printf(" %s", "lock_buckets:");
740         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
741                 printf(" %d", dbstat->locks.buckets[i]);
742         }
743         printf("\n");
744         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
745                 "locks_latency      MIN/AVG/MAX",
746                 dbstat->locks.latency.min,
747                 (dbstat->locks.latency.num ?
748                  dbstat->locks.latency.total /dbstat->locks.latency.num :
749                  0.0),
750                 dbstat->locks.latency.max,
751                 dbstat->locks.latency.num);
752         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
753                 "vacuum_latency     MIN/AVG/MAX",
754                 dbstat->vacuum.latency.min,
755                 (dbstat->vacuum.latency.num ?
756                  dbstat->vacuum.latency.total /dbstat->vacuum.latency.num :
757                  0.0),
758                 dbstat->vacuum.latency.max,
759                 dbstat->vacuum.latency.num);
760         num_hot_keys = 0;
761         for (i=0; i<dbstat->num_hot_keys; i++) {
762                 if (dbstat->hot_keys[i].count > 0) {
763                         num_hot_keys++;
764                 }
765         }
766         dbstat->num_hot_keys = num_hot_keys;
767
768         printf(" Num Hot Keys:     %d\n", dbstat->num_hot_keys);
769         for (i = 0; i < dbstat->num_hot_keys; i++) {
770                 int j;
771                 printf("     Count:%d Key:", dbstat->hot_keys[i].count);
772                 for (j = 0; j < dbstat->hot_keys[i].key.dsize; j++) {
773                         printf("%02x", dbstat->hot_keys[i].key.dptr[j]&0xff);
774                 }
775                 printf("\n");
776         }
777
778         talloc_free(tmp_ctx);
779         return 0;
780 }
781
782 /*
783   display uptime of remote node
784  */
785 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
786 {
787         int ret;
788         struct ctdb_uptime *uptime = NULL;
789         int tmp, days, hours, minutes, seconds;
790
791         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
792         if (ret != 0) {
793                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
794                 return ret;
795         }
796
797         if (options.machinereadable){
798                 printm(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
799                 printm(":%u:%u:%u:%lf\n",
800                         (unsigned int)uptime->current_time.tv_sec,
801                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
802                         (unsigned int)uptime->last_recovery_finished.tv_sec,
803                         timeval_delta(&uptime->last_recovery_finished,
804                                       &uptime->last_recovery_started)
805                 );
806                 return 0;
807         }
808
809         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
810
811         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
812         seconds = tmp%60;
813         tmp    /= 60;
814         minutes = tmp%60;
815         tmp    /= 60;
816         hours   = tmp%24;
817         tmp    /= 24;
818         days    = tmp;
819         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
820
821         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
822         seconds = tmp%60;
823         tmp    /= 60;
824         minutes = tmp%60;
825         tmp    /= 60;
826         hours   = tmp%24;
827         tmp    /= 24;
828         days    = tmp;
829         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
830         
831         printf("Duration of last recovery/failover: %lf seconds\n",
832                 timeval_delta(&uptime->last_recovery_finished,
833                               &uptime->last_recovery_started));
834
835         return 0;
836 }
837
/*
  Print the PNN of the current node, as reported by getpnn().
  Always returns 0.
 */
static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
{
        const uint32_t this_pnn = getpnn(ctdb);

        printf("PNN:%d\n", this_pnn);

        return 0;
}
850
851
852 struct pnn_node {
853         struct pnn_node *next, *prev;
854         ctdb_sock_addr addr;
855         int pnn;
856 };
857
858 static struct pnn_node *read_pnn_node_file(TALLOC_CTX *mem_ctx,
859                                            const char *file)
860 {
861         int nlines;
862         char **lines;
863         int i, pnn;
864         struct pnn_node *pnn_nodes = NULL;
865         struct pnn_node *pnn_node;
866
867         lines = file_lines_load(file, &nlines, 0, mem_ctx);
868         if (lines == NULL) {
869                 return NULL;
870         }
871         for (i=0, pnn=0; i<nlines; i++) {
872                 char *node;
873
874                 node = lines[i];
875                 /* strip leading spaces */
876                 while((*node == ' ') || (*node == '\t')) {
877                         node++;
878                 }
879                 if (*node == '#') {
880                         pnn++;
881                         continue;
882                 }
883                 if (strcmp(node, "") == 0) {
884                         continue;
885                 }
886                 pnn_node = talloc(mem_ctx, struct pnn_node);
887                 pnn_node->pnn = pnn++;
888
889                 if (!parse_ip(node, NULL, 0, &pnn_node->addr)) {
890                         DEBUG(DEBUG_ERR,
891                               ("Invalid IP address '%s' in file %s\n",
892                                node, file));
893                         /* Caller will free mem_ctx */
894                         return NULL;
895                 }
896
897                 DLIST_ADD_END(pnn_nodes, pnn_node, NULL);
898         }
899
900         return pnn_nodes;
901 }
902
903 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
904 {
905         const char *nodes_list;
906
907         /* read the nodes file */
908         nodes_list = getenv("CTDB_NODES");
909         if (nodes_list == NULL) {
910                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes",
911                                              getenv("CTDB_BASE"));
912                 if (nodes_list == NULL) {
913                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
914                         exit(1);
915                 }
916         }
917
918         return read_pnn_node_file(mem_ctx, nodes_list);
919 }
920
921 /*
922   show the PNN of the current node
923   discover the pnn by loading the nodes file and try to bind to all
924   addresses one at a time until the ip address is found.
925  */
926 static int find_node_xpnn(void)
927 {
928         TALLOC_CTX *mem_ctx = talloc_new(NULL);
929         struct pnn_node *pnn_nodes;
930         struct pnn_node *pnn_node;
931         int pnn;
932
933         pnn_nodes = read_nodes_file(mem_ctx);
934         if (pnn_nodes == NULL) {
935                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
936                 talloc_free(mem_ctx);
937                 return -1;
938         }
939
940         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
941                 if (ctdb_sys_have_ip(&pnn_node->addr)) {
942                         pnn = pnn_node->pnn;
943                         talloc_free(mem_ctx);
944                         return pnn;
945                 }
946         }
947
948         printf("Failed to detect which PNN this node is\n");
949         talloc_free(mem_ctx);
950         return -1;
951 }
952
/*
  Print the PNN of the current node as determined by find_node_xpnn()
  (from the nodes file and local addresses).

  Returns 0 on success, -1 if the PNN could not be determined.
 */
static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
{
        /* find_node_xpnn() returns int with -1 meaning failure; keep the
         * value signed so the comparison and the %d conversion match. */
        int pnn;

        assert_single_node_only();

        pnn = find_node_xpnn();
        if (pnn == -1) {
                return -1;
        }

        printf("PNN:%d\n", pnn);
        return 0;
}
967
968 /* Helpers for ctdb status
969  */
970 static bool is_partially_online(struct ctdb_context *ctdb, struct ctdb_node_and_flags *node)
971 {
972         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
973         int j;
974         bool ret = false;
975
976         if (node->flags == 0) {
977                 struct ctdb_control_get_ifaces *ifaces;
978
979                 if (ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), node->pnn,
980                                          tmp_ctx, &ifaces) == 0) {
981                         for (j=0; j < ifaces->num; j++) {
982                                 if (ifaces->ifaces[j].link_state != 0) {
983                                         continue;
984                                 }
985                                 ret = true;
986                                 break;
987                         }
988                 }
989         }
990         talloc_free(tmp_ctx);
991
992         return ret;
993 }
994
/* Print the column header line that precedes the per-node lines written
 * by control_status_1_machine(). */
static void control_status_header_machine(void)
{
        printm(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:PartiallyOnline:ThisNode:\n");
}
1000
1001 static int control_status_1_machine(struct ctdb_context *ctdb, int mypnn,
1002                                     struct ctdb_node_and_flags *node)
1003 {
1004         printm(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
1005                ctdb_addr_to_str(&node->addr),
1006                !!(node->flags&NODE_FLAGS_DISCONNECTED),
1007                !!(node->flags&NODE_FLAGS_BANNED),
1008                !!(node->flags&NODE_FLAGS_PERMANENTLY_DISABLED),
1009                !!(node->flags&NODE_FLAGS_UNHEALTHY),
1010                !!(node->flags&NODE_FLAGS_STOPPED),
1011                !!(node->flags&NODE_FLAGS_INACTIVE),
1012                is_partially_online(ctdb, node) ? 1 : 0,
1013                (node->pnn == mypnn)?'Y':'N');
1014
1015         return node->flags;
1016 }
1017
1018 static int control_status_1_human(struct ctdb_context *ctdb, int mypnn,
1019                                   struct ctdb_node_and_flags *node)
1020 {
1021        printf("pnn:%d %-16s %s%s\n", node->pnn,
1022               ctdb_addr_to_str(&node->addr),
1023               is_partially_online(ctdb, node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
1024               node->pnn == mypnn?" (THIS NODE)":"");
1025
1026        return node->flags;
1027 }
1028
1029 /*
1030   display remote ctdb status
1031  */
1032 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
1033 {
1034         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1035         int i;
1036         struct ctdb_vnn_map *vnnmap=NULL;
1037         struct ctdb_node_map *nodemap=NULL;
1038         uint32_t recmode, recmaster, mypnn;
1039         int num_deleted_nodes = 0;
1040         int ret;
1041
1042         mypnn = getpnn(ctdb);
1043
1044         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1045         if (ret != 0) {
1046                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1047                 talloc_free(tmp_ctx);
1048                 return -1;
1049         }
1050
1051         if (options.machinereadable) {
1052                 control_status_header_machine();
1053                 for (i=0;i<nodemap->num;i++) {
1054                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1055                                 continue;
1056                         }
1057                         (void) control_status_1_machine(ctdb, mypnn,
1058                                                         &nodemap->nodes[i]);
1059                 }
1060                 talloc_free(tmp_ctx);
1061                 return 0;
1062         }
1063
1064         for (i=0; i<nodemap->num; i++) {
1065                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1066                         num_deleted_nodes++;
1067                 }
1068         }
1069         if (num_deleted_nodes == 0) {
1070                 printf("Number of nodes:%d\n", nodemap->num);
1071         } else {
1072                 printf("Number of nodes:%d (including %d deleted nodes)\n",
1073                        nodemap->num, num_deleted_nodes);
1074         }
1075         for(i=0;i<nodemap->num;i++){
1076                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1077                         continue;
1078                 }
1079                 (void) control_status_1_human(ctdb, mypnn, &nodemap->nodes[i]);
1080         }
1081
1082         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
1083         if (ret != 0) {
1084                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
1085                 talloc_free(tmp_ctx);
1086                 return -1;
1087         }
1088         if (vnnmap->generation == INVALID_GENERATION) {
1089                 printf("Generation:INVALID\n");
1090         } else {
1091                 printf("Generation:%d\n",vnnmap->generation);
1092         }
1093         printf("Size:%d\n",vnnmap->size);
1094         for(i=0;i<vnnmap->size;i++){
1095                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
1096         }
1097
1098         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmode);
1099         if (ret != 0) {
1100                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1101                 talloc_free(tmp_ctx);
1102                 return -1;
1103         }
1104         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
1105
1106         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmaster);
1107         if (ret != 0) {
1108                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1109                 talloc_free(tmp_ctx);
1110                 return -1;
1111         }
1112         printf("Recovery master:%d\n",recmaster);
1113
1114         talloc_free(tmp_ctx);
1115         return 0;
1116 }
1117
1118 static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
1119 {
1120         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1121         int i, ret;
1122         struct ctdb_node_map *nodemap=NULL;
1123         uint32_t * nodes;
1124         uint32_t pnn_mode, mypnn;
1125
1126         if (argc > 1) {
1127                 usage();
1128         }
1129
1130         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1131                               options.pnn, true, &nodes, &pnn_mode)) {
1132                 return -1;
1133         }
1134
1135         if (options.machinereadable) {
1136                 control_status_header_machine();
1137         } else if (pnn_mode == CTDB_BROADCAST_ALL) {
1138                 printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
1139         }
1140
1141         mypnn = getpnn(ctdb);
1142
1143         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1144         if (ret != 0) {
1145                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1146                 talloc_free(tmp_ctx);
1147                 return -1;
1148         }
1149
1150         ret = 0;
1151
1152         for (i = 0; i < talloc_array_length(nodes); i++) {
1153                 if (options.machinereadable) {
1154                         ret |= control_status_1_machine(ctdb, mypnn,
1155                                                         &nodemap->nodes[nodes[i]]);
1156                 } else {
1157                         ret |= control_status_1_human(ctdb, mypnn,
1158                                                       &nodemap->nodes[nodes[i]]);
1159                 }
1160         }
1161
1162         talloc_free(tmp_ctx);
1163         return ret;
1164 }
1165
1166 static struct pnn_node *read_natgw_nodes_file(struct ctdb_context *ctdb,
1167                                               TALLOC_CTX *mem_ctx)
1168 {
1169         const char *natgw_list;
1170         struct pnn_node *natgw_nodes = NULL;
1171
1172         natgw_list = getenv("CTDB_NATGW_NODES");
1173         if (natgw_list == NULL) {
1174                 natgw_list = talloc_asprintf(mem_ctx, "%s/natgw_nodes",
1175                                              getenv("CTDB_BASE"));
1176                 if (natgw_list == NULL) {
1177                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
1178                         exit(1);
1179                 }
1180         }
1181         /* The PNNs will be junk but they're not used */
1182         natgw_nodes = read_pnn_node_file(mem_ctx, natgw_list);
1183         if (natgw_nodes == NULL) {
1184                 DEBUG(DEBUG_ERR,
1185                       ("Failed to load natgw node list '%s'\n", natgw_list));
1186         }
1187         return natgw_nodes;
1188 }
1189
1190
1191 /* talloc off the existing nodemap... */
1192 static struct ctdb_node_map *talloc_nodemap(struct ctdb_node_map *nodemap)
1193 {
1194         return talloc_zero_size(nodemap,
1195                                 offsetof(struct ctdb_node_map, nodes) +
1196                                 nodemap->num * sizeof(struct ctdb_node_and_flags));
1197 }
1198
1199 static struct ctdb_node_map *
1200 filter_nodemap_by_addrs(struct ctdb_context *ctdb,
1201                         struct ctdb_node_map *nodemap,
1202                         struct pnn_node *nodes)
1203 {
1204         int i;
1205         struct pnn_node *n;
1206         struct ctdb_node_map *ret;
1207
1208         ret = talloc_nodemap(nodemap);
1209         CTDB_NO_MEMORY_NULL(ctdb, ret);
1210
1211         ret->num = 0;
1212
1213         for (i = 0; i < nodemap->num; i++) {
1214                 for(n = nodes; n != NULL ; n = n->next) {
1215                         if (ctdb_same_ip(&n->addr,
1216                                          &nodemap->nodes[i].addr)) {
1217                                 break;
1218                         }
1219                 }
1220                 if (n == NULL) {
1221                         continue;
1222                 }
1223
1224                 ret->nodes[ret->num] = nodemap->nodes[i];
1225                 ret->num++;
1226         }
1227
1228         return ret;
1229 }
1230
1231 static struct ctdb_node_map *
1232 filter_nodemap_by_capabilities(struct ctdb_context *ctdb,
1233                                struct ctdb_node_map *nodemap,
1234                                uint32_t required_capabilities,
1235                                bool first_only)
1236 {
1237         int i;
1238         uint32_t capabilities;
1239         struct ctdb_node_map *ret;
1240
1241         ret = talloc_nodemap(nodemap);
1242         CTDB_NO_MEMORY_NULL(ctdb, ret);
1243
1244         ret->num = 0;
1245
1246         for (i = 0; i < nodemap->num; i++) {
1247                 int res;
1248
1249                 /* Disconnected nodes have no capabilities! */
1250                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1251                         continue;
1252                 }
1253
1254                 res = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(),
1255                                                 nodemap->nodes[i].pnn,
1256                                                 &capabilities);
1257                 if (res != 0) {
1258                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n",
1259                                           nodemap->nodes[i].pnn));
1260                         talloc_free(ret);
1261                         return NULL;
1262                 }
1263                 if (!(capabilities & required_capabilities)) {
1264                         continue;
1265                 }
1266
1267                 ret->nodes[ret->num] = nodemap->nodes[i];
1268                 ret->num++;
1269                 if (first_only) {
1270                         break;
1271                 }
1272         }
1273
1274         return ret;
1275 }
1276
1277 static struct ctdb_node_map *
1278 filter_nodemap_by_flags(struct ctdb_context *ctdb,
1279                         struct ctdb_node_map *nodemap,
1280                         uint32_t flags_mask)
1281 {
1282         int i;
1283         struct ctdb_node_map *ret;
1284
1285         ret = talloc_nodemap(nodemap);
1286         CTDB_NO_MEMORY_NULL(ctdb, ret);
1287
1288         ret->num = 0;
1289
1290         for (i = 0; i < nodemap->num; i++) {
1291                 if (nodemap->nodes[i].flags & flags_mask) {
1292                         continue;
1293                 }
1294
1295                 ret->nodes[ret->num] = nodemap->nodes[i];
1296                 ret->num++;
1297         }
1298
1299         return ret;
1300 }
1301
1302 /*
1303   display the list of nodes belonging to this natgw configuration
1304  */
1305 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
1306 {
1307         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1308         int i, ret;
1309         struct pnn_node *natgw_nodes = NULL;
1310         struct ctdb_node_map *orig_nodemap=NULL;
1311         struct ctdb_node_map *nodemap;
1312         uint32_t mypnn, pnn;
1313         const char *ip;
1314
1315         /* When we have some nodes that could be the NATGW, make a
1316          * series of attempts to find the first node that doesn't have
1317          * certain status flags set.
1318          */
1319         uint32_t exclude_flags[] = {
1320                 /* Look for a nice healthy node */
1321                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY,
1322                 /* If not found, an UNHEALTHY/BANNED node will do */
1323                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED,
1324                 /* If not found, a STOPPED node will do */
1325                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED,
1326                 0,
1327         };
1328
1329         /* read the natgw nodes file into a linked list */
1330         natgw_nodes = read_natgw_nodes_file(ctdb, tmp_ctx);
1331         if (natgw_nodes == NULL) {
1332                 ret = -1;
1333                 goto done;
1334         }
1335
1336         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
1337                                    tmp_ctx, &orig_nodemap);
1338         if (ret != 0) {
1339                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
1340                 talloc_free(tmp_ctx);
1341                 return -1;
1342         }
1343
1344         /* Get a nodemap that includes only the nodes in the NATGW
1345          * group */
1346         nodemap = filter_nodemap_by_addrs(ctdb, orig_nodemap, natgw_nodes);
1347         if (nodemap == NULL) {
1348                 ret = -1;
1349                 goto done;
1350         }
1351
1352         ret = 2; /* matches ENOENT */
1353         pnn = -1;
1354         ip = "0.0.0.0";
1355         /* For each flag mask... */
1356         for (i = 0; exclude_flags[i] != 0; i++) {
1357                 /* ... get a nodemap that excludes nodes with with
1358                  * masked flags... */
1359                 struct ctdb_node_map *t =
1360                         filter_nodemap_by_flags(ctdb, nodemap,
1361                                                 exclude_flags[i]);
1362                 if (t == NULL) {
1363                         /* No memory */
1364                         ret = -1;
1365                         goto done;
1366                 }
1367                 if (t->num > 0) {
1368                         /* ... and find the first node with the NATGW
1369                          * capability */
1370                         struct ctdb_node_map *n;
1371                         n = filter_nodemap_by_capabilities(ctdb, t,
1372                                                            CTDB_CAP_NATGW,
1373                                                            true);
1374                         if (n == NULL) {
1375                                 /* No memory */
1376                                 ret = -1;
1377                                 goto done;
1378                         }
1379                         if (n->num > 0) {
1380                                 ret = 0;
1381                                 pnn = n->nodes[0].pnn;
1382                                 ip = ctdb_addr_to_str(&n->nodes[0].addr);
1383                                 break;
1384                         }
1385                 }
1386                 talloc_free(t);
1387         }
1388
1389         if (options.machinereadable) {
1390                 printm(":Node:IP:\n");
1391                 printm(":%d:%s:\n", pnn, ip);
1392         } else {
1393                 printf("%d %s\n", pnn, ip);
1394         }
1395
1396         /* print the pruned list of nodes belonging to this natgw list */
1397         mypnn = getpnn(ctdb);
1398         if (options.machinereadable) {
1399                 control_status_header_machine();
1400         } else {
1401                 printf("Number of nodes:%d\n", nodemap->num);
1402         }
1403         for(i=0;i<nodemap->num;i++){
1404                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1405                         continue;
1406                 }
1407                 if (options.machinereadable) {
1408                         control_status_1_machine(ctdb, mypnn, &(nodemap->nodes[i]));
1409                 } else {
1410                         control_status_1_human(ctdb, mypnn, &(nodemap->nodes[i]));
1411                 }
1412         }
1413
1414 done:
1415         talloc_free(tmp_ctx);
1416         return ret;
1417 }
1418
1419 /*
1420   display the status of the scripts for monitoring (or other events)
1421  */
1422 static int control_one_scriptstatus(struct ctdb_context *ctdb,
1423                                     enum ctdb_eventscript_call type)
1424 {
1425         struct ctdb_scripts_wire *script_status;
1426         int ret, i;
1427
1428         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
1429         if (ret != 0) {
1430                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
1431                 return ret;
1432         }
1433
1434         if (script_status == NULL) {
1435                 if (!options.machinereadable) {
1436                         printf("%s cycle never run\n",
1437                                ctdb_eventscript_call_names[type]);
1438                 }
1439                 return 0;
1440         }
1441
1442         if (!options.machinereadable) {
1443                 int num_run = 0;
1444                 for (i=0; i<script_status->num_scripts; i++) {
1445                         if (script_status->scripts[i].status != -ENOEXEC) {
1446                                 num_run++;
1447                         }
1448                 }
1449                 printf("%d scripts were executed last %s cycle\n",
1450                        num_run,
1451                        ctdb_eventscript_call_names[type]);
1452         }
1453         for (i=0; i<script_status->num_scripts; i++) {
1454                 const char *status = NULL;
1455
1456                 switch (script_status->scripts[i].status) {
1457                 case -ETIME:
1458                         status = "TIMEDOUT";
1459                         break;
1460                 case -ENOEXEC:
1461                         status = "DISABLED";
1462                         break;
1463                 case 0:
1464                         status = "OK";
1465                         break;
1466                 default:
1467                         if (script_status->scripts[i].status > 0)
1468                                 status = "ERROR";
1469                         break;
1470                 }
1471                 if (options.machinereadable) {
1472                         printm(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
1473                                ctdb_eventscript_call_names[type],
1474                                script_status->scripts[i].name,
1475                                script_status->scripts[i].status,
1476                                status,
1477                                (long)script_status->scripts[i].start.tv_sec,
1478                                (long)script_status->scripts[i].start.tv_usec,
1479                                (long)script_status->scripts[i].finished.tv_sec,
1480                                (long)script_status->scripts[i].finished.tv_usec,
1481                                script_status->scripts[i].output);
1482                         continue;
1483                 }
1484                 if (status)
1485                         printf("%-20s Status:%s    ",
1486                                script_status->scripts[i].name, status);
1487                 else
1488                         /* Some other error, eg from stat. */
1489                         printf("%-20s Status:CANNOT RUN (%s)",
1490                                script_status->scripts[i].name,
1491                                strerror(-script_status->scripts[i].status));
1492
1493                 if (script_status->scripts[i].status >= 0) {
1494                         printf("Duration:%.3lf ",
1495                         timeval_delta(&script_status->scripts[i].finished,
1496                               &script_status->scripts[i].start));
1497                 }
1498                 if (script_status->scripts[i].status != -ENOEXEC) {
1499                         printf("%s",
1500                                ctime(&script_status->scripts[i].start.tv_sec));
1501                         if (script_status->scripts[i].status != 0) {
1502                                 printf("   OUTPUT:%s\n",
1503                                        script_status->scripts[i].output);
1504                         }
1505                 } else {
1506                         printf("\n");
1507                 }
1508         }
1509         return 0;
1510 }
1511
1512
1513 static int control_scriptstatus(struct ctdb_context *ctdb,
1514                                 int argc, const char **argv)
1515 {
1516         int ret;
1517         enum ctdb_eventscript_call type, min, max;
1518         const char *arg;
1519
1520         if (argc > 1) {
1521                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
1522                 return -1;
1523         }
1524
1525         if (argc == 0)
1526                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
1527         else
1528                 arg = argv[0];
1529
1530         for (type = 0; type < CTDB_EVENT_MAX; type++) {
1531                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
1532                         min = type;
1533                         max = type+1;
1534                         break;
1535                 }
1536         }
1537         if (type == CTDB_EVENT_MAX) {
1538                 if (strcmp(arg, "all") == 0) {
1539                         min = 0;
1540                         max = CTDB_EVENT_MAX;
1541                 } else {
1542                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
1543                         return -1;
1544                 }
1545         }
1546
1547         if (options.machinereadable) {
1548                 printm(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1549         }
1550
1551         for (type = min; type < max; type++) {
1552                 ret = control_one_scriptstatus(ctdb, type);
1553                 if (ret != 0) {
1554                         return ret;
1555                 }
1556         }
1557
1558         return 0;
1559 }
1560
1561 /*
1562   enable an eventscript
1563  */
1564 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1565 {
1566         int ret;
1567
1568         if (argc < 1) {
1569                 usage();
1570         }
1571
1572         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1573         if (ret != 0) {
1574           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1575                 return ret;
1576         }
1577
1578         return 0;
1579 }
1580
1581 /*
1582   disable an eventscript
1583  */
1584 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1585 {
1586         int ret;
1587
1588         if (argc < 1) {
1589                 usage();
1590         }
1591
1592         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1593         if (ret != 0) {
1594           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1595                 return ret;
1596         }
1597
1598         return 0;
1599 }
1600
1601 /*
1602   display the pnn of the recovery master
1603  */
1604 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1605 {
1606         uint32_t recmaster;
1607         int ret;
1608
1609         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1610         if (ret != 0) {
1611                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1612                 return -1;
1613         }
1614         printf("%d\n",recmaster);
1615
1616         return 0;
1617 }
1618
1619 /*
1620   add a tickle to a public address
1621  */
1622 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1623 {
1624         struct ctdb_tcp_connection t;
1625         TDB_DATA data;
1626         int ret;
1627
1628         assert_single_node_only();
1629
1630         if (argc < 2) {
1631                 usage();
1632         }
1633
1634         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1635                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1636                 return -1;
1637         }
1638         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1639                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1640                 return -1;
1641         }
1642
1643         data.dptr = (uint8_t *)&t;
1644         data.dsize = sizeof(t);
1645
1646         /* tell all nodes about this tcp connection */
1647         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1648                            0, data, ctdb, NULL, NULL, NULL, NULL);
1649         if (ret != 0) {
1650                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1651                 return -1;
1652         }
1653         
1654         return 0;
1655 }
1656
1657
1658 /*
1659   delete a tickle from a node
1660  */
1661 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1662 {
1663         struct ctdb_tcp_connection t;
1664         TDB_DATA data;
1665         int ret;
1666
1667         assert_single_node_only();
1668
1669         if (argc < 2) {
1670                 usage();
1671         }
1672
1673         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1674                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1675                 return -1;
1676         }
1677         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1678                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1679                 return -1;
1680         }
1681
1682         data.dptr = (uint8_t *)&t;
1683         data.dsize = sizeof(t);
1684
1685         /* tell all nodes about this tcp connection */
1686         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1687                            0, data, ctdb, NULL, NULL, NULL, NULL);
1688         if (ret != 0) {
1689                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1690                 return -1;
1691         }
1692         
1693         return 0;
1694 }
1695
1696
1697 /*
1698   get a list of all tickles for this pnn
1699  */
1700 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1701 {
1702         struct ctdb_control_tcp_tickle_list *list;
1703         ctdb_sock_addr addr;
1704         int i, ret;
1705         unsigned port = 0;
1706
1707         assert_single_node_only();
1708
1709         if (argc < 1) {
1710                 usage();
1711         }
1712
1713         if (argc == 2) {
1714                 port = atoi(argv[1]);
1715         }
1716
1717         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1718                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1719                 return -1;
1720         }
1721
1722         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1723         if (ret == -1) {
1724                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1725                 return -1;
1726         }
1727
1728         if (options.machinereadable){
1729                 printm(":source ip:port:destination ip:port:\n");
1730                 for (i=0;i<list->tickles.num;i++) {
1731                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1732                                 continue;
1733                         }
1734                         printm(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1735                         printm(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1736                 }
1737         } else {
1738                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1739                 printf("Num tickles:%u\n", list->tickles.num);
1740                 for (i=0;i<list->tickles.num;i++) {
1741                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1742                                 continue;
1743                         }
1744                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1745                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1746                 }
1747         }
1748
1749         talloc_free(list);
1750         
1751         return 0;
1752 }
1753
1754
1755 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1756 {
1757         struct ctdb_all_public_ips *ips;
1758         struct ctdb_public_ip ip;
1759         int i, ret;
1760         uint32_t *nodes;
1761         uint32_t disable_time;
1762         TDB_DATA data;
1763         struct ctdb_node_map *nodemap=NULL;
1764         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1765
1766         disable_time = 30;
1767         data.dptr  = (uint8_t*)&disable_time;
1768         data.dsize = sizeof(disable_time);
1769         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1770         if (ret != 0) {
1771                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1772                 return -1;
1773         }
1774
1775
1776
1777         /* read the public ip list from the node */
1778         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1779         if (ret != 0) {
1780                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1781                 talloc_free(tmp_ctx);
1782                 return -1;
1783         }
1784
1785         for (i=0;i<ips->num;i++) {
1786                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1787                         break;
1788                 }
1789         }
1790         if (i==ips->num) {
1791                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1792                         pnn, ctdb_addr_to_str(addr)));
1793                 talloc_free(tmp_ctx);
1794                 return -1;
1795         }
1796
1797         ip.pnn  = pnn;
1798         ip.addr = *addr;
1799
1800         data.dptr  = (uint8_t *)&ip;
1801         data.dsize = sizeof(ip);
1802
1803         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1804         if (ret != 0) {
1805                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1806                 talloc_free(tmp_ctx);
1807                 return ret;
1808         }
1809
1810         nodes = list_of_nodes(ctdb, nodemap, tmp_ctx, NODE_FLAGS_INACTIVE, pnn);
1811         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1812                                         nodes, 0,
1813                                         LONGTIMELIMIT(),
1814                                         false, data,
1815                                         NULL, NULL,
1816                                         NULL);
1817         if (ret != 0) {
1818                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1819                 talloc_free(tmp_ctx);
1820                 return -1;
1821         }
1822
1823         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1824         if (ret != 0) {
1825                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1826                 talloc_free(tmp_ctx);
1827                 return -1;
1828         }
1829
1830         /* update the recovery daemon so it now knows to expect the new
1831            node assignment for this ip.
1832         */
1833         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1834         if (ret != 0) {
1835                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1836                 return -1;
1837         }
1838
1839         talloc_free(tmp_ctx);
1840         return 0;
1841 }
1842
1843
1844 /* 
1845  * scans all other nodes and returns a pnn for another node that can host this 
1846  * ip address or -1
1847  */
1848 static int
1849 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1850 {
1851         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1852         struct ctdb_all_public_ips *ips;
1853         struct ctdb_node_map *nodemap=NULL;
1854         int i, j, ret;
1855         int pnn;
1856
1857         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1858         if (ret != 0) {
1859                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1860                 talloc_free(tmp_ctx);
1861                 return ret;
1862         }
1863
1864         for(i=0;i<nodemap->num;i++){
1865                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1866                         continue;
1867                 }
1868                 if (nodemap->nodes[i].pnn == options.pnn) {
1869                         continue;
1870                 }
1871
1872                 /* read the public ip list from this node */
1873                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1874                 if (ret != 0) {
1875                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1876                         return -1;
1877                 }
1878
1879                 for (j=0;j<ips->num;j++) {
1880                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1881                                 pnn = nodemap->nodes[i].pnn;
1882                                 talloc_free(tmp_ctx);
1883                                 return pnn;
1884                         }
1885                 }
1886                 talloc_free(ips);
1887         }
1888
1889         talloc_free(tmp_ctx);
1890         return -1;
1891 }
1892
1893 /* If pnn is -1 then try to find a node to move IP to... */
1894 static bool try_moveip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1895 {
1896         bool pnn_specified = (pnn == -1 ? false : true);
1897         int retries = 0;
1898
1899         while (retries < 5) {
1900                 if (!pnn_specified) {
1901                         pnn = find_other_host_for_public_ip(ctdb, addr);
1902                         if (pnn == -1) {
1903                                 return false;
1904                         }
1905                         DEBUG(DEBUG_NOTICE,
1906                               ("Trying to move public IP to node %u\n", pnn));
1907                 }
1908
1909                 if (move_ip(ctdb, addr, pnn) == 0) {
1910                         return true;
1911                 }
1912
1913                 sleep(3);
1914                 retries++;
1915         }
1916
1917         return false;
1918 }
1919
1920
1921 /*
1922   move/failover an ip address to a specific node
1923  */
1924 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1925 {
1926         uint32_t pnn;
1927         ctdb_sock_addr addr;
1928
1929         assert_single_node_only();
1930
1931         if (argc < 2) {
1932                 usage();
1933                 return -1;
1934         }
1935
1936         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1937                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1938                 return -1;
1939         }
1940
1941
1942         if (sscanf(argv[1], "%u", &pnn) != 1) {
1943                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1944                 return -1;
1945         }
1946
1947         if (!try_moveip(ctdb, &addr, pnn)) {
1948                 DEBUG(DEBUG_ERR,("Failed to move IP to node %d.\n", pnn));
1949                 return -1;
1950         }
1951
1952         return 0;
1953 }
1954
1955 static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
1956 {
1957         TDB_DATA data;
1958
1959         data.dptr  = (uint8_t *)&pnn;
1960         data.dsize = sizeof(uint32_t);
1961         if (ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
1962                 DEBUG(DEBUG_ERR,
1963                       ("Failed to send message to force node %u to be a rebalancing target\n",
1964                        pnn));
1965                 return -1;
1966         }
1967
1968         return 0;
1969 }
1970
1971
1972 /*
1973   rebalance a node by setting it to allow failback and triggering a
1974   takeover run
1975  */
1976 static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char **argv)
1977 {
1978         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1979         uint32_t *nodes;
1980         uint32_t pnn_mode;
1981         int i, ret;
1982
1983         assert_single_node_only();
1984
1985         if (argc > 1) {
1986                 usage();
1987         }
1988
1989         /* Determine the nodes where IPs need to be reloaded */
1990         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1991                               options.pnn, true, &nodes, &pnn_mode)) {
1992                 ret = -1;
1993                 goto done;
1994         }
1995
1996         for (i = 0; i < talloc_array_length(nodes); i++) {
1997                 if (!rebalance_node(ctdb, nodes[i])) {
1998                         ret = -1;
1999                 }
2000         }
2001
2002 done:
2003         talloc_free(tmp_ctx);
2004         return ret;
2005 }
2006
2007 static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
2008 {
2009         struct ctdb_public_ip ip;
2010         int ret;
2011         uint32_t *nodes;
2012         uint32_t disable_time;
2013         TDB_DATA data;
2014         struct ctdb_node_map *nodemap=NULL;
2015         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2016
2017         disable_time = 30;
2018         data.dptr  = (uint8_t*)&disable_time;
2019         data.dsize = sizeof(disable_time);
2020         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
2021         if (ret != 0) {
2022                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
2023                 return -1;
2024         }
2025
2026         ip.pnn  = -1;
2027         ip.addr = *addr;
2028
2029         data.dptr  = (uint8_t *)&ip;
2030         data.dsize = sizeof(ip);
2031
2032         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
2033         if (ret != 0) {
2034                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2035                 talloc_free(tmp_ctx);
2036                 return ret;
2037         }
2038
2039         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
2040         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
2041                                         nodes, 0,
2042                                         LONGTIMELIMIT(),
2043                                         false, data,
2044                                         NULL, NULL,
2045                                         NULL);
2046         if (ret != 0) {
2047                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
2048                 talloc_free(tmp_ctx);
2049                 return -1;
2050         }
2051
2052         talloc_free(tmp_ctx);
2053         return 0;
2054 }
2055
2056 /*
2057   release an ip form all nodes and have it re-assigned by recd
2058  */
2059 static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char **argv)
2060 {
2061         ctdb_sock_addr addr;
2062
2063         assert_single_node_only();
2064
2065         if (argc < 1) {
2066                 usage();
2067                 return -1;
2068         }
2069
2070         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2071                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2072                 return -1;
2073         }
2074
2075         if (rebalance_ip(ctdb, &addr) != 0) {
2076                 DEBUG(DEBUG_ERR,("Error when trying to reassign ip\n"));
2077                 return -1;
2078         }
2079
2080         return 0;
2081 }
2082
2083 static int getips_store_callback(void *param, void *data)
2084 {
2085         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
2086         struct ctdb_all_public_ips *ips = param;
2087         int i;
2088
2089         i = ips->num++;
2090         ips->ips[i].pnn  = node_ip->pnn;
2091         ips->ips[i].addr = node_ip->addr;
2092         return 0;
2093 }
2094
/*
  tree traversal callback: increment the uint32_t counter that param
  points to.  data is ignored.  Always returns 0.
 */
static int getips_count_callback(void *param, void *data)
{
        uint32_t *counter = param;

        *counter += 1;
        return 0;
}
2102
2103 #define IP_KEYLEN       4
2104 static uint32_t *ip_key(ctdb_sock_addr *ip)
2105 {
2106         static uint32_t key[IP_KEYLEN];
2107
2108         bzero(key, sizeof(key));
2109
2110         switch (ip->sa.sa_family) {
2111         case AF_INET:
2112                 key[0]  = ip->ip.sin_addr.s_addr;
2113                 break;
2114         case AF_INET6: {
2115                 uint32_t *s6_a32 = (uint32_t *)&(ip->ip6.sin6_addr.s6_addr);
2116                 key[0]  = s6_a32[3];
2117                 key[1]  = s6_a32[2];
2118                 key[2]  = s6_a32[1];
2119                 key[3]  = s6_a32[0];
2120                 break;
2121         }
2122         default:
2123                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
2124                 return key;
2125         }
2126
2127         return key;
2128 }
2129
/*
  insert callback for the ip tree: hands back parm unchanged.
  data is ignored.
 */
static void *add_ip_callback(void *parm, void *data)
{
        void *result = parm;

        (void)data;
        return result;
}
2134
2135 static int
2136 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
2137 {
2138         struct ctdb_all_public_ips *tmp_ips;
2139         struct ctdb_node_map *nodemap=NULL;
2140         trbt_tree_t *ip_tree;
2141         int i, j, len, ret;
2142         uint32_t count;
2143
2144         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2145         if (ret != 0) {
2146                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2147                 return ret;
2148         }
2149
2150         ip_tree = trbt_create(tmp_ctx, 0);
2151
2152         for(i=0;i<nodemap->num;i++){
2153                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
2154                         continue;
2155                 }
2156                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2157                         continue;
2158                 }
2159
2160                 /* read the public ip list from this node */
2161                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
2162                 if (ret != 0) {
2163                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
2164                         return -1;
2165                 }
2166         
2167                 for (j=0; j<tmp_ips->num;j++) {
2168                         struct ctdb_public_ip *node_ip;
2169
2170                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
2171                         node_ip->pnn  = tmp_ips->ips[j].pnn;
2172                         node_ip->addr = tmp_ips->ips[j].addr;
2173
2174                         trbt_insertarray32_callback(ip_tree,
2175                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
2176                                 add_ip_callback,
2177                                 node_ip);
2178                 }
2179                 talloc_free(tmp_ips);
2180         }
2181
2182         /* traverse */
2183         count = 0;
2184         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
2185
2186         len = offsetof(struct ctdb_all_public_ips, ips) + 
2187                 count*sizeof(struct ctdb_public_ip);
2188         tmp_ips = talloc_zero_size(tmp_ctx, len);
2189         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
2190
2191         *ips = tmp_ips;
2192
2193         return 0;
2194 }
2195
2196
2197 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2198 {
2199         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2200
2201         event_add_timed(ctdb->ev, ctdb, 
2202                                 timeval_current_ofs(1, 0),
2203                                 ctdb_every_second, ctdb);
2204 }
2205
/* State shared between srvid_broadcast() and its reply handler */
struct srvid_reply_handler_data {
        /* set by the handler once enough replies have arrived */
        bool done;
        /* false: one successful reply is enough; true: wait for every
         * node still listed in nodes */
        bool wait_for_all;
        /* talloc array of PNNs still expected to reply; an entry is
         * overwritten with -1 once that node has answered */
        uint32_t *nodes;
        /* description of the request, used in log messages */
        const char *srvid_str;
};
2212
2213 static void srvid_broadcast_reply_handler(struct ctdb_context *ctdb,
2214                                          uint64_t srvid,
2215                                          TDB_DATA data,
2216                                          void *private_data)
2217 {
2218         struct srvid_reply_handler_data *d =
2219                 (struct srvid_reply_handler_data *)private_data;
2220         int i;
2221         int32_t ret;
2222
2223         if (data.dsize != sizeof(ret)) {
2224                 DEBUG(DEBUG_ERR, (__location__ " Wrong reply size\n"));
2225                 return;
2226         }
2227
2228         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
2229         ret = *(int32_t *)data.dptr;
2230         if (ret < 0) {
2231                 DEBUG(DEBUG_ERR,
2232                       ("%s failed with result %d\n", d->srvid_str, ret));
2233                 return;
2234         }
2235
2236         if (!d->wait_for_all) {
2237                 d->done = true;
2238                 return;
2239         }
2240
2241         /* Wait for all replies */
2242         d->done = true;
2243         for (i = 0; i < talloc_array_length(d->nodes); i++) {
2244                 if (d->nodes[i] == ret) {
2245                         DEBUG(DEBUG_INFO,
2246                               ("%s reply received from node %u\n",
2247                                d->srvid_str, ret));
2248                         d->nodes[i] = -1;
2249                 }
2250                 if (d->nodes[i] != -1) {
2251                         /* Found a node that hasn't yet replied */
2252                         d->done = false;
2253                 }
2254         }
2255 }
2256
2257 /* Broadcast the given SRVID to all connected nodes.  Wait for 1 reply
2258  * or replies from all connected nodes.  arg is the data argument to
2259  * pass in the srvid_request structure - pass 0 if this isn't needed.
2260  */
2261 static int srvid_broadcast(struct ctdb_context *ctdb,
2262                            uint64_t srvid, uint32_t *arg,
2263                            const char *srvid_str, bool wait_for_all)
2264 {
2265         int ret;
2266         TDB_DATA data;
2267         uint32_t pnn;
2268         uint64_t reply_srvid;
2269         struct srvid_request request;
2270         struct srvid_request_data request_data;
2271         struct srvid_reply_handler_data reply_data;
2272         struct timeval tv;
2273
2274         ZERO_STRUCT(request);
2275
2276         /* Time ticks to enable timeouts to be processed */
2277         event_add_timed(ctdb->ev, ctdb, 
2278                                 timeval_current_ofs(1, 0),
2279                                 ctdb_every_second, ctdb);
2280
2281         pnn = ctdb_get_pnn(ctdb);
2282         reply_srvid = getpid();
2283
2284         if (arg == NULL) {
2285                 request.pnn = pnn;
2286                 request.srvid = reply_srvid;
2287
2288                 data.dptr = (uint8_t *)&request;
2289                 data.dsize = sizeof(request);
2290         } else {
2291                 request_data.pnn = pnn;
2292                 request_data.srvid = reply_srvid;
2293                 request_data.data = *arg;
2294
2295                 data.dptr = (uint8_t *)&request_data;
2296                 data.dsize = sizeof(request_data);
2297         }
2298
2299         /* Register message port for reply from recovery master */
2300         ctdb_client_set_message_handler(ctdb, reply_srvid,
2301                                         srvid_broadcast_reply_handler,
2302                                         &reply_data);
2303
2304         reply_data.wait_for_all = wait_for_all;
2305         reply_data.nodes = NULL;
2306         reply_data.srvid_str = srvid_str;
2307
2308 again:
2309         reply_data.done = false;
2310
2311         if (wait_for_all) {
2312                 struct ctdb_node_map *nodemap;
2313
2314                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
2315                                            CTDB_CURRENT_NODE, ctdb, &nodemap);
2316                 if (ret != 0) {
2317                         DEBUG(DEBUG_ERR,
2318                               ("Unable to get nodemap from current node, try again\n"));
2319                         sleep(1);
2320                         goto again;
2321                 }
2322
2323                 if (reply_data.nodes != NULL) {
2324                         talloc_free(reply_data.nodes);
2325                 }
2326                 reply_data.nodes = list_of_connected_nodes(ctdb, nodemap,
2327                                                            NULL, true);
2328
2329                 talloc_free(nodemap);
2330         }
2331
2332         /* Send to all connected nodes. Only recmaster replies */
2333         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2334                                        srvid, data);
2335         if (ret != 0) {
2336                 /* This can only happen if the socket is closed and
2337                  * there's no way to recover from that, so don't try
2338                  * again.
2339                  */
2340                 DEBUG(DEBUG_ERR,
2341                       ("Failed to send %s request to connected nodes\n",
2342                        srvid_str));
2343                 return -1;
2344         }
2345
2346         tv = timeval_current();
2347         /* This loop terminates the reply is received */
2348         while (timeval_elapsed(&tv) < 5.0 && !reply_data.done) {
2349                 event_loop_once(ctdb->ev);
2350         }
2351
2352         if (!reply_data.done) {
2353                 DEBUG(DEBUG_NOTICE,
2354                       ("Still waiting for confirmation of %s\n", srvid_str));
2355                 sleep(1);
2356                 goto again;
2357         }
2358
2359         ctdb_client_remove_message_handler(ctdb, reply_srvid, &reply_data);
2360
2361         talloc_free(reply_data.nodes);
2362
2363         return 0;
2364 }
2365
2366 static int ipreallocate(struct ctdb_context *ctdb)
2367 {
2368         return srvid_broadcast(ctdb, CTDB_SRVID_TAKEOVER_RUN, NULL,
2369                                "IP reallocation", false);
2370 }
2371
2372
/*
  command entry point for an IP reallocation; argc and argv are unused.
  Returns the result of ipreallocate().
 */
static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
{
        int status = ipreallocate(ctdb);

        return status;
}
2377
2378 /*
2379   add a public ip address to a node
2380  */
2381 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
2382 {
2383         int i, ret;
2384         int len, retries = 0;
2385         unsigned mask;
2386         ctdb_sock_addr addr;
2387         struct ctdb_control_ip_iface *pub;
2388         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2389         struct ctdb_all_public_ips *ips;
2390
2391
2392         if (argc != 2) {
2393                 talloc_free(tmp_ctx);
2394                 usage();
2395         }
2396
2397         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
2398                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
2399                 talloc_free(tmp_ctx);
2400                 return -1;
2401         }
2402
2403         /* read the public ip list from the node */
2404         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2405         if (ret != 0) {
2406                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", options.pnn));
2407                 talloc_free(tmp_ctx);
2408                 return -1;
2409         }
2410         for (i=0;i<ips->num;i++) {
2411                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2412                         DEBUG(DEBUG_ERR,("Can not add ip to node. Node already hosts this ip\n"));
2413                         return 0;
2414                 }
2415         }
2416
2417
2418
2419         /* Dont timeout. This command waits for an ip reallocation
2420            which sometimes can take wuite a while if there has
2421            been a recent recovery
2422         */
2423         alarm(0);
2424
2425         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
2426         pub = talloc_size(tmp_ctx, len); 
2427         CTDB_NO_MEMORY(ctdb, pub);
2428
2429         pub->addr  = addr;
2430         pub->mask  = mask;
2431         pub->len   = strlen(argv[1])+1;
2432         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
2433
2434         do {
2435                 ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
2436                 if (ret != 0) {
2437                         DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Wait 3 seconds and try again.\n", options.pnn));
2438                         sleep(3);
2439                         retries++;
2440                 }
2441         } while (retries < 5 && ret != 0);
2442         if (ret != 0) {
2443                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Giving up.\n", options.pnn));
2444                 talloc_free(tmp_ctx);
2445                 return ret;
2446         }
2447
2448         if (rebalance_node(ctdb, options.pnn) != 0) {
2449                 DEBUG(DEBUG_ERR,("Error when trying to rebalance node\n"));
2450                 return ret;
2451         }
2452
2453         talloc_free(tmp_ctx);
2454         return 0;
2455 }
2456
2457 /*
2458   add a public ip address to a node
2459  */
2460 static int control_ipiface(struct ctdb_context *ctdb, int argc, const char **argv)
2461 {
2462         ctdb_sock_addr addr;
2463
2464         if (argc != 1) {
2465                 usage();
2466         }
2467
2468         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2469                 printf("Badly formed ip : %s\n", argv[0]);
2470                 return -1;
2471         }
2472
2473         printf("IP on interface %s\n", ctdb_sys_find_ifname(&addr));
2474
2475         return 0;
2476 }
2477
2478 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
2479
2480 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
2481 {
2482         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2483         struct ctdb_node_map *nodemap=NULL;
2484         struct ctdb_all_public_ips *ips;
2485         int ret, i, j;
2486
2487         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2488         if (ret != 0) {
2489                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
2490                 return ret;
2491         }
2492
2493         /* remove it from the nodes that are not hosting the ip currently */
2494         for(i=0;i<nodemap->num;i++){
2495                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2496                         continue;
2497                 }
2498                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2499                 if (ret != 0) {
2500                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2501                         continue;
2502                 }
2503
2504                 for (j=0;j<ips->num;j++) {
2505                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2506                                 break;
2507                         }
2508                 }
2509                 if (j==ips->num) {
2510                         continue;
2511                 }
2512
2513                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
2514                         continue;
2515                 }
2516
2517                 options.pnn = nodemap->nodes[i].pnn;
2518                 control_delip(ctdb, argc, argv);
2519         }
2520
2521
2522         /* remove it from every node (also the one hosting it) */
2523         for(i=0;i<nodemap->num;i++){
2524                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2525                         continue;
2526                 }
2527                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2528                 if (ret != 0) {
2529                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2530                         continue;
2531                 }
2532
2533                 for (j=0;j<ips->num;j++) {
2534                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2535                                 break;
2536                         }
2537                 }
2538                 if (j==ips->num) {
2539                         continue;
2540                 }
2541
2542                 options.pnn = nodemap->nodes[i].pnn;
2543                 control_delip(ctdb, argc, argv);
2544         }
2545
2546         talloc_free(tmp_ctx);
2547         return 0;
2548 }
2549         
2550 /*
2551   delete a public ip address from a node
2552  */
2553 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
2554 {
2555         int i, ret;
2556         ctdb_sock_addr addr;
2557         struct ctdb_control_ip_iface pub;
2558         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2559         struct ctdb_all_public_ips *ips;
2560
2561         if (argc != 1) {
2562                 talloc_free(tmp_ctx);
2563                 usage();
2564         }
2565
2566         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2567                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2568                 return -1;
2569         }
2570
2571         if (options.pnn == CTDB_BROADCAST_ALL) {
2572                 return control_delip_all(ctdb, argc, argv, &addr);
2573         }
2574
2575         pub.addr  = addr;
2576         pub.mask  = 0;
2577         pub.len   = 0;
2578
2579         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2580         if (ret != 0) {
2581                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
2582                 talloc_free(tmp_ctx);
2583                 return ret;
2584         }
2585         
2586         for (i=0;i<ips->num;i++) {
2587                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2588                         break;
2589                 }
2590         }
2591
2592         if (i==ips->num) {
2593                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
2594                         ctdb_addr_to_str(&addr)));
2595                 talloc_free(tmp_ctx);
2596                 return -1;
2597         }
2598
2599         /* This is an optimisation.  If this node is hosting the IP
2600          * then try to move it somewhere else without invoking a full
2601          * takeover run.  We don't care if this doesn't work!
2602          */
2603         if (ips->ips[i].pnn == options.pnn) {
2604                 (void) try_moveip(ctdb, &addr, -1);
2605         }
2606
2607         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
2608         if (ret != 0) {
2609                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
2610                 talloc_free(tmp_ctx);
2611                 return ret;
2612         }
2613
2614         talloc_free(tmp_ctx);
2615         return 0;
2616 }
2617
2618 static int kill_tcp_from_file(struct ctdb_context *ctdb,
2619                               int argc, const char **argv)
2620 {
2621         struct ctdb_control_killtcp *killtcp;
2622         int max_entries, current, i;
2623         struct timeval timeout;
2624         char line[128], src[128], dst[128];
2625         int linenum;
2626         TDB_DATA data;
2627         struct client_async_data *async_data;
2628         struct ctdb_client_control_state *state;
2629
2630         if (argc != 0) {
2631                 usage();
2632         }
2633
2634         linenum = 1;
2635         killtcp = NULL;
2636         max_entries = 0;
2637         current = 0;
2638         while (!feof(stdin)) {
2639                 if (fgets(line, sizeof(line), stdin) == NULL) {
2640                         continue;
2641                 }
2642
2643                 /* Silently skip empty lines */
2644                 if (line[0] == '\n') {
2645                         continue;
2646                 }
2647
2648                 if (sscanf(line, "%s %s\n", src, dst) != 2) {
2649                         DEBUG(DEBUG_ERR, ("Bad line [%d]: '%s'\n",
2650                                           linenum, line));
2651                         talloc_free(killtcp);
2652                         return -1;
2653                 }
2654
2655                 if (current >= max_entries) {
2656                         max_entries += 1024;
2657                         killtcp = talloc_realloc(ctdb, killtcp,
2658                                                  struct ctdb_control_killtcp,
2659                                                  max_entries);
2660                         CTDB_NO_MEMORY(ctdb, killtcp);
2661                 }
2662
2663                 if (!parse_ip_port(src, &killtcp[current].src_addr)) {
2664                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2665                                           linenum, src));
2666                         talloc_free(killtcp);
2667                         return -1;
2668                 }
2669
2670                 if (!parse_ip_port(dst, &killtcp[current].dst_addr)) {
2671                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2672                                           linenum, dst));
2673                         talloc_free(killtcp);
2674                         return -1;
2675                 }
2676
2677                 current++;
2678         }
2679
2680         async_data = talloc_zero(ctdb, struct client_async_data);
2681         if (async_data == NULL) {
2682                 talloc_free(killtcp);
2683                 return -1;
2684         }
2685
2686         for (i = 0; i < current; i++) {
2687
2688                 data.dsize = sizeof(struct ctdb_control_killtcp);
2689                 data.dptr  = (unsigned char *)&killtcp[i];
2690
2691                 timeout = TIMELIMIT();
2692                 state = ctdb_control_send(ctdb, options.pnn, 0,
2693                                           CTDB_CONTROL_KILL_TCP, 0, data,
2694                                           async_data, &timeout, NULL);
2695
2696                 if (state == NULL) {
2697                         DEBUG(DEBUG_ERR,
2698                               ("Failed to call async killtcp control to node %u\n",
2699                                options.pnn));
2700                         talloc_free(killtcp);
2701                         return -1;
2702                 }
2703                 
2704                 ctdb_client_async_add(async_data, state);
2705         }
2706
2707         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2708                 DEBUG(DEBUG_ERR,("killtcp failed\n"));
2709                 talloc_free(killtcp);
2710                 return -1;
2711         }
2712
2713         talloc_free(killtcp);
2714         return 0;
2715 }
2716
2717
2718 /*
2719   kill a tcp connection
2720  */
2721 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2722 {
2723         int ret;
2724         struct ctdb_control_killtcp killtcp;
2725
2726         assert_single_node_only();
2727
2728         if (argc == 0) {
2729                 return kill_tcp_from_file(ctdb, argc, argv);
2730         }
2731
2732         if (argc < 2) {
2733                 usage();
2734         }
2735
2736         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
2737                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2738                 return -1;
2739         }
2740
2741         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
2742                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2743                 return -1;
2744         }
2745
2746         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
2747         if (ret != 0) {
2748                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
2749                 return ret;
2750         }
2751
2752         return 0;
2753 }
2754
2755
2756 /*
2757   send a gratious arp
2758  */
2759 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
2760 {
2761         int ret;
2762         ctdb_sock_addr addr;
2763
2764         assert_single_node_only();
2765
2766         if (argc < 2) {
2767                 usage();
2768         }
2769
2770         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2771                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
2772                 return -1;
2773         }
2774
2775         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
2776         if (ret != 0) {
2777                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
2778                 return ret;
2779         }
2780
2781         return 0;
2782 }
2783
2784 /*
2785   register a server id
2786  */
2787 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2788 {
2789         int ret;
2790         struct ctdb_server_id server_id;
2791
2792         if (argc < 3) {
2793                 usage();
2794         }
2795
2796         server_id.pnn       = strtoul(argv[0], NULL, 0);
2797         server_id.type      = strtoul(argv[1], NULL, 0);
2798         server_id.server_id = strtoul(argv[2], NULL, 0);
2799
2800         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
2801         if (ret != 0) {
2802                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
2803                 return ret;
2804         }
2805         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
2806         sleep(999);
2807         return -1;
2808 }
2809
2810 /*
2811   unregister a server id
2812  */
2813 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2814 {
2815         int ret;
2816         struct ctdb_server_id server_id;
2817
2818         if (argc < 3) {
2819                 usage();
2820         }
2821
2822         server_id.pnn       = strtoul(argv[0], NULL, 0);
2823         server_id.type      = strtoul(argv[1], NULL, 0);
2824         server_id.server_id = strtoul(argv[2], NULL, 0);
2825
2826         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
2827         if (ret != 0) {
2828                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
2829                 return ret;
2830         }
2831         return -1;
2832 }
2833
2834 /*
2835   check if a server id exists
2836  */
2837 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2838 {
2839         uint32_t status;
2840         int ret;
2841         struct ctdb_server_id server_id;
2842
2843         if (argc < 3) {
2844                 usage();
2845         }
2846
2847         server_id.pnn       = strtoul(argv[0], NULL, 0);
2848         server_id.type      = strtoul(argv[1], NULL, 0);
2849         server_id.server_id = strtoul(argv[2], NULL, 0);
2850
2851         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
2852         if (ret != 0) {
2853                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
2854                 return ret;
2855         }
2856
2857         if (status) {
2858                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
2859         } else {
2860                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
2861         }
2862         return 0;
2863 }
2864
2865 /*
2866   get a list of all server ids that are registered on a node
2867  */
2868 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
2869 {
2870         int i, ret;
2871         struct ctdb_server_id_list *server_ids;
2872
2873         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
2874         if (ret != 0) {
2875                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
2876                 return ret;
2877         }
2878
2879         for (i=0; i<server_ids->num; i++) {
2880                 printf("Server id %d:%d:%d\n", 
2881                         server_ids->server_ids[i].pnn, 
2882                         server_ids->server_ids[i].type, 
2883                         server_ids->server_ids[i].server_id); 
2884         }
2885
2886         return -1;
2887 }
2888
2889 /*
2890   check if a server id exists
2891  */
2892 static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
2893 {
2894         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2895         uint64_t *ids;
2896         uint8_t *result;
2897         int i;
2898
2899         if (argc < 1) {
2900                 talloc_free(tmp_ctx);
2901                 usage();
2902         }
2903
2904         ids    = talloc_array(tmp_ctx, uint64_t, argc);
2905         result = talloc_array(tmp_ctx, uint8_t, argc);
2906
2907         for (i = 0; i < argc; i++) {
2908                 ids[i] = strtoull(argv[i], NULL, 0);
2909         }
2910
2911         if (!ctdb_client_check_message_handlers(ctdb, ids, argc, result)) {
2912                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
2913                                   options.pnn));
2914                 talloc_free(tmp_ctx);
2915                 return -1;
2916         }
2917
2918         for (i=0; i < argc; i++) {
2919                 printf("Server id %d:%llu %s\n", options.pnn, (long long)ids[i],
2920                        result[i] ? "exists" : "does not exist");
2921         }
2922
2923         talloc_free(tmp_ctx);
2924         return 0;
2925 }
2926
2927 /*
2928   send a tcp tickle ack
2929  */
2930 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2931 {
2932         int ret;
2933         ctdb_sock_addr  src, dst;
2934
2935         if (argc < 2) {
2936                 usage();
2937         }
2938
2939         if (!parse_ip_port(argv[0], &src)) {
2940                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2941                 return -1;
2942         }
2943
2944         if (!parse_ip_port(argv[1], &dst)) {
2945                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2946                 return -1;
2947         }
2948
2949         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
2950         if (ret==0) {
2951                 return 0;
2952         }
2953         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
2954
2955         return -1;
2956 }
2957
2958
2959 /*
2960   display public ip status
2961  */
2962 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
2963 {
2964         int i, ret;
2965         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2966         struct ctdb_all_public_ips *ips;
2967
2968         if (options.pnn == CTDB_BROADCAST_ALL) {
2969                 /* read the list of public ips from all nodes */
2970                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
2971         } else {
2972                 /* read the public ip list from this node */
2973                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2974         }
2975         if (ret != 0) {
2976                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
2977                 talloc_free(tmp_ctx);
2978                 return ret;
2979         }
2980
2981         if (options.machinereadable){
2982                 printm(":Public IP:Node:");
2983                 if (options.verbose){
2984                         printm("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
2985                 }
2986                 printm("\n");
2987         } else {
2988                 if (options.pnn == CTDB_BROADCAST_ALL) {
2989                         printf("Public IPs on ALL nodes\n");
2990                 } else {
2991                         printf("Public IPs on node %u\n", options.pnn);
2992                 }
2993         }
2994
2995         for (i=1;i<=ips->num;i++) {
2996                 struct ctdb_control_public_ip_info *info = NULL;
2997                 int32_t pnn;
2998                 char *aciface = NULL;
2999                 char *avifaces = NULL;
3000                 char *cifaces = NULL;
3001
3002                 if (options.pnn == CTDB_BROADCAST_ALL) {
3003                         pnn = ips->ips[ips->num-i].pnn;
3004                 } else {
3005                         pnn = options.pnn;
3006                 }
3007
3008                 if (pnn != -1) {
3009                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
3010                                                    &ips->ips[ips->num-i].addr, &info);
3011                 } else {
3012                         ret = -1;
3013                 }
3014
3015                 if (ret == 0) {
3016                         int j;
3017                         for (j=0; j < info->num; j++) {
3018                                 if (cifaces == NULL) {
3019                                         cifaces = talloc_strdup(info,
3020                                                                 info->ifaces[j].name);
3021                                 } else {
3022                                         cifaces = talloc_asprintf_append(cifaces,
3023                                                                          ",%s",
3024                                                                          info->ifaces[j].name);
3025                                 }
3026
3027                                 if (info->active_idx == j) {
3028                                         aciface = info->ifaces[j].name;
3029                                 }
3030
3031                                 if (info->ifaces[j].link_state == 0) {
3032                                         continue;
3033                                 }
3034
3035                                 if (avifaces == NULL) {
3036                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
3037                                 } else {
3038                                         avifaces = talloc_asprintf_append(avifaces,
3039                                                                           ",%s",
3040                                                                           info->ifaces[j].name);
3041                                 }
3042                         }
3043                 }
3044
3045                 if (options.machinereadable){
3046                         printm(":%s:%d:",
3047                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3048                                 ips->ips[ips->num-i].pnn);
3049                         if (options.verbose){
3050                                 printm("%s:%s:%s:",
3051                                         aciface?aciface:"",
3052                                         avifaces?avifaces:"",
3053                                         cifaces?cifaces:"");
3054                         }
3055                         printf("\n");
3056                 } else {
3057                         if (options.verbose) {
3058                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
3059                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3060                                         ips->ips[ips->num-i].pnn,
3061                                         aciface?aciface:"",
3062                                         avifaces?avifaces:"",
3063                                         cifaces?cifaces:"");
3064                         } else {
3065                                 printf("%s %d\n",
3066                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3067                                         ips->ips[ips->num-i].pnn);
3068                         }
3069                 }
3070                 talloc_free(info);
3071         }
3072
3073         talloc_free(tmp_ctx);
3074         return 0;
3075 }
3076
3077 /*
3078   public ip info
3079  */
3080 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
3081 {
3082         int i, ret;
3083         ctdb_sock_addr addr;
3084         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3085         struct ctdb_control_public_ip_info *info;
3086
3087         if (argc != 1) {
3088                 talloc_free(tmp_ctx);
3089                 usage();
3090         }
3091
3092         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
3093                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
3094                 return -1;
3095         }
3096
3097         /* read the public ip info from this node */
3098         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
3099                                            tmp_ctx, &addr, &info);
3100         if (ret != 0) {
3101                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
3102                                   argv[0], options.pnn));
3103                 talloc_free(tmp_ctx);
3104                 return ret;
3105         }
3106
3107         printf("Public IP[%s] info on node %u\n",
3108                ctdb_addr_to_str(&info->ip.addr),
3109                options.pnn);
3110
3111         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
3112                ctdb_addr_to_str(&info->ip.addr),
3113                info->ip.pnn, info->num);
3114
3115         for (i=0; i<info->num; i++) {
3116                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
3117
3118                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
3119                        i+1, info->ifaces[i].name,
3120                        info->ifaces[i].link_state?"up":"down",
3121                        (unsigned int)info->ifaces[i].references,
3122                        (i==info->active_idx)?" (active)":"");
3123         }
3124
3125         talloc_free(tmp_ctx);
3126         return 0;
3127 }
3128
3129 /*
3130   display interfaces status
3131  */
3132 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
3133 {
3134         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3135         int i;
3136         struct ctdb_control_get_ifaces *ifaces;
3137         int ret;
3138
3139         /* read the public ip list from this node */
3140         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ifaces);
3141         if (ret != 0) {
3142                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
3143                                   options.pnn));
3144                 talloc_free(tmp_ctx);
3145                 return -1;
3146         }
3147
3148         if (options.machinereadable){
3149                 printm(":Name:LinkStatus:References:\n");
3150         } else {
3151                 printf("Interfaces on node %u\n", options.pnn);
3152         }
3153
3154         for (i=0; i<ifaces->num; i++) {
3155                 if (options.machinereadable){
3156                         printm(":%s:%s:%u\n",
3157                                ifaces->ifaces[i].name,
3158                                ifaces->ifaces[i].link_state?"1":"0",
3159                                (unsigned int)ifaces->ifaces[i].references);
3160                 } else {
3161                         printf("name:%s link:%s references:%u\n",
3162                                ifaces->ifaces[i].name,
3163                                ifaces->ifaces[i].link_state?"up":"down",
3164                                (unsigned int)ifaces->ifaces[i].references);
3165                 }
3166         }
3167
3168         talloc_free(tmp_ctx);
3169         return 0;
3170 }
3171
3172
3173 /*
3174   set link status of an interface
3175  */
3176 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
3177 {
3178         int ret;
3179         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3180         struct ctdb_control_iface_info info;
3181
3182         ZERO_STRUCT(info);
3183
3184         if (argc != 2) {
3185                 usage();
3186         }
3187
3188         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
3189                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
3190                                   argv[0]));
3191                 talloc_free(tmp_ctx);
3192                 return -1;
3193         }
3194         strcpy(info.name, argv[0]);
3195
3196         if (strcmp(argv[1], "up") == 0) {
3197                 info.link_state = 1;
3198         } else if (strcmp(argv[1], "down") == 0) {
3199                 info.link_state = 0;
3200         } else {
3201                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
3202                                   argv[1]));
3203                 talloc_free(tmp_ctx);
3204                 return -1;
3205         }
3206
3207         /* read the public ip list from this node */
3208         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
3209                                    tmp_ctx, &info);
3210         if (ret != 0) {
3211                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
3212                                   argv[0], options.pnn));
3213                 talloc_free(tmp_ctx);
3214                 return ret;
3215         }
3216
3217         talloc_free(tmp_ctx);
3218         return 0;
3219 }
3220
3221 /*
3222   display pid of a ctdb daemon
3223  */
3224 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
3225 {
3226         uint32_t pid;
3227         int ret;
3228
3229         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
3230         if (ret != 0) {
3231                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
3232                 return ret;
3233         }
3234         printf("Pid:%d\n", pid);
3235
3236         return 0;
3237 }
3238
3239 typedef bool update_flags_handler_t(struct ctdb_context *ctdb, void *data);
3240
3241 static int update_flags_and_ipreallocate(struct ctdb_context *ctdb,
3242                                               void *data,
3243                                               update_flags_handler_t handler,
3244                                               uint32_t flag,
3245                                               const char *desc,
3246                                               bool set_flag)
3247 {
3248         struct ctdb_node_map *nodemap = NULL;
3249         bool flag_is_set;
3250         int ret;
3251
3252         /* Check if the node is already in the desired state */
3253         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3254         if (ret != 0) {
3255                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3256                 exit(10);
3257         }
3258         flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3259         if (set_flag == flag_is_set) {
3260                 DEBUG(DEBUG_NOTICE, ("Node %d is %s %s\n", options.pnn,
3261                                      (set_flag ? "already" : "not"), desc));
3262                 return 0;
3263         }
3264
3265         do {
3266                 if (!handler(ctdb, data)) {
3267                         DEBUG(DEBUG_WARNING,
3268                               ("Failed to send control to set state %s on node %u, try again\n",
3269                                desc, options.pnn));
3270                 }
3271
3272                 sleep(1);
3273
3274                 /* Read the nodemap and verify the change took effect.
3275                  * Even if the above control/hanlder timed out then it
3276                  * could still have worked!
3277                  */
3278                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
3279                                          ctdb, &nodemap);
3280                 if (ret != 0) {
3281                         DEBUG(DEBUG_WARNING,
3282                               ("Unable to get nodemap from local node, try again\n"));
3283                 }
3284                 flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3285         } while (nodemap == NULL || (set_flag != flag_is_set));
3286
3287         return ipreallocate(ctdb);
3288 }
3289
3290 /* Administratively disable a node */
3291 static bool update_flags_disabled(struct ctdb_context *ctdb, void *data)
3292 {
3293         int ret;
3294
3295         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3296                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
3297         return ret == 0;
3298 }
3299
3300 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
3301 {
3302         return update_flags_and_ipreallocate(ctdb, NULL,
3303                                                   update_flags_disabled,
3304                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3305                                                   "disabled",
3306                                                   true /* set_flag*/);
3307 }
3308
3309 /* Administratively re-enable a node */
3310 static bool update_flags_not_disabled(struct ctdb_context *ctdb, void *data)
3311 {
3312         int ret;
3313
3314         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3315                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
3316         return ret == 0;
3317 }
3318
3319 static int control_enable(struct ctdb_context *ctdb,  int argc, const char **argv)
3320 {
3321         return update_flags_and_ipreallocate(ctdb, NULL,
3322                                                   update_flags_not_disabled,
3323                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3324                                                   "disabled",
3325                                                   false /* set_flag*/);
3326 }
3327
3328 /* Stop a node */
3329 static bool update_flags_stopped(struct ctdb_context *ctdb, void *data)
3330 {
3331         int ret;
3332
3333         ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
3334
3335         return ret == 0;
3336 }
3337
3338 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
3339 {
3340         return update_flags_and_ipreallocate(ctdb, NULL,
3341                                                   update_flags_stopped,
3342                                                   NODE_FLAGS_STOPPED,
3343                                                   "stopped",
3344                                                   true /* set_flag*/);
3345 }
3346
3347 /* Continue a stopped node */
3348 static bool update_flags_not_stopped(struct ctdb_context *ctdb, void *data)
3349 {
3350         int ret;
3351
3352         ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
3353
3354         return ret == 0;
3355 }
3356
3357 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
3358 {
3359         return update_flags_and_ipreallocate(ctdb, NULL,
3360                                                   update_flags_not_stopped,
3361                                                   NODE_FLAGS_STOPPED,
3362                                                   "stopped",
3363                                                   false /* set_flag */);
3364 }
3365
3366 static uint32_t get_generation(struct ctdb_context *ctdb)
3367 {
3368         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3369         struct ctdb_vnn_map *vnnmap=NULL;
3370         int ret;
3371         uint32_t generation;
3372
3373         /* wait until the recmaster is not in recovery mode */
3374         while (1) {
3375                 uint32_t recmode, recmaster;
3376                 
3377                 if (vnnmap != NULL) {
3378                         talloc_free(vnnmap);
3379                         vnnmap = NULL;
3380                 }
3381
3382                 /* get the recmaster */
3383                 ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
3384                 if (ret != 0) {
3385                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
3386                         talloc_free(tmp_ctx);
3387                         exit(10);
3388                 }
3389
3390                 /* get recovery mode */
3391                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), recmaster, &recmode);
3392                 if (ret != 0) {
3393                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
3394                         talloc_free(tmp_ctx);
3395                         exit(10);
3396                 }
3397
3398                 /* get the current generation number */
3399                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, tmp_ctx, &vnnmap);
3400                 if (ret != 0) {
3401                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
3402                         talloc_free(tmp_ctx);
3403                         exit(10);
3404                 }
3405
3406                 if ((recmode == CTDB_RECOVERY_NORMAL) && (vnnmap->generation != 1)) {
3407                         generation = vnnmap->generation;
3408                         talloc_free(tmp_ctx);
3409                         return generation;
3410                 }
3411                 sleep(1);
3412         }
3413 }
3414
3415 /* Ban a node */
3416 static bool update_state_banned(struct ctdb_context *ctdb, void *data)
3417 {
3418         struct ctdb_ban_time *bantime = (struct ctdb_ban_time *)data;
3419         int ret;
3420
3421         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, bantime);
3422
3423         return ret == 0;
3424 }
3425
3426 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
3427 {
3428         struct ctdb_ban_time bantime;
3429
3430         if (argc < 1) {
3431                 usage();
3432         }
3433         
3434         bantime.pnn  = options.pnn;
3435         bantime.time = strtoul(argv[0], NULL, 0);
3436
3437         if (bantime.time == 0) {
3438                 DEBUG(DEBUG_ERR, ("Invalid ban time specified - must be >0\n"));
3439                 return -1;
3440         }
3441
3442         return update_flags_and_ipreallocate(ctdb, &bantime,
3443                                                   update_state_banned,
3444                                                   NODE_FLAGS_BANNED,
3445                                                   "banned",
3446                                                   true /* set_flag*/);
3447 }
3448
3449
3450 /* Unban a node */
3451 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
3452 {
3453         struct ctdb_ban_time bantime;
3454
3455         bantime.pnn  = options.pnn;
3456         bantime.time = 0;
3457
3458         return update_flags_and_ipreallocate(ctdb, &bantime,
3459                                                   update_state_banned,
3460                                                   NODE_FLAGS_BANNED,
3461                                                   "banned",
3462                                                   false /* set_flag*/);
3463 }
3464
3465 /*
3466   show ban information for a node
3467  */
3468 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
3469 {
3470         int ret;
3471         struct ctdb_node_map *nodemap=NULL;
3472         struct ctdb_ban_time *bantime;
3473
3474         /* verify the node exists */
3475         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3476         if (ret != 0) {
3477                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3478                 return ret;
3479         }
3480
3481         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
3482         if (ret != 0) {
3483                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
3484                 return -1;
3485         }       
3486
3487         if (bantime->time == 0) {
3488                 printf("Node %u is not banned\n", bantime->pnn);
3489         } else {
3490                 printf("Node %u is banned, %d seconds remaining\n",
3491                        bantime->pnn, bantime->time);
3492         }
3493
3494         return 0;
3495 }
3496
3497 /*
3498   shutdown a daemon
3499  */
3500 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
3501 {
3502         int ret;
3503
3504         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
3505         if (ret != 0) {
3506                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
3507                 return ret;
3508         }
3509
3510         return 0;
3511 }
3512
3513 /*
3514   trigger a recovery
3515  */
3516 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
3517 {
3518         int ret;
3519         uint32_t generation, next_generation;
3520
3521         /* record the current generation number */
3522         generation = get_generation(ctdb);
3523
3524         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3525         if (ret != 0) {
3526                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
3527                 return ret;
3528         }
3529
3530         /* wait until we are in a new generation */
3531         while (1) {
3532                 next_generation = get_generation(ctdb);
3533                 if (next_generation != generation) {
3534                         return 0;
3535                 }
3536                 sleep(1);
3537         }
3538
3539         return 0;
3540 }
3541
3542
3543 /*
3544   display monitoring mode of a remote node
3545  */
3546 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
3547 {
3548         uint32_t monmode;
3549         int ret;
3550
3551         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
3552         if (ret != 0) {
3553                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
3554                 return ret;
3555         }
3556         if (!options.machinereadable){
3557                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
3558         } else {
3559                 printm(":mode:\n");
3560                 printm(":%d:\n",monmode);
3561         }
3562         return 0;
3563 }
3564
3565
3566 /*
3567   display capabilities of a remote node
3568  */
3569 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
3570 {
3571         uint32_t capabilities;
3572         int ret;
3573
3574         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
3575         if (ret != 0) {
3576                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
3577                 return -1;
3578         }
3579         
3580         if (!options.machinereadable){
3581                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
3582                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
3583                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
3584                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
3585         } else {
3586                 printm(":RECMASTER:LMASTER:LVS:NATGW:\n");
3587                 printm(":%d:%d:%d:%d:\n",
3588                         !!(capabilities&CTDB_CAP_RECMASTER),
3589                         !!(capabilities&CTDB_CAP_LMASTER),
3590                         !!(capabilities&CTDB_CAP_LVS),
3591                         !!(capabilities&CTDB_CAP_NATGW));
3592         }
3593         return 0;
3594 }
3595
3596 /*
3597   display lvs configuration
3598  */
3599
3600 static uint32_t lvs_exclude_flags[] = {
3601         /* Look for a nice healthy node */
3602         NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED,
3603         /* If not found, an UNHEALTHY node will do */
3604         NODE_FLAGS_INACTIVE|NODE_FLAGS_PERMANENTLY_DISABLED,
3605         0,
3606 };
3607
3608 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
3609 {
3610         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3611         struct ctdb_node_map *orig_nodemap=NULL;
3612         struct ctdb_node_map *nodemap;
3613         int i, ret;
3614
3615         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn,
3616                                    tmp_ctx, &orig_nodemap);
3617         if (ret != 0) {
3618                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3619                 talloc_free(tmp_ctx);
3620                 return -1;
3621         }
3622
3623         nodemap = filter_nodemap_by_capabilities(ctdb, orig_nodemap,
3624                                                  CTDB_CAP_LVS, false);
3625         if (nodemap == NULL) {
3626                 /* No memory */
3627                 ret = -1;
3628                 goto done;
3629         }
3630
3631         ret = 0;
3632
3633         for (i = 0; lvs_exclude_flags[i] != 0; i++) {
3634                 struct ctdb_node_map *t =
3635                         filter_nodemap_by_flags(ctdb, nodemap,
3636                                                 lvs_exclude_flags[i]);
3637                 if (t == NULL) {
3638                         /* No memory */
3639                         ret = -1;
3640                         goto done;
3641                 }
3642                 if (t->num > 0) {
3643                         /* At least 1 node without excluded flags */
3644                         int j;
3645                         for (j = 0; j < t->num; j++) {
3646                                 printf("%d:%s\n", t->nodes[j].pnn, 
3647                                        ctdb_addr_to_str(&t->nodes[j].addr));
3648                         }
3649                         goto done;
3650                 }
3651                 talloc_free(t);
3652         }
3653 done:
3654         talloc_free(tmp_ctx);
3655         return ret;
3656 }
3657
3658 /*
3659   display who is the lvs master
3660  */
3661 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3662 {
3663         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3664         struct ctdb_node_map *nodemap=NULL;
3665         int i, ret;
3666
3667         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn,
3668                                    tmp_ctx, &nodemap);
3669         if (ret != 0) {
3670                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3671                 talloc_free(tmp_ctx);
3672                 return -1;
3673         }
3674
3675         for (i = 0; lvs_exclude_flags[i] != 0; i++) {
3676                 struct ctdb_node_map *t =
3677                         filter_nodemap_by_flags(ctdb, nodemap,
3678                                                 lvs_exclude_flags[i]);
3679                 if (t == NULL) {
3680                         /* No memory */
3681                         ret = -1;
3682                         goto done;
3683                 }
3684                 if (t->num > 0) {
3685                         struct ctdb_node_map *n;
3686                         n = filter_nodemap_by_capabilities(ctdb,
3687                                                            t,
3688                                                            CTDB_CAP_LVS,
3689                                                            true);
3690                         if (n == NULL) {
3691                                 /* No memory */
3692                                 ret = -1;
3693                                 goto done;
3694                         }
3695                         if (n->num > 0) {
3696                                 ret = 0;
3697                                 if (options.machinereadable) {
3698                                         printm("%d\n", n->nodes[0].pnn);
3699                                 } else {
3700                                         printf("Node %d is LVS master\n", n->nodes[0].pnn);
3701                                 }
3702                                 goto done;
3703                         }
3704                 }
3705                 talloc_free(t);
3706         }
3707
3708         printf("There is no LVS master\n");
3709         ret = 255;
3710 done:
3711         talloc_free(tmp_ctx);
3712         return ret;
3713 }
3714
3715 /*
3716   disable monitoring on a  node
3717  */
3718 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3719 {
3720         
3721         int ret;
3722
3723         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
3724         if (ret != 0) {
3725                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
3726                 return ret;
3727         }
3728         printf("Monitoring mode:%s\n","DISABLED");
3729
3730         return 0;
3731 }
3732
3733 /*
3734   enable monitoring on a  node
3735  */
3736 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3737 {
3738         
3739         int ret;
3740
3741         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
3742         if (ret != 0) {
3743                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
3744                 return ret;
3745         }
3746         printf("Monitoring mode:%s\n","ACTIVE");
3747
3748         return 0;
3749 }
3750
3751 /*
3752   display remote list of keys/data for a db
3753  */
3754 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
3755 {
3756         const char *db_name;
3757         struct ctdb_db_context *ctdb_db;
3758         int ret;
3759         struct ctdb_dump_db_context c;
3760         uint8_t flags;
3761
3762         if (argc < 1) {
3763                 usage();
3764         }
3765
3766         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3767                 return -1;
3768         }
3769
3770         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3771         if (ctdb_db == NULL) {
3772                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3773                 return -1;
3774         }
3775
3776         if (options.printlmaster) {
3777                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn,
3778                                           ctdb, &ctdb->vnn_map);
3779                 if (ret != 0) {
3780                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3781                                           options.pnn));
3782                         return ret;
3783                 }
3784         }
3785
3786         ZERO_STRUCT(c);
3787         c.f = stdout;
3788         c.printemptyrecords = (bool)options.printemptyrecords;
3789         c.printdatasize = (bool)options.printdatasize;
3790         c.printlmaster = (bool)options.printlmaster;
3791         c.printhash = (bool)options.printhash;
3792         c.printrecordflags = (bool)options.printrecordflags;
3793
3794         /* traverse and dump the cluster tdb */
3795         ret = ctdb_dump_db(ctdb_db, &c);
3796         if (ret == -1) {
3797                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
3798                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
3799                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
3800                                   db_name));
3801                 return -1;
3802         }
3803         talloc_free(ctdb_db);
3804
3805         printf("Dumped %d records\n", ret);
3806         return 0;
3807 }
3808
3809 struct cattdb_data {
3810         struct ctdb_context *ctdb;
3811         uint32_t count;
3812 };
3813
3814 static int cattdb_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
3815 {
3816         struct cattdb_data *d = private_data;
3817         struct ctdb_dump_db_context c;
3818
3819         d->count++;
3820
3821         ZERO_STRUCT(c);
3822         c.f = stdout;
3823         c.printemptyrecords = (bool)options.printemptyrecords;
3824         c.printdatasize = (bool)options.printdatasize;
3825         c.printlmaster = false;
3826         c.printhash = (bool)options.printhash;
3827         c.printrecordflags = true;
3828
3829         return ctdb_dumpdb_record(d->ctdb, key, data, &c);
3830 }
3831
3832 /*
3833   cat the local tdb database using same format as catdb
3834  */
3835 static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv)
3836 {
3837         const char *db_name;
3838         struct ctdb_db_context *ctdb_db;
3839         struct cattdb_data d;
3840         uint8_t flags;
3841
3842         if (argc < 1) {
3843                 usage();
3844         }
3845
3846         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3847                 return -1;
3848         }
3849
3850         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3851         if (ctdb_db == NULL) {
3852                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3853                 return -1;
3854         }
3855
3856         /* traverse the local tdb */
3857         d.count = 0;
3858         d.ctdb  = ctdb;
3859         if (tdb_traverse_read(ctdb_db->ltdb->tdb, cattdb_traverse, &d) == -1) {
3860                 printf("Failed to cattdb data\n");
3861                 exit(10);
3862         }
3863         talloc_free(ctdb_db);
3864
3865         printf("Dumped %d records\n", d.count);
3866         return 0;
3867 }
3868
3869 /*
3870   display the content of a database key
3871  */
3872 static int control_readkey(struct ctdb_context *ctdb, int argc, const char **argv)
3873 {
3874         const char *db_name;
3875         struct ctdb_db_context *ctdb_db;
3876         struct ctdb_record_handle *h;
3877         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3878         TDB_DATA key, data;
3879         uint8_t flags;
3880
3881         if (argc < 2) {
3882                 usage();
3883         }
3884
3885         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3886                 return -1;
3887         }
3888
3889         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3890         if (ctdb_db == NULL) {
3891                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3892                 return -1;
3893         }
3894
3895         key.dptr  = discard_const(argv[1]);
3896         key.dsize = strlen((char *)key.dptr);
3897
3898         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3899         if (h == NULL) {
3900                 printf("Failed to fetch record '%s' on node %d\n", 
3901                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3902                 talloc_free(tmp_ctx);
3903                 exit(10);
3904         }
3905
3906         printf("Data: size:%d ptr:[%.*s]\n", (int)data.dsize, (int)data.dsize, data.dptr);
3907
3908         talloc_free(tmp_ctx);
3909         talloc_free(ctdb_db);
3910         return 0;
3911 }
3912
3913 /*
3914   display the content of a database key
3915  */
3916 static int control_writekey(struct ctdb_context *ctdb, int argc, const char **argv)
3917 {
3918         const char *db_name;
3919         struct ctdb_db_context *ctdb_db;
3920         struct ctdb_record_handle *h;
3921         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3922         TDB_DATA key, data;
3923         uint8_t flags;
3924
3925         if (argc < 3) {
3926                 usage();
3927         }
3928
3929         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3930                 return -1;
3931         }
3932
3933         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3934         if (ctdb_db == NULL) {
3935                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3936                 return -1;
3937         }
3938
3939         key.dptr  = discard_const(argv[1]);
3940         key.dsize = strlen((char *)key.dptr);
3941
3942         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3943         if (h == NULL) {
3944                 printf("Failed to fetch record '%s' on node %d\n", 
3945                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3946                 talloc_free(tmp_ctx);
3947                 exit(10);
3948         }
3949
3950         data.dptr  = discard_const(argv[2]);
3951         data.dsize = strlen((char *)data.dptr);
3952
3953         if (ctdb_record_store(h, data) != 0) {
3954                 printf("Failed to store record\n");
3955         }
3956
3957         talloc_free(h);
3958         talloc_free(tmp_ctx);
3959         talloc_free(ctdb_db);
3960         return 0;
3961 }
3962
3963 /*
3964   fetch a record from a persistent database
3965  */
3966 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3967 {
3968         const char *db_name;
3969         struct ctdb_db_context *ctdb_db;
3970         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3971         struct ctdb_transaction_handle *h;
3972         TDB_DATA key, data;
3973         int fd, ret;
3974         bool persistent;
3975         uint8_t flags;
3976
3977         if (argc < 2) {
3978                 talloc_free(tmp_ctx);
3979                 usage();
3980         }
3981
3982         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3983                 talloc_free(tmp_ctx);
3984                 return -1;
3985         }
3986
3987         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
3988         if (!persistent) {
3989                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
3990                 talloc_free(tmp_ctx);
3991                 return -1;
3992         }
3993
3994         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3995         if (ctdb_db == NULL) {
3996                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3997                 talloc_free(tmp_ctx);
3998                 return -1;
3999         }
4000
4001         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4002         if (h == NULL) {
4003                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4004                 talloc_free(tmp_ctx);
4005                 return -1;
4006         }
4007
4008         key.dptr  = discard_const(argv[1]);
4009         key.dsize = strlen(argv[1]);
4010         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
4011         if (ret != 0) {
4012                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
4013                 talloc_free(tmp_ctx);
4014                 return -1;
4015         }
4016
4017         if (data.dsize == 0 || data.dptr == NULL) {
4018                 DEBUG(DEBUG_ERR,("Record is empty\n"));
4019                 talloc_free(tmp_ctx);
4020                 return -1;
4021         }
4022
4023         if (argc == 3) {
4024           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
4025                 if (fd == -1) {
4026                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
4027                         talloc_free(tmp_ctx);
4028                         return -1;
4029                 }
4030                 sys_write(fd, data.dptr, data.dsize);
4031                 close(fd);
4032         } else {
4033                 sys_write(1, data.dptr, data.dsize);
4034         }
4035
4036         /* abort the transaction */
4037         talloc_free(h);
4038
4039
4040         talloc_free(tmp_ctx);
4041         return 0;
4042 }
4043
4044 /*
4045   fetch a record from a tdb-file
4046  */
4047 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
4048 {
4049         const char *tdb_file;
4050         TDB_CONTEXT *tdb;
4051         TDB_DATA key, data;
4052         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4053         int fd;
4054
4055         if (argc < 2) {
4056                 usage();
4057         }
4058
4059         tdb_file = argv[0];
4060
4061         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
4062         if (tdb == NULL) {
4063                 printf("Failed to open TDB file %s\n", tdb_file);
4064                 return -1;
4065         }
4066
4067         if (!strncmp(argv[1], "0x", 2)) {
4068                 key = hextodata(tmp_ctx, argv[1] + 2);
4069                 if (key.dsize == 0) {
4070                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4071                         return -1;
4072                 }
4073         } else {
4074                 key.dptr  = discard_const(argv[1]);
4075                 key.dsize = strlen(argv[1]);
4076         }
4077
4078         data = tdb_fetch(tdb, key);
4079         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
4080                 printf("Failed to read record %s from tdb %s\n", argv[1], tdb_file);
4081                 tdb_close(tdb);
4082                 return -1;
4083         }
4084
4085         tdb_close(tdb);
4086
4087         if (argc == 3) {
4088           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
4089                 if (fd == -1) {
4090                         printf("Failed to open output file %s\n", argv[2]);
4091                         return -1;
4092                 }
4093                 if (options.verbose){
4094                         sys_write(fd, data.dptr, data.dsize);
4095                 } else {
4096                         sys_write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4097                 }
4098                 close(fd);
4099         } else {
4100                 if (options.verbose){
4101                         sys_write(1, data.dptr, data.dsize);
4102                 } else {
4103                         sys_write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4104                 }
4105         }
4106
4107         talloc_free(tmp_ctx);
4108         return 0;
4109 }
4110
4111 /*
4112   store a record and header to a tdb-file
4113  */
4114 static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv)
4115 {
4116         const char *tdb_file;
4117         TDB_CONTEXT *tdb;
4118         TDB_DATA key, value, data;
4119         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4120         struct ctdb_ltdb_header header;
4121
4122         if (argc < 3) {
4123                 usage();
4124         }
4125
4126         tdb_file = argv[0];
4127
4128         tdb = tdb_open(tdb_file, 0, 0, O_RDWR, 0);
4129         if (tdb == NULL) {
4130                 printf("Failed to open TDB file %s\n", tdb_file);
4131                 return -1;
4132         }
4133
4134         if (!strncmp(argv[1], "0x", 2)) {
4135                 key = hextodata(tmp_ctx, argv[1] + 2);
4136                 if (key.dsize == 0) {
4137                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4138                         return -1;
4139                 }
4140         } else {
4141                 key.dptr  = discard_const(argv[1]);
4142                 key.dsize = strlen(argv[1]);
4143         }
4144
4145         if (!strncmp(argv[2], "0x", 2)) {
4146                 value = hextodata(tmp_ctx, argv[2] + 2);
4147                 if (value.dsize == 0) {
4148                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
4149                         return -1;
4150                 }
4151         } else {
4152                 value.dptr  = discard_const(argv[2]);
4153                 value.dsize = strlen(argv[2]);
4154         }
4155
4156         ZERO_STRUCT(header);
4157         if (argc > 3) {
4158                 header.rsn = atoll(argv[3]);
4159         }
4160         if (argc > 4) {
4161                 header.dmaster = atoi(argv[4]);
4162         }
4163         if (argc > 5) {
4164                 header.flags = atoi(argv[5]);
4165         }
4166
4167         data.dsize = sizeof(struct ctdb_ltdb_header) + value.dsize;
4168         data.dptr = talloc_size(tmp_ctx, data.dsize);
4169         if (data.dptr == NULL) {
4170                 printf("Failed to allocate header+value\n");
4171                 return -1;
4172         }
4173
4174         *(struct ctdb_ltdb_header *)data.dptr = header;
4175         memcpy(data.dptr + sizeof(struct ctdb_ltdb_header), value.dptr, value.dsize);
4176
4177         if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
4178                 printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
4179                 tdb_close(tdb);
4180                 return -1;
4181         }
4182
4183         tdb_close(tdb);
4184
4185         talloc_free(tmp_ctx);
4186         return 0;
4187 }
4188
4189 /*
4190   write a record to a persistent database
4191  */
4192 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
4193 {
4194         const char *db_name;
4195         struct ctdb_db_context *ctdb_db;
4196         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4197         struct ctdb_transaction_handle *h;
4198         struct stat st;
4199         TDB_DATA key, data;
4200         int fd, ret;
4201
4202         if (argc < 3) {
4203                 talloc_free(tmp_ctx);
4204                 usage();
4205         }
4206
4207         fd = open(argv[2], O_RDONLY);
4208         if (fd == -1) {
4209                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
4210                 talloc_free(tmp_ctx);
4211                 return -1;
4212         }
4213         
4214         ret = fstat(fd, &st);
4215         if (ret == -1) {
4216                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
4217                 close(fd);
4218                 talloc_free(tmp_ctx);
4219                 return -1;
4220         }
4221
4222         if (!S_ISREG(st.st_mode)) {
4223                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
4224                 close(fd);
4225                 talloc_free(tmp_ctx);
4226                 return -1;
4227         }
4228
4229         data.dsize = st.st_size;
4230         if (data.dsize == 0) {
4231                 data.dptr  = NULL;
4232         } else {
4233                 data.dptr = talloc_size(tmp_ctx, data.dsize);
4234                 if (data.dptr == NULL) {
4235                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
4236                         close(fd);
4237                         talloc_free(tmp_ctx);
4238                         return -1;
4239                 }
4240                 ret = sys_read(fd, data.dptr, data.dsize);
4241                 if (ret != data.dsize) {
4242                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
4243                         close(fd);
4244                         talloc_free(tmp_ctx);
4245                         return -1;
4246                 }
4247         }
4248         close(fd);
4249
4250
4251         db_name = argv[0];
4252
4253         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
4254         if (ctdb_db == NULL) {
4255                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4256                 talloc_free(tmp_ctx);
4257                 return -1;
4258         }
4259
4260         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4261         if (h == NULL) {
4262                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4263                 talloc_free(tmp_ctx);
4264                 return -1;
4265         }
4266
4267         key.dptr  = discard_const(argv[1]);
4268         key.dsize = strlen(argv[1]);
4269         ret = ctdb_transaction_store(h, key, data);
4270         if (ret != 0) {
4271                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4272                 talloc_free(tmp_ctx);
4273                 return -1;
4274         }
4275
4276         ret = ctdb_transaction_commit(h);
4277         if (ret != 0) {
4278                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4279                 talloc_free(tmp_ctx);
4280                 return -1;
4281         }
4282
4283
4284         talloc_free(tmp_ctx);
4285         return 0;
4286 }
4287
4288 /*
4289  * delete a record from a persistent database
4290  */
4291 static int control_pdelete(struct ctdb_context *ctdb, int argc, const char **argv)
4292 {
4293         const char *db_name;
4294         struct ctdb_db_context *ctdb_db;
4295         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4296         struct ctdb_transaction_handle *h;
4297         TDB_DATA key;
4298         int ret;
4299         bool persistent;
4300         uint8_t flags;
4301
4302         if (argc < 2) {
4303                 talloc_free(tmp_ctx);
4304                 usage();
4305         }
4306
4307         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
4308                 talloc_free(tmp_ctx);
4309                 return -1;
4310         }
4311
4312         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
4313         if (!persistent) {
4314                 DEBUG(DEBUG_ERR, ("Database '%s' is not persistent\n", db_name));
4315                 talloc_free(tmp_ctx);
4316                 return -1;
4317         }
4318
4319         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4320         if (ctdb_db == NULL) {
4321                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n", db_name));
4322                 talloc_free(tmp_ctx);
4323                 return -1;
4324         }
4325
4326         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4327         if (h == NULL) {
4328                 DEBUG(DEBUG_ERR, ("Failed to start transaction on database %s\n", db_name));
4329                 talloc_free(tmp_ctx);
4330                 return -1;
4331         }
4332
4333         key.dptr = discard_const(argv[1]);
4334         key.dsize = strlen(argv[1]);
4335         ret = ctdb_transaction_store(h, key, tdb_null);
4336         if (ret != 0) {
4337                 DEBUG(DEBUG_ERR, ("Failed to delete record\n"));
4338                 talloc_free(tmp_ctx);
4339                 return -1;
4340         }
4341
4342         ret = ctdb_transaction_commit(h);
4343         if (ret != 0) {
4344                 DEBUG(DEBUG_ERR, ("Failed to commit transaction\n"));
4345                 talloc_free(tmp_ctx);
4346                 return -1;
4347         }
4348
4349         talloc_free(tmp_ctx);
4350         return 0;
4351 }
4352
4353 static const char *ptrans_parse_string(TALLOC_CTX *mem_ctx, const char *s,
4354                                        TDB_DATA *data)
4355 {
4356         const char *t;
4357         size_t n;
4358         const char *ret; /* Next byte after successfully parsed value */
4359
4360         /* Error, unless someone says otherwise */
4361         ret = NULL;
4362         /* Indicates no value to parse */
4363         *data = tdb_null;
4364
4365         /* Skip whitespace */
4366         n = strspn(s, " \t");
4367         t = s + n;
4368
4369         if (t[0] == '"') {
4370                 /* Quoted ASCII string - no wide characters! */
4371                 t++;
4372                 n = strcspn(t, "\"");
4373                 if (t[n] == '"') {
4374                         if (n > 0) {
4375                                 data->dsize = n;
4376                                 data->dptr = talloc_memdup(mem_ctx, t, n);
4377                                 CTDB_NOMEM_ABORT(data->dptr);
4378                         }
4379                         ret = t + n + 1;
4380                 } else {
4381                         DEBUG(DEBUG_WARNING,("Unmatched \" in input %s\n", s));
4382                 }
4383         } else {
4384                 DEBUG(DEBUG_WARNING,("Unsupported input format in %s\n", s));
4385         }
4386
4387         return ret;
4388 }
4389
4390 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
4391                                  TDB_DATA *key, TDB_DATA *value)
4392 {
4393         char line [1024]; /* FIXME: make this more flexible? */
4394         const char *t;
4395         char *ptr;
4396
4397         ptr = fgets(line, sizeof(line), file);
4398
4399         if (ptr == NULL) {
4400                 return false;
4401         }
4402
4403         /* Get key */
4404         t = ptrans_parse_string(mem_ctx, line, key);
4405         if (t == NULL || key->dptr == NULL) {
4406                 /* Line Ignored but not EOF */
4407                 return true;
4408         }
4409
4410         /* Get value */
4411         t = ptrans_parse_string(mem_ctx, t, value);
4412         if (t == NULL) {
4413                 /* Line Ignored but not EOF */
4414                 talloc_free(key->dptr);
4415                 *key = tdb_null;
4416                 return true;
4417         }
4418
4419         return true;
4420 }
4421
4422 /*
4423  * Update a persistent database as per file/stdin
4424  */
4425 static int control_ptrans(struct ctdb_context *ctdb,
4426                           int argc, const char **argv)
4427 {
4428         const char *db_name;
4429         struct ctdb_db_context *ctdb_db;
4430         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4431         struct ctdb_transaction_handle *h;
4432         TDB_DATA key, value;
4433         FILE *file;
4434         int ret;
4435
4436         if (argc < 1) {
4437                 talloc_free(tmp_ctx);
4438                 usage();
4439         }
4440
4441         file = stdin;
4442         if (argc == 2) {
4443                 file = fopen(argv[1], "r");
4444                 if (file == NULL) {
4445                         DEBUG(DEBUG_ERR,("Unable to open file for reading '%s'\n", argv[1]));
4446                         talloc_free(tmp_ctx);
4447                         return -1;
4448                 }
4449         }
4450
4451         db_name = argv[0];
4452
4453         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
4454         if (ctdb_db == NULL) {
4455                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4456                 goto error;
4457         }
4458
4459         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4460         if (h == NULL) {
4461                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4462                 goto error;
4463         }
4464
4465         while (ptrans_get_key_value(tmp_ctx, file, &key, &value)) {
4466                 if (key.dsize != 0) {
4467                         ret = ctdb_transaction_store(h, key, value);
4468                         /* Minimise memory use */
4469                         talloc_free(key.dptr);
4470                         if (value.dptr != NULL) {
4471                                 talloc_free(value.dptr);
4472                         }
4473                         if (ret != 0) {
4474                                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4475                                 ctdb_transaction_cancel(h);
4476                                 goto error;
4477                         }
4478                 }
4479         }
4480
4481         ret = ctdb_transaction_commit(h);
4482         if (ret != 0) {
4483                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4484                 goto error;
4485         }
4486
4487         if (file != stdin) {
4488                 fclose(file);
4489         }
4490         talloc_free(tmp_ctx);
4491         return 0;
4492
4493 error:
4494         if (file != stdin) {
4495                 fclose(file);
4496         }
4497
4498         talloc_free(tmp_ctx);
4499         return -1;
4500 }
4501
/*
 * Check if a service is bound to a TCP port or not, by attempting to
 * bind a fresh local socket to that port.
 *
 * argv[0] is the port number (converted with atoi()).
 *
 * Returns 0 when the bind succeeds, EINVAL when the argument count is
 * wrong, and otherwise the errno value left by the failing socket() or
 * bind() call.
 */
static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **argv)
{
        struct sockaddr_in sin;
        int saved_errno;
        int s, v, ret;
        int port;

        if (argc != 1) {
                printf("Use: ctdb chktcpport <port>\n");
                return EINVAL;
        }

        port = atoi(argv[0]);

        s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (s == -1) {
                /* Capture errno before printf() gets a chance to change it */
                saved_errno = errno;
                printf("Failed to open local socket\n");
                return saved_errno;
        }

        /* Failure to go non-blocking is reported but is not fatal */
        v = fcntl(s, F_GETFL, 0);
        if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK) != 0) {
                printf("Unable to set socket non-blocking: %s\n", strerror(errno));
        }

        memset(&sin, 0, sizeof(sin));
        sin.sin_family = AF_INET;
        sin.sin_port   = htons(port);
        ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));

        /* close() may overwrite errno, so remember bind()'s result first */
        saved_errno = errno;
        close(s);

        if (ret == -1) {
                printf("Failed to bind to local socket: %d %s\n",
                       saved_errno, strerror(saved_errno));
                return saved_errno;
        }

        return 0;
}
4542
4543
4544 /* Reload public IPs on a specified nodes */
4545 static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
4546 {
4547         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4548         uint32_t *nodes;
4549         uint32_t pnn_mode;
4550         uint32_t timeout;
4551         int ret;
4552
4553         assert_single_node_only();
4554
4555         if (argc > 1) {
4556                 usage();
4557         }
4558
4559         /* Determine the nodes where IPs need to be reloaded */
4560         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
4561                               options.pnn, true, &nodes, &pnn_mode)) {
4562                 ret = -1;
4563                 goto done;
4564         }
4565
4566 again:
4567         /* Disable takeover runs on all connected nodes.  A reply
4568          * indicating success is needed from each node so all nodes
4569          * will need to be active.  This will retry until maxruntime
4570          * is exceeded, hence no error handling.
4571          * 
4572          * A check could be added to not allow reloading of IPs when
4573          * there are disconnected nodes.  However, this should
4574          * probably be left up to the administrator.
4575          */
4576         timeout = LONGTIMEOUT;
4577         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4578                         "Disable takeover runs", true);
4579
4580         /* Now tell all the desired nodes to reload their public IPs.
4581          * Keep trying this until it succeeds.  This assumes all
4582          * failures are transient, which might not be true...
4583          */
4584         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
4585                                       nodes, 0, LONGTIMELIMIT(),
4586                                       false, tdb_null,
4587                                       NULL, NULL, NULL) != 0) {
4588                 DEBUG(DEBUG_ERR,
4589                       ("Unable to reload IPs on some nodes, try again.\n"));
4590                 goto again;
4591         }
4592
4593         /* It isn't strictly necessary to wait until takeover runs are
4594          * re-enabled but doing so can't hurt.
4595          */
4596         timeout = 0;
4597         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4598                         "Enable takeover runs", true);
4599
4600         ipreallocate(ctdb);
4601
4602         ret = 0;
4603 done:
4604         talloc_free(tmp_ctx);
4605         return ret;
4606 }
4607
4608 /*
4609   display a list of the databases on a remote ctdb
4610  */
4611 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
4612 {
4613         int i, ret;
4614         struct ctdb_dbid_map *dbmap=NULL;
4615
4616         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4617         if (ret != 0) {
4618                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4619                 return ret;
4620         }
4621
4622         if(options.machinereadable){
4623                 printm(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
4624                 for(i=0;i<dbmap->num;i++){
4625                         const char *path;
4626                         const char *name;
4627                         const char *health;
4628                         bool persistent;
4629                         bool readonly;
4630                         bool sticky;
4631
4632                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
4633                                             dbmap->dbs[i].dbid, ctdb, &path);
4634                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4635                                             dbmap->dbs[i].dbid, ctdb, &name);
4636                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4637                                               dbmap->dbs[i].dbid, ctdb, &health);
4638                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4639                         readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4640                         sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4641                         printm(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
4642                                dbmap->dbs[i].dbid, name, path,
4643                                !!(persistent), !!(sticky),
4644                                !!(health), !!(readonly));
4645                 }
4646                 return 0;
4647         }
4648
4649         printf("Number of databases:%d\n", dbmap->num);
4650         for(i=0;i<dbmap->num;i++){
4651                 const char *path;
4652                 const char *name;
4653                 const char *health;
4654                 bool persistent;
4655                 bool readonly;
4656                 bool sticky;
4657
4658                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4659                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4660                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4661                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4662                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4663                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4664                 printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
4665                        dbmap->dbs[i].dbid, name, path,
4666                        persistent?" PERSISTENT":"",
4667                        sticky?" STICKY":"",
4668                        readonly?" READONLY":"",
4669                        health?" UNHEALTHY":"");
4670         }
4671
4672         return 0;
4673 }
4674
4675 /*
4676   display the status of a database on a remote ctdb
4677  */
4678 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
4679 {
4680         const char *db_name;
4681         uint32_t db_id;
4682         uint8_t flags;
4683         const char *path;
4684         const char *health;
4685
4686         if (argc < 1) {
4687                 usage();
4688         }
4689
4690         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
4691                 return -1;
4692         }
4693
4694         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &path);
4695         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &health);
4696         printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
4697                db_id, db_name, path,
4698                (flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
4699                (flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
4700                (flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
4701                (health ? health : "OK"));
4702
4703         return 0;
4704 }
4705
4706 /*
4707   check if the local node is recmaster or not
4708   it will return 1 if this node is the recmaster and 0 if it is not
4709   or if the local ctdb daemon could not be contacted
4710  */
4711 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
4712 {
4713         uint32_t mypnn, recmaster;
4714         int ret;
4715
4716         assert_single_node_only();
4717
4718         mypnn = getpnn(ctdb);
4719
4720         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
4721         if (ret != 0) {
4722                 printf("Failed to get the recmaster\n");
4723                 return 1;
4724         }
4725
4726         if (recmaster != mypnn) {
4727                 printf("this node is not the recmaster\n");
4728                 return 1;
4729         }
4730
4731         printf("this node is the recmaster\n");
4732         return 0;
4733 }
4734
4735 /*
4736   ping a node
4737  */
4738 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
4739 {
4740         int ret;
4741         struct timeval tv = timeval_current();
4742         ret = ctdb_ctrl_ping(ctdb, options.pnn);
4743         if (ret == -1) {
4744                 printf("Unable to get ping response from node %u\n", options.pnn);
4745                 return -1;
4746         } else {
4747                 printf("response from %u time=%.6f sec  (%d clients)\n", 
4748                        options.pnn, timeval_elapsed(&tv), ret);
4749         }
4750         return 0;
4751 }
4752
4753
4754 /*
4755   get a node's runstate
4756  */
4757 static int control_runstate(struct ctdb_context *ctdb, int argc, const char **argv)
4758 {
4759         int ret;
4760         enum ctdb_runstate runstate;
4761
4762         ret = ctdb_ctrl_get_runstate(ctdb, TIMELIMIT(), options.pnn, &runstate);
4763         if (ret == -1) {
4764                 printf("Unable to get runstate response from node %u\n",
4765                        options.pnn);
4766                 return -1;
4767         } else {
4768                 bool found = true;
4769                 enum ctdb_runstate t;
4770                 int i;
4771                 for (i=0; i<argc; i++) {
4772                         found = false;
4773                         t = runstate_from_string(argv[i]);
4774                         if (t == CTDB_RUNSTATE_UNKNOWN) {
4775                                 printf("Invalid run state (%s)\n", argv[i]);
4776                                 return -1;
4777                         }
4778
4779                         if (t == runstate) {
4780                                 found = true;
4781                                 break;
4782                         }
4783                 }
4784
4785                 if (!found) {
4786                         printf("CTDB not in required run state (got %s)\n", 
4787                                runstate_to_string((enum ctdb_runstate)runstate));
4788                         return -1;
4789                 }
4790         }
4791
4792         printf("%s\n", runstate_to_string(runstate));
4793         return 0;
4794 }
4795
4796
4797 /*
4798   get a tunable
4799  */
4800 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
4801 {
4802         const char *name;
4803         uint32_t value;
4804         int ret;
4805
4806         if (argc < 1) {
4807                 usage();
4808         }
4809
4810         name = argv[0];
4811         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
4812         if (ret != 0) {
4813                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
4814                 return -1;
4815         }
4816
4817         printf("%-23s = %u\n", name, value);
4818         return 0;
4819 }
4820
4821 /*
4822   set a tunable
4823  */
4824 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
4825 {
4826         const char *name;
4827         uint32_t value;
4828         int ret;
4829
4830         if (argc < 2) {
4831                 usage();
4832         }
4833
4834         name = argv[0];
4835         value = strtoul(argv[1], NULL, 0);
4836
4837         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
4838         if (ret == -1) {
4839                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
4840                 return -1;
4841         }
4842         return 0;
4843 }
4844
4845 /*
4846   list all tunables
4847  */
4848 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
4849 {
4850         uint32_t count;
4851         const char **list;
4852         int ret, i;
4853
4854         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
4855         if (ret == -1) {
4856                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
4857                 return -1;
4858         }
4859
4860         for (i=0;i<count;i++) {
4861                 control_getvar(ctdb, 1, &list[i]);
4862         }
4863
4864         talloc_free(list);
4865         
4866         return 0;
4867 }
4868
4869 /*
4870   display debug level on a node
4871  */
4872 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4873 {
4874         int ret;
4875         int32_t level;
4876
4877         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
4878         if (ret != 0) {
4879                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
4880                 return ret;
4881         } else {
4882                 const char *desc = get_debug_by_level(level);
4883                 if (desc == NULL) {
4884                         /* This should never happen */
4885                         desc = "Unknown";
4886                 }
4887                 if (options.machinereadable){
4888                         printm(":Name:Level:\n");
4889                         printm(":%s:%d:\n", desc, level);
4890                 } else {
4891                         printf("Node %u is at debug level %s (%d)\n",
4892                                options.pnn, desc, level);
4893                 }
4894         }
4895         return 0;
4896 }
4897
4898 /*
4899   display reclock file of a node
4900  */
4901 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4902 {
4903         int ret;
4904         const char *reclock;
4905
4906         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
4907         if (ret != 0) {
4908                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4909                 return ret;
4910         } else {
4911                 if (options.machinereadable){
4912                         if (reclock != NULL) {
4913                                 printm("%s", reclock);
4914                         }
4915                 } else {
4916                         if (reclock == NULL) {
4917                                 printf("No reclock file used.\n");
4918                         } else {
4919                                 printf("Reclock file:%s\n", reclock);
4920                         }
4921                 }
4922         }
4923         return 0;
4924 }
4925
4926 /*
4927   set the reclock file of a node
4928  */
4929 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4930 {
4931         int ret;
4932         const char *reclock;
4933
4934         if (argc == 0) {
4935                 reclock = NULL;
4936         } else if (argc == 1) {
4937                 reclock = argv[0];
4938         } else {
4939                 usage();
4940         }
4941
4942         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
4943         if (ret != 0) {
4944                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4945                 return ret;
4946         }
4947         return 0;
4948 }
4949
4950 /*
4951   set the natgw state on/off
4952  */
4953 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
4954 {
4955         int ret;
4956         uint32_t natgwstate;
4957
4958         if (argc == 0) {
4959                 usage();
4960         }
4961
4962         if (!strcmp(argv[0], "on")) {
4963                 natgwstate = 1;
4964         } else if (!strcmp(argv[0], "off")) {
4965                 natgwstate = 0;
4966         } else {
4967                 usage();
4968         }
4969
4970         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
4971         if (ret != 0) {
4972                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
4973                 return ret;
4974         }
4975
4976         return 0;
4977 }
4978
4979 /*
4980   set the lmaster role on/off
4981  */
4982 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4983 {
4984         int ret;
4985         uint32_t lmasterrole;
4986
4987         if (argc == 0) {
4988                 usage();
4989         }
4990
4991         if (!strcmp(argv[0], "on")) {
4992                 lmasterrole = 1;
4993         } else if (!strcmp(argv[0], "off")) {
4994                 lmasterrole = 0;
4995         } else {
4996                 usage();
4997         }
4998
4999         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
5000         if (ret != 0) {
5001                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
5002                 return ret;
5003         }
5004
5005         return 0;
5006 }
5007
5008 /*
5009   set the recmaster role on/off
5010  */
5011 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
5012 {
5013         int ret;
5014         uint32_t recmasterrole;
5015
5016         if (argc == 0) {
5017                 usage();
5018         }
5019
5020         if (!strcmp(argv[0], "on")) {
5021                 recmasterrole = 1;
5022         } else if (!strcmp(argv[0], "off")) {
5023                 recmasterrole = 0;
5024         } else {
5025                 usage();
5026         }
5027
5028         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
5029         if (ret != 0) {
5030                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
5031                 return ret;
5032         }
5033
5034         return 0;
5035 }
5036
5037 /*
5038   set debug level on a node or all nodes
5039  */
5040 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
5041 {
5042         int ret;
5043         int32_t level;
5044
5045         if (argc == 0) {
5046                 printf("You must specify the debug level. Valid levels are:\n");
5047                 print_debug_levels(stdout);
5048                 return 0;
5049         }
5050
5051         if (!parse_debug(argv[0], &level)) {
5052                 printf("Invalid debug level, must be one of\n");
5053                 print_debug_levels(stdout);
5054                 return -1;
5055         }
5056
5057         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
5058         if (ret != 0) {
5059                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
5060         }
5061         return 0;
5062 }
5063
5064
5065 /*
5066   thaw a node
5067  */
5068 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
5069 {
5070         int ret;
5071         uint32_t priority;
5072         
5073         if (argc == 1) {
5074                 priority = strtol(argv[0], NULL, 0);
5075         } else {
5076                 priority = 0;
5077         }
5078         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
5079
5080         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
5081         if (ret != 0) {
5082                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
5083         }               
5084         return 0;
5085 }
5086
5087
5088 /*
5089   attach to a database
5090  */
5091 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
5092 {
5093         const char *db_name;
5094         struct ctdb_db_context *ctdb_db;
5095         bool persistent = false;
5096
5097         if (argc < 1) {
5098                 usage();
5099         }
5100         db_name = argv[0];
5101         if (argc > 2) {
5102                 usage();
5103         }
5104         if (argc == 2) {
5105                 if (strcmp(argv[1], "persistent") != 0) {
5106                         usage();
5107                 }
5108                 persistent = true;
5109         }
5110
5111         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
5112         if (ctdb_db == NULL) {
5113                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
5114                 return -1;
5115         }
5116
5117         return 0;
5118 }
5119
5120 /*
5121  * detach from a database
5122  */
5123 static int control_detach(struct ctdb_context *ctdb, int argc,
5124                           const char **argv)
5125 {
5126         uint32_t db_id;
5127         uint8_t flags;
5128         int ret, i, status = 0;
5129         struct ctdb_node_map *nodemap = NULL;
5130         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5131         uint32_t recmode;
5132
5133         if (argc < 1) {
5134                 usage();
5135         }
5136
5137         assert_single_node_only();
5138
5139         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn,
5140                                     &recmode);
5141         if (ret != 0) {
5142                 DEBUG(DEBUG_ERR, ("Database cannot be detached "
5143                                   "when recovery is active\n"));
5144                 talloc_free(tmp_ctx);
5145                 return -1;
5146         }
5147
5148         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5149                                    &nodemap);
5150         if (ret != 0) {
5151                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5152                                   options.pnn));
5153                 talloc_free(tmp_ctx);
5154                 return -1;
5155         }
5156
5157         for (i=0; i<nodemap->num; i++) {
5158                 uint32_t value;
5159
5160                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
5161                         continue;
5162                 }
5163
5164                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
5165                         continue;
5166                 }
5167
5168                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
5169                         DEBUG(DEBUG_ERR, ("Database cannot be detached on "
5170                                           "inactive (stopped or banned) node "
5171                                           "%u\n", nodemap->nodes[i].pnn));
5172                         talloc_free(tmp_ctx);
5173                         return -1;
5174                 }
5175
5176                 ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(),
5177                                             nodemap->nodes[i].pnn,
5178                                             "AllowClientDBAttach",
5179                                             &value);
5180                 if (ret != 0) {
5181                         DEBUG(DEBUG_ERR, ("Unable to get tunable "
5182                                           "AllowClientDBAttach from node %u\n",
5183                                            nodemap->nodes[i].pnn));
5184                         talloc_free(tmp_ctx);
5185                         return -1;
5186                 }
5187
5188                 if (value == 1) {
5189                         DEBUG(DEBUG_ERR, ("Database access is still active on "
5190                                           "node %u. Set AllowClientDBAttach=0 "
5191                                           "on all nodes.\n",
5192                                           nodemap->nodes[i].pnn));
5193                         talloc_free(tmp_ctx);
5194                         return -1;
5195                 }
5196         }
5197
5198         talloc_free(tmp_ctx);
5199
5200         for (i=0; i<argc; i++) {
5201                 if (!db_exists(ctdb, argv[i], &db_id, NULL, &flags)) {
5202                         continue;
5203                 }
5204
5205                 if (flags & CTDB_DB_FLAGS_PERSISTENT) {
5206                         DEBUG(DEBUG_ERR, ("Persistent database '%s' "
5207                                           "cannot be detached\n", argv[i]));
5208                         status = -1;
5209                         continue;
5210                 }
5211
5212                 ret = ctdb_detach(ctdb, db_id);
5213                 if (ret != 0) {
5214                         DEBUG(DEBUG_ERR, ("Database '%s' detach failed\n",
5215                                           argv[i]));
5216                         status = ret;
5217                 }
5218         }
5219
5220         return status;
5221 }
5222
5223 /*
5224   set db priority
5225  */
5226 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5227 {
5228         struct ctdb_db_priority db_prio;
5229         int ret;
5230
5231         if (argc < 2) {
5232                 usage();
5233         }
5234
5235         db_prio.db_id    = strtoul(argv[0], NULL, 0);
5236         db_prio.priority = strtoul(argv[1], NULL, 0);
5237
5238         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
5239         if (ret != 0) {
5240                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
5241                 return -1;
5242         }
5243
5244         return 0;
5245 }
5246
5247 /*
5248   get db priority
5249  */
5250 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5251 {
5252         uint32_t db_id, priority;
5253         int ret;
5254
5255         if (argc < 1) {
5256                 usage();
5257         }
5258
5259         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5260                 return -1;
5261         }
5262
5263         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
5264         if (ret != 0) {
5265                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
5266                 return -1;
5267         }
5268
5269         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
5270
5271         return 0;
5272 }
5273
5274 /*
5275   set the sticky records capability for a database
5276  */
5277 static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
5278 {
5279         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5280         uint32_t db_id;
5281         int ret;
5282
5283         if (argc < 1) {
5284                 usage();
5285         }
5286
5287         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5288                 return -1;
5289         }
5290
5291         ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
5292         if (ret != 0) {
5293                 DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
5294                 talloc_free(tmp_ctx);
5295                 return -1;
5296         }
5297
5298         talloc_free(tmp_ctx);
5299         return 0;
5300 }
5301
5302 /*
5303   set the readonly capability for a database
5304  */
5305 static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char **argv)
5306 {
5307         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5308         uint32_t db_id;
5309         int ret;
5310
5311         if (argc < 1) {
5312                 usage();
5313         }
5314
5315         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5316                 return -1;
5317         }
5318
5319         ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
5320         if (ret != 0) {
5321                 DEBUG(DEBUG_ERR,("Unable to set db to support readonly\n"));
5322                 talloc_free(tmp_ctx);
5323                 return -1;
5324         }
5325
5326         talloc_free(tmp_ctx);
5327         return 0;
5328 }
5329
5330 /*
5331   get db seqnum
5332  */
5333 static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
5334 {
5335         uint32_t db_id;
5336         uint64_t seqnum;
5337         int ret;
5338
5339         if (argc < 1) {
5340                 usage();
5341         }
5342
5343         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5344                 return -1;
5345         }
5346
5347         ret = ctdb_ctrl_getdbseqnum(ctdb, TIMELIMIT(), options.pnn, db_id, &seqnum);
5348         if (ret != 0) {
5349                 DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
5350                 return -1;
5351         }
5352
5353         printf("Sequence number:%lld\n", (long long)seqnum);
5354
5355         return 0;
5356 }
5357
5358 /*
5359   run an eventscript on a node
5360  */
5361 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
5362 {
5363         TDB_DATA data;
5364         int ret;
5365         int32_t res;
5366         char *errmsg;
5367         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5368
5369         if (argc != 1) {
5370                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5371                 return -1;
5372         }
5373
5374         data.dptr = (unsigned char *)discard_const(argv[0]);
5375         data.dsize = strlen((char *)data.dptr) + 1;
5376
5377         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
5378
5379         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
5380                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
5381         if (ret != 0 || res != 0) {
5382                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
5383                 talloc_free(tmp_ctx);
5384                 return -1;
5385         }
5386         talloc_free(tmp_ctx);
5387         return 0;
5388 }
5389
/* version number written to, and required from, database backup files */
#define DB_VERSION 1
/* size in bytes of the name field of a backup file header */
#define MAX_DB_NAME 64
/*
 * Header at the start of a database backup file.  It is written by
 * control_backupdb() and read by control_restoredb() and
 * control_dumpdbbackup() as the raw bytes of this struct.
 */
struct db_file_header {
        /* DB_VERSION at the time the backup was written */
        unsigned long version;
        /* time(NULL) when the backup was written */
        time_t timestamp;
        /* non-zero if the database had CTDB_DB_FLAGS_PERSISTENT set */
        unsigned long persistent;
        /* number of bytes of marshalled records following the header */
        unsigned long size;
        /* database name as passed on the backupdb command line */
        const char name[MAX_DB_NAME];
};
5399
/* state accumulated by backup_traverse() while a database is walked */
struct backup_data {
        /* marshall buffer that every traversed record is appended to */
        struct ctdb_marshall_buffer *records;
        /* bytes of 'records' in use, including the buffer's own header */
        uint32_t len;
        /* number of records appended so far */
        uint32_t total;
        /* set by backup_traverse() when a record could not be added */
        bool traverse_error;
};
5406
5407 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
5408 {
5409         struct backup_data *bd = talloc_get_type(private, struct backup_data);
5410         struct ctdb_rec_data *rec;
5411
5412         /* add the record */
5413         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
5414         if (rec == NULL) {
5415                 bd->traverse_error = true;
5416                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
5417                 return -1;
5418         }
5419         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
5420         if (bd->records == NULL) {
5421                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
5422                 bd->traverse_error = true;
5423                 return -1;
5424         }
5425         bd->records->count++;
5426         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
5427         bd->len += rec->length;
5428         talloc_free(rec);
5429
5430         bd->total++;
5431         return 0;
5432 }
5433
5434 /*
5435  * backup a database to a file 
5436  */
5437 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
5438 {
5439         const char *db_name;
5440         int ret;
5441         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5442         struct db_file_header dbhdr;
5443         struct ctdb_db_context *ctdb_db;
5444         struct backup_data *bd;
5445         int fh = -1;
5446         int status = -1;
5447         const char *reason = NULL;
5448         uint32_t db_id;
5449         uint8_t flags;
5450
5451         assert_single_node_only();
5452
5453         if (argc != 2) {
5454                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5455                 return -1;
5456         }
5457
5458         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
5459                 return -1;
5460         }
5461
5462         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
5463                                     db_id, tmp_ctx, &reason);
5464         if (ret != 0) {
5465                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
5466                                  argv[0]));
5467                 talloc_free(tmp_ctx);
5468                 return -1;
5469         }
5470         if (reason) {
5471                 uint32_t allow_unhealthy = 0;
5472
5473                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
5474                                       "AllowUnhealthyDBRead",
5475                                       &allow_unhealthy);
5476
5477                 if (allow_unhealthy != 1) {
5478                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
5479                                          argv[0], reason));
5480
5481                         DEBUG(DEBUG_ERR,("disallow backup : tunable AllowUnhealthyDBRead = %u\n",
5482                                          allow_unhealthy));
5483                         talloc_free(tmp_ctx);
5484                         return -1;
5485                 }
5486
5487                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
5488                                      argv[0], argv[0]));
5489                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
5490                                      "tunnable AllowUnhealthyDBRead = %u\n",
5491                                      allow_unhealthy));
5492         }
5493
5494         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5495         if (ctdb_db == NULL) {
5496                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
5497                 talloc_free(tmp_ctx);
5498                 return -1;
5499         }
5500
5501
5502         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
5503         if (ret == -1) {
5504                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
5505                 talloc_free(tmp_ctx);
5506                 return -1;
5507         }
5508
5509
5510         bd = talloc_zero(tmp_ctx, struct backup_data);
5511         if (bd == NULL) {
5512                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
5513                 talloc_free(tmp_ctx);
5514                 return -1;
5515         }
5516
5517         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
5518         if (bd->records == NULL) {
5519                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
5520                 talloc_free(tmp_ctx);
5521                 return -1;
5522         }
5523
5524         bd->len = offsetof(struct ctdb_marshall_buffer, data);
5525         bd->records->db_id = ctdb_db->db_id;
5526         /* traverse the database collecting all records */
5527         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
5528             bd->traverse_error) {
5529                 DEBUG(DEBUG_ERR,("Traverse error\n"));
5530                 talloc_free(tmp_ctx);
5531                 return -1;              
5532         }
5533
5534         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
5535
5536
5537         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
5538         if (fh == -1) {
5539                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
5540                 talloc_free(tmp_ctx);
5541                 return -1;
5542         }
5543
5544         ZERO_STRUCT(dbhdr);
5545         dbhdr.version = DB_VERSION;
5546         dbhdr.timestamp = time(NULL);
5547         dbhdr.persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
5548         dbhdr.size = bd->len;
5549         if (strlen(argv[0]) >= MAX_DB_NAME) {
5550                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
5551                 goto done;
5552         }
5553         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME-1);
5554         ret = sys_write(fh, &dbhdr, sizeof(dbhdr));
5555         if (ret == -1) {
5556                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5557                 goto done;
5558         }
5559         ret = sys_write(fh, bd->records, bd->len);
5560         if (ret == -1) {
5561                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5562                 goto done;
5563         }
5564
5565         status = 0;
5566 done:
5567         if (fh != -1) {
5568                 ret = close(fh);
5569                 if (ret == -1) {
5570                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
5571                 }
5572         }
5573
5574         DEBUG(DEBUG_ERR,("Database backed up to %s\n", argv[1]));
5575
5576         talloc_free(tmp_ctx);
5577         return status;
5578 }
5579
5580 /*
5581  * restore a database from a file 
5582  */
5583 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
5584 {
5585         int ret;
5586         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5587         TDB_DATA outdata;
5588         TDB_DATA data;
5589         struct db_file_header dbhdr;
5590         struct ctdb_db_context *ctdb_db;
5591         struct ctdb_node_map *nodemap=NULL;
5592         struct ctdb_vnn_map *vnnmap=NULL;
5593         int i, fh;
5594         struct ctdb_control_wipe_database w;
5595         uint32_t *nodes;
5596         uint32_t generation;
5597         struct tm *tm;
5598         char tbuf[100];
5599         char *dbname;
5600
5601         assert_single_node_only();
5602
5603         if (argc < 1 || argc > 2) {
5604                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5605                 return -1;
5606         }
5607
5608         fh = open(argv[0], O_RDONLY);
5609         if (fh == -1) {
5610                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5611                 talloc_free(tmp_ctx);
5612                 return -1;
5613         }
5614
5615         sys_read(fh, &dbhdr, sizeof(dbhdr));
5616         if (dbhdr.version != DB_VERSION) {
5617                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5618                 close(fh);
5619                 talloc_free(tmp_ctx);
5620                 return -1;
5621         }
5622
5623         dbname = discard_const(dbhdr.name);
5624         if (argc == 2) {
5625                 dbname = discard_const(argv[1]);
5626         }
5627
5628         outdata.dsize = dbhdr.size;
5629         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5630         if (outdata.dptr == NULL) {
5631                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5632                 close(fh);
5633                 talloc_free(tmp_ctx);
5634                 return -1;
5635         }               
5636         sys_read(fh, outdata.dptr, outdata.dsize);
5637         close(fh);
5638
5639         tm = localtime(&dbhdr.timestamp);
5640         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5641         printf("Restoring database '%s' from backup @ %s\n",
5642                 dbname, tbuf);
5643
5644
5645         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), dbname, dbhdr.persistent, 0);
5646         if (ctdb_db == NULL) {
5647                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
5648                 talloc_free(tmp_ctx);
5649                 return -1;
5650         }
5651
5652         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
5653         if (ret != 0) {
5654                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
5655                 talloc_free(tmp_ctx);
5656                 return ret;
5657         }
5658
5659
5660         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
5661         if (ret != 0) {
5662                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
5663                 talloc_free(tmp_ctx);
5664                 return ret;
5665         }
5666
5667         /* freeze all nodes */
5668         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5669         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5670                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5671                                         nodes, i,
5672                                         TIMELIMIT(),
5673                                         false, tdb_null,
5674                                         NULL, NULL,
5675                                         NULL) != 0) {
5676                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5677                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5678                         talloc_free(tmp_ctx);
5679                         return -1;
5680                 }
5681         }
5682
5683         generation = vnnmap->generation;
5684         data.dptr = (void *)&generation;
5685         data.dsize = sizeof(generation);
5686
5687         /* start a cluster wide transaction */
5688         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5689         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5690                                         nodes, 0,
5691                                         TIMELIMIT(), false, data,
5692                                         NULL, NULL,
5693                                         NULL) != 0) {
5694                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
5695                 return -1;
5696         }
5697
5698
5699         w.db_id = ctdb_db->db_id;
5700         w.transaction_id = generation;
5701
5702         data.dptr = (void *)&w;
5703         data.dsize = sizeof(w);
5704
5705         /* wipe all the remote databases. */
5706         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5707         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5708                                         nodes, 0,
5709                                         TIMELIMIT(), false, data,
5710                                         NULL, NULL,
5711                                         NULL) != 0) {
5712                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5713                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5714                 talloc_free(tmp_ctx);
5715                 return -1;
5716         }
5717         
5718         /* push the database */
5719         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5720         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
5721                                         nodes, 0,
5722                                         TIMELIMIT(), false, outdata,
5723                                         NULL, NULL,
5724                                         NULL) != 0) {
5725                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
5726                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5727                 talloc_free(tmp_ctx);
5728                 return -1;
5729         }
5730
5731         data.dptr = (void *)&ctdb_db->db_id;
5732         data.dsize = sizeof(ctdb_db->db_id);
5733
5734         /* mark the database as healthy */
5735         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5736         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5737                                         nodes, 0,
5738                                         TIMELIMIT(), false, data,
5739                                         NULL, NULL,
5740                                         NULL) != 0) {
5741                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5742                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5743                 talloc_free(tmp_ctx);
5744                 return -1;
5745         }
5746
5747         data.dptr = (void *)&generation;
5748         data.dsize = sizeof(generation);
5749
5750         /* commit all the changes */
5751         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5752                                         nodes, 0,
5753                                         TIMELIMIT(), false, data,
5754                                         NULL, NULL,
5755                                         NULL) != 0) {
5756                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5757                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5758                 talloc_free(tmp_ctx);
5759                 return -1;
5760         }
5761
5762
5763         /* thaw all nodes */
5764         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5765         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5766                                         nodes, 0,
5767                                         TIMELIMIT(),
5768                                         false, tdb_null,
5769                                         NULL, NULL,
5770                                         NULL) != 0) {
5771                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5772                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5773                 talloc_free(tmp_ctx);
5774                 return -1;
5775         }
5776
5777
5778         talloc_free(tmp_ctx);
5779         return 0;
5780 }
5781
5782 /*
5783  * dump a database backup from a file
5784  */
5785 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
5786 {
5787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5788         TDB_DATA outdata;
5789         struct db_file_header dbhdr;
5790         int i, fh;
5791         struct tm *tm;
5792         char tbuf[100];
5793         struct ctdb_rec_data *rec = NULL;
5794         struct ctdb_marshall_buffer *m;
5795         struct ctdb_dump_db_context c;
5796
5797         assert_single_node_only();
5798
5799         if (argc != 1) {
5800                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5801                 return -1;
5802         }
5803
5804         fh = open(argv[0], O_RDONLY);
5805         if (fh == -1) {
5806                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5807                 talloc_free(tmp_ctx);
5808                 return -1;
5809         }
5810
5811         sys_read(fh, &dbhdr, sizeof(dbhdr));
5812         if (dbhdr.version != DB_VERSION) {
5813                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5814                 close(fh);
5815                 talloc_free(tmp_ctx);
5816                 return -1;
5817         }
5818
5819         outdata.dsize = dbhdr.size;
5820         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5821         if (outdata.dptr == NULL) {
5822                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5823                 close(fh);
5824                 talloc_free(tmp_ctx);
5825                 return -1;
5826         }
5827         sys_read(fh, outdata.dptr, outdata.dsize);
5828         close(fh);
5829         m = (struct ctdb_marshall_buffer *)outdata.dptr;
5830
5831         tm = localtime(&dbhdr.timestamp);
5832         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5833         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
5834                 dbhdr.name, m->db_id, tbuf);
5835
5836         ZERO_STRUCT(c);
5837         c.f = stdout;
5838         c.printemptyrecords = (bool)options.printemptyrecords;
5839         c.printdatasize = (bool)options.printdatasize;
5840         c.printlmaster = false;
5841         c.printhash = (bool)options.printhash;
5842         c.printrecordflags = (bool)options.printrecordflags;
5843
5844         for (i=0; i < m->count; i++) {
5845                 uint32_t reqid = 0;
5846                 TDB_DATA key, data;
5847
5848                 /* we do not want the header splitted, so we pass NULL*/
5849                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
5850                                               NULL, &key, &data);
5851
5852                 ctdb_dumpdb_record(ctdb, key, data, &c);
5853         }
5854
5855         printf("Dumped %d records\n", i);
5856         talloc_free(tmp_ctx);
5857         return 0;
5858 }
5859
/*
 * wipe a database: remove all of its records on all active nodes
 */
5863 static int control_wipedb(struct ctdb_context *ctdb, int argc,
5864                           const char **argv)
5865 {
5866         const char *db_name;
5867         int ret;
5868         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5869         TDB_DATA data;
5870         struct ctdb_db_context *ctdb_db;
5871         struct ctdb_node_map *nodemap = NULL;
5872         struct ctdb_vnn_map *vnnmap = NULL;
5873         int i;
5874         struct ctdb_control_wipe_database w;
5875         uint32_t *nodes;
5876         uint32_t generation;
5877         uint8_t flags;
5878
5879         assert_single_node_only();
5880
5881         if (argc != 1) {
5882                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5883                 return -1;
5884         }
5885
5886         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
5887                 return -1;
5888         }
5889
5890         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5891         if (ctdb_db == NULL) {
5892                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
5893                                   argv[0]));
5894                 talloc_free(tmp_ctx);
5895                 return -1;
5896         }
5897
5898         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
5899                                    &nodemap);
5900         if (ret != 0) {
5901                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5902                                   options.pnn));
5903                 talloc_free(tmp_ctx);
5904                 return ret;
5905         }
5906
5907         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5908                                   &vnnmap);
5909         if (ret != 0) {
5910                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
5911                                   options.pnn));
5912                 talloc_free(tmp_ctx);
5913                 return ret;
5914         }
5915
5916         /* freeze all nodes */
5917         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5918         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5919                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5920                                                 nodes, i,
5921                                                 TIMELIMIT(),
5922                                                 false, tdb_null,
5923                                                 NULL, NULL,
5924                                                 NULL);
5925                 if (ret != 0) {
5926                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5927                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
5928                                              CTDB_RECOVERY_ACTIVE);
5929                         talloc_free(tmp_ctx);
5930                         return -1;
5931                 }
5932         }
5933
5934         generation = vnnmap->generation;
5935         data.dptr = (void *)&generation;
5936         data.dsize = sizeof(generation);
5937
5938         /* start a cluster wide transaction */
5939         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5940         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5941                                         nodes, 0,
5942                                         TIMELIMIT(), false, data,
5943                                         NULL, NULL,
5944                                         NULL);
5945         if (ret!= 0) {
5946                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
5947                                   "transactions.\n"));
5948                 return -1;
5949         }
5950
5951         w.db_id = ctdb_db->db_id;
5952         w.transaction_id = generation;
5953
5954         data.dptr = (void *)&w;
5955         data.dsize = sizeof(w);
5956
5957         /* wipe all the remote databases. */
5958         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5959         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5960                                         nodes, 0,
5961                                         TIMELIMIT(), false, data,
5962                                         NULL, NULL,
5963                                         NULL) != 0) {
5964                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5965                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5966                 talloc_free(tmp_ctx);
5967                 return -1;
5968         }
5969
5970         data.dptr = (void *)&ctdb_db->db_id;
5971         data.dsize = sizeof(ctdb_db->db_id);
5972
5973         /* mark the database as healthy */
5974         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5975         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5976                                         nodes, 0,
5977                                         TIMELIMIT(), false, data,
5978                                         NULL, NULL,
5979                                         NULL) != 0) {
5980                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5981                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5982                 talloc_free(tmp_ctx);
5983                 return -1;
5984         }
5985
5986         data.dptr = (void *)&generation;
5987         data.dsize = sizeof(generation);
5988
5989         /* commit all the changes */
5990         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5991                                         nodes, 0,
5992                                         TIMELIMIT(), false, data,
5993                                         NULL, NULL,
5994                                         NULL) != 0) {
5995                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5996                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5997                 talloc_free(tmp_ctx);
5998                 return -1;
5999         }
6000
6001         /* thaw all nodes */
6002         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
6003         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
6004                                         nodes, 0,
6005                                         TIMELIMIT(),
6006                                         false, tdb_null,
6007                                         NULL, NULL,
6008                                         NULL) != 0) {
6009                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
6010                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
6011                 talloc_free(tmp_ctx);
6012                 return -1;
6013         }
6014
6015         DEBUG(DEBUG_ERR, ("Database wiped.\n"));
6016
6017         talloc_free(tmp_ctx);
6018         return 0;
6019 }
6020
6021 /*
6022   dump memory usage
6023  */
6024 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
6025 {
6026         TDB_DATA data;
6027         int ret;
6028         int32_t res;
6029         char *errmsg;
6030         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
6031         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
6032                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
6033         if (ret != 0 || res != 0) {
6034                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
6035                 talloc_free(tmp_ctx);
6036                 return -1;
6037         }
6038         sys_write(1, data.dptr, data.dsize);
6039         talloc_free(tmp_ctx);
6040         return 0;
6041 }
6042
6043 /*
6044   handler for memory dumps
6045 */
6046 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
6047                              TDB_DATA data, void *private_data)
6048 {
6049         sys_write(1, data.dptr, data.dsize);
6050         exit(0);
6051 }
6052
6053 /*
6054   dump memory usage on the recovery daemon
6055  */
6056 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
6057 {
6058         int ret;
6059         TDB_DATA data;
6060         struct srvid_request rd;
6061
6062         rd.pnn = ctdb_get_pnn(ctdb);
6063         rd.srvid = getpid();
6064
6065         /* register a message port for receiveing the reply so that we
6066            can receive the reply
6067         */
6068         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
6069
6070
6071         data.dptr = (uint8_t *)&rd;
6072         data.dsize = sizeof(rd);
6073
6074         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
6075         if (ret != 0) {
6076                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
6077                 return -1;
6078         }
6079
6080         /* this loop will terminate when we have received the reply */
6081         while (1) {     
6082                 event_loop_once(ctdb->ev);
6083         }
6084
6085         return 0;
6086 }
6087
6088 /*
6089   send a message to a srvid
6090  */
6091 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
6092 {
6093         unsigned long srvid;
6094         int ret;
6095         TDB_DATA data;
6096
6097         if (argc < 2) {
6098                 usage();
6099         }
6100
6101         srvid      = strtoul(argv[0], NULL, 0);
6102
6103         data.dptr = (uint8_t *)discard_const(argv[1]);
6104         data.dsize= strlen(argv[1]);
6105
6106         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
6107         if (ret != 0) {
6108                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
6109                 return -1;
6110         }
6111
6112         return 0;
6113 }
6114
6115 /*
6116   handler for msglisten
6117 */
6118 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
6119                              TDB_DATA data, void *private_data)
6120 {
6121         int i;
6122
6123         printf("Message received: ");
6124         for (i=0;i<data.dsize;i++) {
6125                 printf("%c", data.dptr[i]);
6126         }
6127         printf("\n");
6128 }
6129
6130 /*
6131   listen for messages on a messageport
6132  */
6133 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
6134 {
6135         uint64_t srvid;
6136
6137         srvid = getpid();
6138
6139         /* register a message port and listen for messages
6140         */
6141         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
6142         printf("Listening for messages on srvid:%d\n", (int)srvid);
6143
6144         while (1) {     
6145                 event_loop_once(ctdb->ev);
6146         }
6147
6148         return 0;
6149 }
6150
6151 /*
6152   list all nodes in the cluster
6153   we parse the nodes file directly
6154  */
6155 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
6156 {
6157         TALLOC_CTX *mem_ctx = talloc_new(NULL);
6158         struct pnn_node *pnn_nodes;
6159         struct pnn_node *pnn_node;
6160
6161         assert_single_node_only();
6162
6163         pnn_nodes = read_nodes_file(mem_ctx);
6164         if (pnn_nodes == NULL) {
6165                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
6166                 talloc_free(mem_ctx);
6167                 return -1;
6168         }
6169
6170         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
6171                 const char *addr = ctdb_addr_to_str(&pnn_node->addr);
6172                 if (options.machinereadable){
6173                         printm(":%d:%s:\n", pnn_node->pnn, addr);
6174                 } else {
6175                         printf("%s\n", addr);
6176                 }
6177         }
6178         talloc_free(mem_ctx);
6179
6180         return 0;
6181 }
6182
6183 /*
6184   reload the nodes file on the local node
6185  */
6186 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
6187 {
6188         int i, ret;
6189         int mypnn;
6190         struct ctdb_node_map *nodemap=NULL;
6191
6192         assert_single_node_only();
6193
6194         mypnn = ctdb_get_pnn(ctdb);
6195
6196         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
6197         if (ret != 0) {
6198                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
6199                 return ret;
6200         }
6201
6202         /* reload the nodes file on all remote nodes */
6203         for (i=0;i<nodemap->num;i++) {
6204                 if (nodemap->nodes[i].pnn == mypnn) {
6205                         continue;
6206                 }
6207                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
6208                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
6209                         nodemap->nodes[i].pnn);
6210                 if (ret != 0) {
6211                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
6212                 }
6213         }
6214
6215         /* reload the nodes file on the local node */
6216         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
6217         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
6218         if (ret != 0) {
6219                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
6220         }
6221
6222         /* initiate a recovery */
6223         control_recover(ctdb, argc, argv);
6224
6225         return 0;
6226 }
6227
6228
6229 static const struct {
6230         const char *name;
6231         int (*fn)(struct ctdb_context *, int, const char **);
6232         bool auto_all;
6233         bool without_daemon; /* can be run without daemon running ? */
6234         const char *msg;
6235         const char *args;
6236 } ctdb_commands[] = {
6237         { "version",         control_version,           true,   true,   "show version of ctdb" },
6238         { "status",          control_status,            true,   false,  "show node status" },
6239         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
6240         { "ping",            control_ping,              true,   false,  "ping all nodes" },
6241         { "runstate",        control_runstate,          true,   false,  "get/check runstate of a node", "[setup|first_recovery|startup|running]" },
6242         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
6243         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
6244         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
6245         { "statistics",      control_statistics,        false,  false, "show statistics" },
6246         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
6247         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
6248         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
6249         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
6250         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
6251         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
6252         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
6253         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
6254         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname|dbid>" },
6255         { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname|dbid>"},
6256         { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname|dbid>"},
6257         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
6258         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
6259         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
6260         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
6261         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
6262         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
6263         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
6264         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
6265         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
6266         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
6267         { "detach",          control_detach,            false,  false,  "detach from a database",                 "<dbname|dbid> [<dbname|dbid> ...]" },
6268         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
6269         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
6270         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
6271         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
6272         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
6273         { "stop",            control_stop,              true,   false,  "stop a node" },
6274         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
6275         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime>"},
6276         { "unban",           control_unban,             true,   false,  "unban a node" },
6277         { "showban",         control_showban,           true,   false,  "show ban information"},
6278         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
6279         { "recover",         control_recover,           true,   false,  "force recovery" },
6280         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
6281         { "ipreallocate",    control_ipreallocate,      false,  false,  "force the recovery daemon to perform a ip reallocation procedure" },
6282         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
6283         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
6284         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "[<srcip:port> <dstip:port>]" },
6285         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
6286         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
6287         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
6288         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
6289
6290         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
6291
6292         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
6293         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
6294         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
6295         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
6296         { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
6297         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
6298         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
6299         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
6300         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
6301         { "rebalanceip",     control_rebalanceip,       false,  false, "release an ip from the node and let recd rebalance it", "<ip>"},
6302         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
6303         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
6304         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
6305         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<dbname|dbid> <file>"},
6306         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
6307         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
6308         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname|dbid>"},
6309         { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
6310         { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
6311         { "enablescript",     control_enablescript,  true,      false, "enable an eventscript", "<script>"},
6312         { "disablescript",    control_disablescript,  true,     false, "disable an eventscript", "<script>"},
6313         { "natgwlist",        control_natgwlist,        true,   false, "show the nodes belonging to this natgw configuration"},
6314         { "xpnn",             control_xpnn,             false,  true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
6315         { "getreclock",       control_getreclock,       true,   false, "Show the reclock file of a node"},
6316         { "setreclock",       control_setreclock,       true,   false, "Set/clear the reclock file of a node", "[filename]"},
6317         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
6318         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
6319         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
6320         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbname|dbid> <prio:1-3>"},
6321         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbname|dbid>"},
6322         { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbname|dbid>"},
6323         { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbname|dbid>"},
6324         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
6325         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
6326         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<dbname|dbid> <key> [<file>]" },
6327         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<dbname|dbid> <key> <file containing record>" },
6328         { "pdelete",         control_pdelete,           false,  false,  "delete a record from a persistent database", "<dbname|dbid> <key>" },
6329         { "ptrans",          control_ptrans,            false,  false,  "update a persistent database (from stdin)", "<dbname|dbid>" },
6330         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
6331         { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6332         { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<dbname|dbid> <key>" },
6333         { "writekey",        control_writekey,          true,   false,  "write to a database key", "<dbname|dbid> <key> <value>" },
6334         { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
6335         { "rebalancenode",     control_rebalancenode,   false,  false, "mark nodes as forced IP rebalancing targets", "[<pnn-list>]"},
6336         { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbname|dbid>" },
6337         { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status", "[<pnn-list>]" },
6338         { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<dbname|dbid>" },
6339         { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on specified nodes" , "[<pnn-list>]" },
6340         { "ipiface",         control_ipiface,           false,  true,  "Find which interface an ip address is hosted on", "<ip>" },
6341 };
6342
6343 /*
6344   show usage message
6345  */
6346 static void usage(void)
6347 {
6348         int i;
6349         printf(
6350 "Usage: ctdb [options] <control>\n" \
6351 "Options:\n" \
6352 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
6353 "   -Y                 generate machine readable output\n"
6354 "   -x <char>          specify delimiter for machine readable output\n"
6355 "   -v                 generate verbose output\n"
6356 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
6357         printf("Controls:\n");
6358         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6359                 printf("  %-15s %-27s  %s\n", 
6360                        ctdb_commands[i].name, 
6361                        ctdb_commands[i].args?ctdb_commands[i].args:"",
6362                        ctdb_commands[i].msg);
6363         }
6364         exit(1);
6365 }
6366
6367
6368 static void ctdb_alarm(int sig)
6369 {
6370         printf("Maximum runtime exceeded - exiting\n");
6371         _exit(ERR_TIMEOUT);
6372 }
6373
6374 /*
6375   main program
6376 */
6377 int main(int argc, const char *argv[])
6378 {
6379         struct ctdb_context *ctdb;
6380         char *nodestring = NULL;
6381         struct poptOption popt_options[] = {
6382                 POPT_AUTOHELP
6383                 POPT_CTDB_CMDLINE
6384                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
6385                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
6386                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machine readable output", NULL },
6387                 { NULL, 'x', POPT_ARG_STRING, &options.machineseparator, 0, "specify separator for machine readable output", "char" },
6388                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
6389                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
6390                 { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
6391                 { "print-datasize", 0, POPT_ARG_NONE, &options.printdatasize, 0, "do not print record data when dumping databases, only the data size", NULL },
6392                 { "print-lmaster", 0, POPT_ARG_NONE, &options.printlmaster, 0, "print the record's lmaster in catdb", NULL },
6393                 { "print-hash", 0, POPT_ARG_NONE, &options.printhash, 0, "print the record's hash when dumping databases", NULL },
6394                 { "print-recordflags", 0, POPT_ARG_NONE, &options.printrecordflags, 0, "print the record flags in catdb and dumpdbbackup", NULL },
6395                 POPT_TABLEEND
6396         };
6397         int opt;
6398         const char **extra_argv;
6399         int extra_argc = 0;
6400         int ret=-1, i;
6401         poptContext pc;
6402         struct event_context *ev;
6403         const char *control;
6404
6405         setlinebuf(stdout);
6406         
6407         /* set some defaults */
6408         options.maxruntime = 0;
6409         options.timelimit = 10;
6410         options.pnn = CTDB_CURRENT_NODE;
6411
6412         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
6413
6414         while ((opt = poptGetNextOpt(pc)) != -1) {
6415                 switch (opt) {
6416                 default:
6417                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
6418                                 poptBadOption(pc, 0), poptStrerror(opt)));
6419                         exit(1);
6420                 }
6421         }
6422
6423         /* setup the remaining options for the main program to use */
6424         extra_argv = poptGetArgs(pc);
6425         if (extra_argv) {
6426                 extra_argv++;
6427                 while (extra_argv[extra_argc]) extra_argc++;
6428         }
6429
6430         if (extra_argc < 1) {
6431                 usage();
6432         }
6433
6434         if (options.maxruntime == 0) {
6435                 const char *ctdb_timeout;
6436                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6437                 if (ctdb_timeout != NULL) {
6438                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6439                 } else {
6440                         /* default timeout is 120 seconds */
6441                         options.maxruntime = 120;
6442                 }
6443         }
6444
6445         if (options.machineseparator != NULL) {
6446                 if (strlen(options.machineseparator) != 1) {
6447                         printf("Invalid separator \"%s\" - "
6448                                "must be single character\n",
6449                                options.machineseparator);
6450                         exit(1);
6451                 }
6452
6453                 /* -x implies -Y */
6454                 options.machinereadable = true;
6455         } else if (options.machinereadable) {
6456                 options.machineseparator = ":";
6457         }
6458
6459         signal(SIGALRM, ctdb_alarm);
6460         alarm(options.maxruntime);
6461
6462         control = extra_argv[0];
6463
6464         /* Default value for CTDB_BASE - don't override */
6465         setenv("CTDB_BASE", CTDB_ETCDIR, 0);
6466
6467         ev = event_context_init(NULL);
6468         if (!ev) {
6469                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
6470                 exit(1);
6471         }
6472
6473         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6474                 if (strcmp(control, ctdb_commands[i].name) == 0) {
6475                         break;
6476                 }
6477         }
6478
6479         if (i == ARRAY_SIZE(ctdb_commands)) {
6480                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
6481                 exit(1);
6482         }
6483
6484         if (ctdb_commands[i].without_daemon == true) {
6485                 if (nodestring != NULL) {
6486                         DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
6487                         exit(1);
6488                 }
6489                 return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
6490         }
6491
6492         /* initialise ctdb */
6493         ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
6494
6495         if (ctdb == NULL) {
6496                 uint32_t pnn;
6497                 DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
6498
6499                 pnn = find_node_xpnn();
6500                 if (pnn == -1) {
6501                         DEBUG(DEBUG_ERR,
6502                               ("Is this node part of a CTDB cluster?\n"));
6503                 }
6504                 exit(1);
6505         }
6506
6507         /* setup the node number(s) to contact */
6508         if (!parse_nodestring(ctdb, ctdb, nodestring, CTDB_CURRENT_NODE, false,
6509                               &options.nodes, &options.pnn)) {
6510                 usage();
6511         }
6512
6513         if (options.pnn == CTDB_CURRENT_NODE) {
6514                 options.pnn = options.nodes[0];
6515         }
6516
6517         if (ctdb_commands[i].auto_all && 
6518             ((options.pnn == CTDB_BROADCAST_ALL) ||
6519              (options.pnn == CTDB_MULTICAST))) {
6520                 int j;
6521
6522                 ret = 0;
6523                 for (j = 0; j < talloc_array_length(options.nodes); j++) {
6524                         options.pnn = options.nodes[j];
6525                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6526                 }
6527         } else {
6528                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6529         }
6530
6531         talloc_free(ctdb);
6532         talloc_free(ev);
6533         (void)poptFreeContext(pc);
6534
6535         return ret;
6536
6537 }