ctdb-tools: Reimplement read_nodes_file() using ctdb_get_nodes_file()
[amitay/samba.git] / ctdb / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24 #include "system/network.h"
25 #include "system/locale.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_version.h"
29 #include "../include/ctdb_client.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34
/*
 * Error codes.  ERR_NONODE and ERR_DISNODE are passed to exit() by
 * parse_nodestring() when an explicitly requested node cannot be used.
 */
#define ERR_TIMEOUT     20      /* timed out trying to reach node */
#define ERR_NONODE      21      /* node does not exist */
#define ERR_DISNODE     22      /* node is disconnected */

static void usage(void);

/* Tool-wide option values consulted by the commands below. */
static struct {
        int timelimit;
        uint32_t pnn;
        uint32_t *nodes;
        int machinereadable;
        const char *machineseparator;
        int verbose;
        int maxruntime;
        int printemptyrecords;
        int printdatasize;
        int printlmaster;
        int printhash;
        int printrecordflags;
} options;

/*
 * Timeout for long-running operations: ten times options.timelimit.  The
 * expansion is parenthesized so that it behaves as a single operand when
 * the macro is used inside a larger expression (e.g. "x / LONGTIMEOUT").
 */
#define LONGTIMEOUT (options.timelimit*10)

/* Timeouts built from options.timelimit via timeval_current_ofs() */
#define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
#define LONGTIMELIMIT() timeval_current_ofs(LONGTIMEOUT, 0)
60
/*
 * Return the difference tv2 - tv in seconds, as a double, combining the
 * whole-second and microsecond parts of both timestamps.
 */
static double timeval_delta(struct timeval *tv2, struct timeval *tv)
{
        double whole_seconds = tv2->tv_sec - tv->tv_sec;
        double fraction = (tv2->tv_usec - tv->tv_usec) * 1.0e-6;

        return whole_seconds + fraction;
}
66
67 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
68 {
69         printf("CTDB version: %s\n", CTDB_VERSION_STRING);
70         return 0;
71 }
72
73 /* Like printf(3) but substitute for separator in format */
74 static int printm(const char *format, ...) PRINTF_ATTRIBUTE(1,2);
75 static int printm(const char *format, ...)
76 {
77         va_list ap;
78         int ret;
79         size_t len = strlen(format);
80         char new_format[len+1];
81
82         strcpy(new_format, format);
83
84         if (options.machineseparator[0] != ':') {
85                 all_string_sub(new_format,
86                                ":", options.machineseparator, len + 1);
87         }
88
89         va_start(ap, format);
90         ret = vprintf(new_format, ap);
91         va_end(ap);
92
93         return ret;
94 }
95
/*
 * Abort the program when (p) evaluates to false: log an out-of-memory alert
 * that includes the source location, then call abort().
 */
#define CTDB_NOMEM_ABORT(p)                                             \
        do {                                                            \
                if (!(p)) {                                             \
                        DEBUG(DEBUG_ALERT,                              \
                              ("ctdb fatal error: %s\n",                \
                               "Out of memory in " __location__ ));     \
                        abort();                                        \
                }                                                       \
        } while (0)
101
102 static uint32_t getpnn(struct ctdb_context *ctdb)
103 {
104         if ((options.pnn == CTDB_BROADCAST_ALL) ||
105             (options.pnn == CTDB_MULTICAST)) {
106                 DEBUG(DEBUG_ERR,
107                       ("Cannot get PNN for node %u\n", options.pnn));
108                 exit(1);
109         }
110
111         if (options.pnn == CTDB_CURRENT_NODE) {
112                 return ctdb_get_pnn(ctdb);
113         } else {
114                 return options.pnn;
115         }
116 }
117
118 static void assert_single_node_only(void)
119 {
120         if ((options.pnn == CTDB_BROADCAST_ALL) ||
121             (options.pnn == CTDB_MULTICAST)) {
122                 DEBUG(DEBUG_ERR,
123                       ("This control can not be applied to multiple PNNs\n"));
124                 exit(1);
125         }
126 }
127
128 /* Pretty print the flags to a static buffer in human-readable format.
129  * This never returns NULL!
130  */
131 static const char *pretty_print_flags(uint32_t flags)
132 {
133         int j;
134         static const struct {
135                 uint32_t flag;
136                 const char *name;
137         } flag_names[] = {
138                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
139                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
140                 { NODE_FLAGS_BANNED,                "BANNED" },
141                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
142                 { NODE_FLAGS_DELETED,               "DELETED" },
143                 { NODE_FLAGS_STOPPED,               "STOPPED" },
144                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
145         };
146         static char flags_str[512]; /* Big enough to contain all flag names */
147
148         flags_str[0] = '\0';
149         for (j=0;j<ARRAY_SIZE(flag_names);j++) {
150                 if (flags & flag_names[j].flag) {
151                         if (flags_str[0] == '\0') {
152                                 (void) strcpy(flags_str, flag_names[j].name);
153                         } else {
154                                 (void) strncat(flags_str, "|", sizeof(flags_str)-1);
155                                 (void) strncat(flags_str, flag_names[j].name,
156                                                sizeof(flags_str)-1);
157                         }
158                 }
159         }
160         if (flags_str[0] == '\0') {
161                 (void) strcpy(flags_str, "OK");
162         }
163
164         return flags_str;
165 }
166
/*
 * Convert one hexadecimal digit character to its value (0..15).  Both
 * lower-case and upper-case letters are accepted.  Characters that are not
 * hexadecimal digits are not rejected: they fall through to "h - '0'".
 */
static int h2i(char h)
{
        if (h >= 'a' && h <= 'f') return h - 'a' + 10;
        if (h >= 'A' && h <= 'F') return h - 'A' + 10;
        return h - '0';
}
173
174 static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str)
175 {
176         int i, len;
177         TDB_DATA key = {NULL, 0};
178
179         len = strlen(str);
180         if (len & 0x01) {
181                 DEBUG(DEBUG_ERR,("Key specified with odd number of hexadecimal digits\n"));
182                 return key;
183         }
184
185         key.dsize = len>>1;
186         key.dptr  = talloc_size(mem_ctx, key.dsize);
187
188         for (i=0; i < len/2; i++) {
189                 key.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
190         }
191         return key;
192 }
193
194 /* Parse a nodestring.  Parameter dd_ok controls what happens to nodes
195  * that are disconnected or deleted.  If dd_ok is true those nodes are
196  * included in the output list of nodes.  If dd_ok is false, those
197  * nodes are filtered from the "all" case and cause an error if
198  * explicitly specified.
199  */
200 static bool parse_nodestring(struct ctdb_context *ctdb,
201                              TALLOC_CTX *mem_ctx,
202                              const char * nodestring,
203                              uint32_t current_pnn,
204                              bool dd_ok,
205                              uint32_t **nodes,
206                              uint32_t *pnn_mode)
207 {
208         TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
209         int n;
210         uint32_t i;
211         struct ctdb_node_map *nodemap;
212         int ret;
213
214         *nodes = NULL;
215
216         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
217         if (ret != 0) {
218                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
219                 talloc_free(tmp_ctx);
220                 exit(10);
221         }
222
223         if (nodestring != NULL) {
224                 *nodes = talloc_array(mem_ctx, uint32_t, 0);
225                 if (*nodes == NULL) {
226                         goto failed;
227                 }
228
229                 n = 0;
230
231                 if (strcmp(nodestring, "all") == 0) {
232                         *pnn_mode = CTDB_BROADCAST_ALL;
233
234                         /* all */
235                         for (i = 0; i < nodemap->num; i++) {
236                                 if ((nodemap->nodes[i].flags &
237                                      (NODE_FLAGS_DISCONNECTED |
238                                       NODE_FLAGS_DELETED)) && !dd_ok) {
239                                         continue;
240                                 }
241                                 *nodes = talloc_realloc(mem_ctx, *nodes,
242                                                         uint32_t, n+1);
243                                 if (*nodes == NULL) {
244                                         goto failed;
245                                 }
246                                 (*nodes)[n] = i;
247                                 n++;
248                         }
249                 } else {
250                         /* x{,y...} */
251                         char *ns, *tok;
252
253                         ns = talloc_strdup(tmp_ctx, nodestring);
254                         tok = strtok(ns, ",");
255                         while (tok != NULL) {
256                                 uint32_t pnn;
257                                 char *endptr;
258                                 i = (uint32_t)strtoul(tok, &endptr, 0);
259                                 if (i == 0 && tok == endptr) {
260                                         DEBUG(DEBUG_ERR,
261                                               ("Invalid node %s\n", tok));
262                                         talloc_free(tmp_ctx);
263                                         exit(ERR_NONODE);
264                                 }
265                                 if (i >= nodemap->num) {
266                                         DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
267                                         talloc_free(tmp_ctx);
268                                         exit(ERR_NONODE);
269                                 }
270                                 if ((nodemap->nodes[i].flags & 
271                                      (NODE_FLAGS_DISCONNECTED |
272                                       NODE_FLAGS_DELETED)) && !dd_ok) {
273                                         DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
274                                         talloc_free(tmp_ctx);
275                                         exit(ERR_DISNODE);
276                                 }
277                                 if ((pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), i)) < 0) {
278                                         DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
279                                         talloc_free(tmp_ctx);
280                                         exit(10);
281                                 }
282
283                                 *nodes = talloc_realloc(mem_ctx, *nodes,
284                                                         uint32_t, n+1);
285                                 if (*nodes == NULL) {
286                                         goto failed;
287                                 }
288
289                                 (*nodes)[n] = i;
290                                 n++;
291
292                                 tok = strtok(NULL, ",");
293                         }
294                         talloc_free(ns);
295
296                         if (n == 1) {
297                                 *pnn_mode = (*nodes)[0];
298                         } else {
299                                 *pnn_mode = CTDB_MULTICAST;
300                         }
301                 }
302         } else {
303                 /* default - no nodes specified */
304                 *nodes = talloc_array(mem_ctx, uint32_t, 1);
305                 if (*nodes == NULL) {
306                         goto failed;
307                 }
308                 *pnn_mode = CTDB_CURRENT_NODE;
309
310                 if (((*nodes)[0] = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), current_pnn)) < 0) {
311                         goto failed;
312                 }
313         }
314
315         talloc_free(tmp_ctx);
316         return true;
317
318 failed:
319         talloc_free(tmp_ctx);
320         return false;
321 }
322
323 /*
324  check if a database exists
325 */
326 static bool db_exists(struct ctdb_context *ctdb, const char *dbarg,
327                       uint32_t *dbid, const char **dbname, uint8_t *flags)
328 {
329         int i, ret;
330         struct ctdb_dbid_map *dbmap=NULL;
331         bool dbid_given = false, found = false;
332         uint32_t id;
333         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
334         const char *name;
335
336         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
337         if (ret != 0) {
338                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
339                 goto fail;
340         }
341
342         if (strncmp(dbarg, "0x", 2) == 0) {
343                 id = strtoul(dbarg, NULL, 0);
344                 dbid_given = true;
345         }
346
347         for(i=0; i<dbmap->num; i++) {
348                 if (dbid_given) {
349                         if (id == dbmap->dbs[i].dbid) {
350                                 found = true;
351                                 break;
352                         }
353                 } else {
354                         ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
355                         if (ret != 0) {
356                                 DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
357                                 goto fail;
358                         }
359
360                         if (strcmp(name, dbarg) == 0) {
361                                 id = dbmap->dbs[i].dbid;
362                                 found = true;
363                                 break;
364                         }
365                 }
366         }
367
368         if (found && dbid_given && dbname != NULL) {
369                 ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
370                 if (ret != 0) {
371                         DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
372                         found = false;
373                         goto fail;
374                 }
375         }
376
377         if (found) {
378                 if (dbid) *dbid = id;
379                 if (dbname) *dbname = talloc_strdup(ctdb, name);
380                 if (flags) *flags = dbmap->dbs[i].flags;
381         } else {
382                 DEBUG(DEBUG_ERR,("No database matching '%s' found\n", dbarg));
383         }
384
385 fail:
386         talloc_free(tmp_ctx);
387         return found;
388 }
389
390 /*
391   see if a process exists
392  */
393 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
394 {
395         uint32_t pnn, pid;
396         int ret;
397         if (argc < 1) {
398                 usage();
399         }
400
401         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
402                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
403                 return -1;
404         }
405
406         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
407         if (ret == 0) {
408                 printf("%u:%u exists\n", pnn, pid);
409         } else {
410                 printf("%u:%u does not exist\n", pnn, pid);
411         }
412         return ret;
413 }
414
415 /*
416   display statistics structure
417  */
418 static void show_statistics(struct ctdb_statistics *s, int show_header)
419 {
420         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
421         int i;
422         const char *prefix=NULL;
423         int preflen=0;
424         int tmp, days, hours, minutes, seconds;
425         const struct {
426                 const char *name;
427                 uint32_t offset;
428         } fields[] = {
429 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
430                 STATISTICS_FIELD(num_clients),
431                 STATISTICS_FIELD(frozen),
432                 STATISTICS_FIELD(recovering),
433                 STATISTICS_FIELD(num_recoveries),
434                 STATISTICS_FIELD(client_packets_sent),
435                 STATISTICS_FIELD(client_packets_recv),
436                 STATISTICS_FIELD(node_packets_sent),
437                 STATISTICS_FIELD(node_packets_recv),
438                 STATISTICS_FIELD(keepalive_packets_sent),
439                 STATISTICS_FIELD(keepalive_packets_recv),
440                 STATISTICS_FIELD(node.req_call),
441                 STATISTICS_FIELD(node.reply_call),
442                 STATISTICS_FIELD(node.req_dmaster),
443                 STATISTICS_FIELD(node.reply_dmaster),
444                 STATISTICS_FIELD(node.reply_error),
445                 STATISTICS_FIELD(node.req_message),
446                 STATISTICS_FIELD(node.req_control),
447                 STATISTICS_FIELD(node.reply_control),
448                 STATISTICS_FIELD(client.req_call),
449                 STATISTICS_FIELD(client.req_message),
450                 STATISTICS_FIELD(client.req_control),
451                 STATISTICS_FIELD(timeouts.call),
452                 STATISTICS_FIELD(timeouts.control),
453                 STATISTICS_FIELD(timeouts.traverse),
454                 STATISTICS_FIELD(locks.num_calls),
455                 STATISTICS_FIELD(locks.num_current),
456                 STATISTICS_FIELD(locks.num_pending),
457                 STATISTICS_FIELD(locks.num_failed),
458                 STATISTICS_FIELD(total_calls),
459                 STATISTICS_FIELD(pending_calls),
460                 STATISTICS_FIELD(childwrite_calls),
461                 STATISTICS_FIELD(pending_childwrite_calls),
462                 STATISTICS_FIELD(memory_used),
463                 STATISTICS_FIELD(max_hop_count),
464                 STATISTICS_FIELD(total_ro_delegations),
465                 STATISTICS_FIELD(total_ro_revokes),
466         };
467         
468         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
469         seconds = tmp%60;
470         tmp    /= 60;
471         minutes = tmp%60;
472         tmp    /= 60;
473         hours   = tmp%24;
474         tmp    /= 24;
475         days    = tmp;
476
477         if (options.machinereadable){
478                 if (show_header) {
479                         printm("CTDB version:");
480                         printm("Current time of statistics:");
481                         printm("Statistics collected since:");
482                         for (i=0;i<ARRAY_SIZE(fields);i++) {
483                                 printm("%s:", fields[i].name);
484                         }
485                         printm("num_reclock_ctdbd_latency:");
486                         printm("min_reclock_ctdbd_latency:");
487                         printm("avg_reclock_ctdbd_latency:");
488                         printm("max_reclock_ctdbd_latency:");
489
490                         printm("num_reclock_recd_latency:");
491                         printm("min_reclock_recd_latency:");
492                         printm("avg_reclock_recd_latency:");
493                         printm("max_reclock_recd_latency:");
494
495                         printm("num_call_latency:");
496                         printm("min_call_latency:");
497                         printm("avg_call_latency:");
498                         printm("max_call_latency:");
499
500                         printm("num_lockwait_latency:");
501                         printm("min_lockwait_latency:");
502                         printm("avg_lockwait_latency:");
503                         printm("max_lockwait_latency:");
504
505                         printm("num_childwrite_latency:");
506                         printm("min_childwrite_latency:");
507                         printm("avg_childwrite_latency:");
508                         printm("max_childwrite_latency:");
509                         printm("\n");
510                 }
511                 printm("%d:", CTDB_PROTOCOL);
512                 printm("%d:", (int)s->statistics_current_time.tv_sec);
513                 printm("%d:", (int)s->statistics_start_time.tv_sec);
514                 for (i=0;i<ARRAY_SIZE(fields);i++) {
515                         printm("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
516                 }
517                 printm("%d:", s->reclock.ctdbd.num);
518                 printm("%.6f:", s->reclock.ctdbd.min);
519                 printm("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
520                 printm("%.6f:", s->reclock.ctdbd.max);
521
522                 printm("%d:", s->reclock.recd.num);
523                 printm("%.6f:", s->reclock.recd.min);
524                 printm("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
525                 printm("%.6f:", s->reclock.recd.max);
526
527                 printm("%d:", s->call_latency.num);
528                 printm("%.6f:", s->call_latency.min);
529                 printm("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
530                 printm("%.6f:", s->call_latency.max);
531
532                 printm("%d:", s->childwrite_latency.num);
533                 printm("%.6f:", s->childwrite_latency.min);
534                 printm("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
535                 printm("%.6f:", s->childwrite_latency.max);
536                 printm("\n");
537         } else {
538                 printf("CTDB version %u\n", CTDB_PROTOCOL);
539                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
540                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
541
542                 for (i=0;i<ARRAY_SIZE(fields);i++) {
543                         if (strchr(fields[i].name, '.')) {
544                                 preflen = strcspn(fields[i].name, ".")+1;
545                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
546                                         prefix = fields[i].name;
547                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
548                                 }
549                         } else {
550                                 preflen = 0;
551                         }
552                         printf(" %*s%-22s%*s%10u\n", 
553                                preflen?4:0, "",
554                                fields[i].name+preflen, 
555                                preflen?0:4, "",
556                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
557                 }
558                 printf(" hop_count_buckets:");
559                 for (i=0;i<MAX_COUNT_BUCKETS;i++) {
560                         printf(" %d", s->hop_count_bucket[i]);
561                 }
562                 printf("\n");
563                 printf(" lock_buckets:");
564                 for (i=0; i<MAX_COUNT_BUCKETS; i++) {
565                         printf(" %d", s->locks.buckets[i]);
566                 }
567                 printf("\n");
568                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "locks_latency      MIN/AVG/MAX", s->locks.latency.min, s->locks.latency.num?s->locks.latency.total/s->locks.latency.num:0.0, s->locks.latency.max, s->locks.latency.num);
569
570                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd      MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
571
572                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
573
574                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
575                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
576         }
577
578         talloc_free(tmp_ctx);
579 }
580
581 /*
582   display remote ctdb statistics combined from all nodes
583  */
584 static int control_statistics_all(struct ctdb_context *ctdb)
585 {
586         int ret, i;
587         struct ctdb_statistics statistics;
588         uint32_t *nodes;
589         uint32_t num_nodes;
590
591         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
592         CTDB_NO_MEMORY(ctdb, nodes);
593         
594         ZERO_STRUCT(statistics);
595
596         for (i=0;i<num_nodes;i++) {
597                 struct ctdb_statistics s1;
598                 int j;
599                 uint32_t *v1 = (uint32_t *)&s1;
600                 uint32_t *v2 = (uint32_t *)&statistics;
601                 uint32_t num_ints = 
602                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
603                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
604                 if (ret != 0) {
605                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
606                         return ret;
607                 }
608                 for (j=0;j<num_ints;j++) {
609                         v2[j] += v1[j];
610                 }
611                 statistics.max_hop_count = 
612                         MAX(statistics.max_hop_count, s1.max_hop_count);
613                 statistics.call_latency.max = 
614                         MAX(statistics.call_latency.max, s1.call_latency.max);
615         }
616         talloc_free(nodes);
617         printf("Gathered statistics for %u nodes\n", num_nodes);
618         show_statistics(&statistics, 1);
619         return 0;
620 }
621
622 /*
623   display remote ctdb statistics
624  */
625 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
626 {
627         int ret;
628         struct ctdb_statistics statistics;
629
630         if (options.pnn == CTDB_BROADCAST_ALL) {
631                 return control_statistics_all(ctdb);
632         }
633
634         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
635         if (ret != 0) {
636                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
637                 return ret;
638         }
639         show_statistics(&statistics, 1);
640         return 0;
641 }
642
643
644 /*
645   reset remote ctdb statistics
646  */
647 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
648 {
649         int ret;
650
651         ret = ctdb_statistics_reset(ctdb, options.pnn);
652         if (ret != 0) {
653                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
654                 return ret;
655         }
656         return 0;
657 }
658
659
660 /*
661   display remote ctdb rolling statistics
662  */
663 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
664 {
665         int ret;
666         struct ctdb_statistics_wire *stats;
667         int i, num_records = -1;
668
669         assert_single_node_only();
670
671         if (argc ==1) {
672                 num_records = atoi(argv[0]) - 1;
673         }
674
675         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
676         if (ret != 0) {
677                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
678                 return ret;
679         }
680         for (i=0;i<stats->num;i++) {
681                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
682                         continue;
683                 }
684                 show_statistics(&stats->stats[i], i==0);
685                 if (i == num_records) {
686                         break;
687                 }
688         }
689         return 0;
690 }
691
692
693 /*
694   display remote ctdb db statistics
695  */
696 static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
697 {
698         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
699         struct ctdb_db_statistics *dbstat;
700         int i;
701         uint32_t db_id;
702         int num_hot_keys;
703         int ret;
704
705         if (argc < 1) {
706                 usage();
707         }
708
709         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
710                 return -1;
711         }
712
713         ret = ctdb_ctrl_dbstatistics(ctdb, options.pnn, db_id, tmp_ctx, &dbstat);
714         if (ret != 0) {
715                 DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
716                 talloc_free(tmp_ctx);
717                 return -1;
718         }
719
720         printf("DB Statistics: %s\n", argv[0]);
721         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_delegations", 4, "",
722                 dbstat->db_ro_delegations);
723         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_revokes", 4, "",
724                 dbstat->db_ro_delegations);
725         printf(" %s\n", "locks");
726         printf(" %*s%-22s%*s%10u\n", 4, "", "total", 0, "",
727                 dbstat->locks.num_calls);
728         printf(" %*s%-22s%*s%10u\n", 4, "", "failed", 0, "",
729                 dbstat->locks.num_failed);
730         printf(" %*s%-22s%*s%10u\n", 4, "", "current", 0, "",
731                 dbstat->locks.num_current);
732         printf(" %*s%-22s%*s%10u\n", 4, "", "pending", 0, "",
733                 dbstat->locks.num_pending);
734         printf(" %s", "hop_count_buckets:");
735         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
736                 printf(" %d", dbstat->hop_count_bucket[i]);
737         }
738         printf("\n");
739         printf(" %s", "lock_buckets:");
740         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
741                 printf(" %d", dbstat->locks.buckets[i]);
742         }
743         printf("\n");
744         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
745                 "locks_latency      MIN/AVG/MAX",
746                 dbstat->locks.latency.min,
747                 (dbstat->locks.latency.num ?
748                  dbstat->locks.latency.total /dbstat->locks.latency.num :
749                  0.0),
750                 dbstat->locks.latency.max,
751                 dbstat->locks.latency.num);
752         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
753                 "vacuum_latency     MIN/AVG/MAX",
754                 dbstat->vacuum.latency.min,
755                 (dbstat->vacuum.latency.num ?
756                  dbstat->vacuum.latency.total /dbstat->vacuum.latency.num :
757                  0.0),
758                 dbstat->vacuum.latency.max,
759                 dbstat->vacuum.latency.num);
760         num_hot_keys = 0;
761         for (i=0; i<dbstat->num_hot_keys; i++) {
762                 if (dbstat->hot_keys[i].count > 0) {
763                         num_hot_keys++;
764                 }
765         }
766         dbstat->num_hot_keys = num_hot_keys;
767
768         printf(" Num Hot Keys:     %d\n", dbstat->num_hot_keys);
769         for (i = 0; i < dbstat->num_hot_keys; i++) {
770                 int j;
771                 printf("     Count:%d Key:", dbstat->hot_keys[i].count);
772                 for (j = 0; j < dbstat->hot_keys[i].key.dsize; j++) {
773                         printf("%02x", dbstat->hot_keys[i].key.dptr[j]&0xff);
774                 }
775                 printf("\n");
776         }
777
778         talloc_free(tmp_ctx);
779         return 0;
780 }
781
782 /*
783   display uptime of remote node
784  */
785 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
786 {
787         int ret;
788         struct ctdb_uptime *uptime = NULL;
789         int tmp, days, hours, minutes, seconds;
790
791         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
792         if (ret != 0) {
793                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
794                 return ret;
795         }
796
797         if (options.machinereadable){
798                 printm(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
799                 printm(":%u:%u:%u:%lf\n",
800                         (unsigned int)uptime->current_time.tv_sec,
801                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
802                         (unsigned int)uptime->last_recovery_finished.tv_sec,
803                         timeval_delta(&uptime->last_recovery_finished,
804                                       &uptime->last_recovery_started)
805                 );
806                 return 0;
807         }
808
809         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
810
811         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
812         seconds = tmp%60;
813         tmp    /= 60;
814         minutes = tmp%60;
815         tmp    /= 60;
816         hours   = tmp%24;
817         tmp    /= 24;
818         days    = tmp;
819         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
820
821         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
822         seconds = tmp%60;
823         tmp    /= 60;
824         minutes = tmp%60;
825         tmp    /= 60;
826         hours   = tmp%24;
827         tmp    /= 24;
828         days    = tmp;
829         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
830         
831         printf("Duration of last recovery/failover: %lf seconds\n",
832                 timeval_delta(&uptime->last_recovery_finished,
833                               &uptime->last_recovery_started));
834
835         return 0;
836 }
837
/*
  show the PNN of the current node

  Prints the value returned by getpnn() as "PNN:<n>" and always
  returns 0.
 */
static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
{
	uint32_t mypnn = getpnn(ctdb);

	printf("PNN:%d\n", mypnn);

	return 0;
}
850
851
852 struct pnn_node {
853         struct pnn_node *next, *prev;
854         ctdb_sock_addr addr;
855         int pnn;
856 };
857
858 static struct pnn_node *read_pnn_node_file(TALLOC_CTX *mem_ctx,
859                                            const char *file)
860 {
861         int nlines;
862         char **lines;
863         int i, pnn;
864         struct pnn_node *pnn_nodes = NULL;
865         struct pnn_node *pnn_node;
866
867         lines = file_lines_load(file, &nlines, 0, mem_ctx);
868         if (lines == NULL) {
869                 return NULL;
870         }
871         for (i=0, pnn=0; i<nlines; i++) {
872                 char *node;
873
874                 node = lines[i];
875                 /* strip leading spaces */
876                 while((*node == ' ') || (*node == '\t')) {
877                         node++;
878                 }
879                 if (*node == '#') {
880                         pnn++;
881                         continue;
882                 }
883                 if (strcmp(node, "") == 0) {
884                         continue;
885                 }
886                 pnn_node = talloc(mem_ctx, struct pnn_node);
887                 pnn_node->pnn = pnn++;
888
889                 if (!parse_ip(node, NULL, 0, &pnn_node->addr)) {
890                         DEBUG(DEBUG_ERR,
891                               ("Invalid IP address '%s' in file %s\n",
892                                node, file));
893                         /* Caller will free mem_ctx */
894                         return NULL;
895                 }
896
897                 DLIST_ADD_END(pnn_nodes, pnn_node, NULL);
898         }
899
900         return pnn_nodes;
901 }
902
903 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx)
904 {
905         const char *nodes_list;
906
907         /* read the nodes file */
908         nodes_list = getenv("CTDB_NODES");
909         if (nodes_list == NULL) {
910                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes",
911                                              getenv("CTDB_BASE"));
912                 if (nodes_list == NULL) {
913                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
914                         exit(1);
915                 }
916         }
917
918         return ctdb_read_nodes_file(mem_ctx, nodes_list);
919 }
920
921 /*
922   show the PNN of the current node
923   discover the pnn by loading the nodes file and try to bind to all
924   addresses one at a time until the ip address is found.
925  */
926 static int find_node_xpnn(void)
927 {
928         TALLOC_CTX *mem_ctx = talloc_new(NULL);
929         struct ctdb_node_map *node_map;
930         int i, pnn;
931
932         node_map = read_nodes_file(mem_ctx);
933         if (node_map == NULL) {
934                 talloc_free(mem_ctx);
935                 return -1;
936         }
937
938         for (i = 0; i < node_map->num; i++) {
939                 if (node_map->nodes[i].flags & NODE_FLAGS_DELETED) {
940                         continue;
941                 }
942                 if (ctdb_sys_have_ip(&node_map->nodes[i].addr)) {
943                         pnn = node_map->nodes[i].pnn;
944                         talloc_free(mem_ctx);
945                         return pnn;
946                 }
947         }
948
949         printf("Failed to detect which PNN this node is\n");
950         talloc_free(mem_ctx);
951         return -1;
952 }
953
/*
  print the PNN found by find_node_xpnn()

  Returns 0 after printing "PNN:<n>", or -1 if find_node_xpnn()
  reported failure.
 */
static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
{
	/*
	 * find_node_xpnn() returns an int and uses -1 for failure, so
	 * keep the value signed: both the comparison and the %d
	 * conversion below then match the type.
	 */
	int pnn;

	assert_single_node_only();

	pnn = find_node_xpnn();
	if (pnn == -1) {
		return -1;
	}

	printf("PNN:%d\n", pnn);
	return 0;
}
968
969 /* Helpers for ctdb status
970  */
971 static bool is_partially_online(struct ctdb_context *ctdb, struct ctdb_node_and_flags *node)
972 {
973         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
974         int j;
975         bool ret = false;
976
977         if (node->flags == 0) {
978                 struct ctdb_control_get_ifaces *ifaces;
979
980                 if (ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), node->pnn,
981                                          tmp_ctx, &ifaces) == 0) {
982                         for (j=0; j < ifaces->num; j++) {
983                                 if (ifaces->ifaces[j].link_state != 0) {
984                                         continue;
985                                 }
986                                 ret = true;
987                                 break;
988                         }
989                 }
990         }
991         talloc_free(tmp_ctx);
992
993         return ret;
994 }
995
/* print the column header used for machine-readable node status lines */
static void control_status_header_machine(void)
{
	printm(":Node:IP"
	       ":Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive"
	       ":PartiallyOnline:ThisNode:\n");
}
1001
1002 static int control_status_1_machine(struct ctdb_context *ctdb, int mypnn,
1003                                     struct ctdb_node_and_flags *node)
1004 {
1005         printm(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
1006                ctdb_addr_to_str(&node->addr),
1007                !!(node->flags&NODE_FLAGS_DISCONNECTED),
1008                !!(node->flags&NODE_FLAGS_BANNED),
1009                !!(node->flags&NODE_FLAGS_PERMANENTLY_DISABLED),
1010                !!(node->flags&NODE_FLAGS_UNHEALTHY),
1011                !!(node->flags&NODE_FLAGS_STOPPED),
1012                !!(node->flags&NODE_FLAGS_INACTIVE),
1013                is_partially_online(ctdb, node) ? 1 : 0,
1014                (node->pnn == mypnn)?'Y':'N');
1015
1016         return node->flags;
1017 }
1018
1019 static int control_status_1_human(struct ctdb_context *ctdb, int mypnn,
1020                                   struct ctdb_node_and_flags *node)
1021 {
1022        printf("pnn:%d %-16s %s%s\n", node->pnn,
1023               ctdb_addr_to_str(&node->addr),
1024               is_partially_online(ctdb, node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
1025               node->pnn == mypnn?" (THIS NODE)":"");
1026
1027        return node->flags;
1028 }
1029
1030 /*
1031   display remote ctdb status
1032  */
1033 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
1034 {
1035         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1036         int i;
1037         struct ctdb_vnn_map *vnnmap=NULL;
1038         struct ctdb_node_map *nodemap=NULL;
1039         uint32_t recmode, recmaster, mypnn;
1040         int num_deleted_nodes = 0;
1041         int ret;
1042
1043         mypnn = getpnn(ctdb);
1044
1045         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1046         if (ret != 0) {
1047                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1048                 talloc_free(tmp_ctx);
1049                 return -1;
1050         }
1051
1052         if (options.machinereadable) {
1053                 control_status_header_machine();
1054                 for (i=0;i<nodemap->num;i++) {
1055                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1056                                 continue;
1057                         }
1058                         (void) control_status_1_machine(ctdb, mypnn,
1059                                                         &nodemap->nodes[i]);
1060                 }
1061                 talloc_free(tmp_ctx);
1062                 return 0;
1063         }
1064
1065         for (i=0; i<nodemap->num; i++) {
1066                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1067                         num_deleted_nodes++;
1068                 }
1069         }
1070         if (num_deleted_nodes == 0) {
1071                 printf("Number of nodes:%d\n", nodemap->num);
1072         } else {
1073                 printf("Number of nodes:%d (including %d deleted nodes)\n",
1074                        nodemap->num, num_deleted_nodes);
1075         }
1076         for(i=0;i<nodemap->num;i++){
1077                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1078                         continue;
1079                 }
1080                 (void) control_status_1_human(ctdb, mypnn, &nodemap->nodes[i]);
1081         }
1082
1083         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
1084         if (ret != 0) {
1085                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
1086                 talloc_free(tmp_ctx);
1087                 return -1;
1088         }
1089         if (vnnmap->generation == INVALID_GENERATION) {
1090                 printf("Generation:INVALID\n");
1091         } else {
1092                 printf("Generation:%d\n",vnnmap->generation);
1093         }
1094         printf("Size:%d\n",vnnmap->size);
1095         for(i=0;i<vnnmap->size;i++){
1096                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
1097         }
1098
1099         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmode);
1100         if (ret != 0) {
1101                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1102                 talloc_free(tmp_ctx);
1103                 return -1;
1104         }
1105         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
1106
1107         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmaster);
1108         if (ret != 0) {
1109                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1110                 talloc_free(tmp_ctx);
1111                 return -1;
1112         }
1113         printf("Recovery master:%d\n",recmaster);
1114
1115         talloc_free(tmp_ctx);
1116         return 0;
1117 }
1118
1119 static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
1120 {
1121         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1122         int i, ret;
1123         struct ctdb_node_map *nodemap=NULL;
1124         uint32_t * nodes;
1125         uint32_t pnn_mode, mypnn;
1126
1127         if (argc > 1) {
1128                 usage();
1129         }
1130
1131         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1132                               options.pnn, true, &nodes, &pnn_mode)) {
1133                 return -1;
1134         }
1135
1136         if (options.machinereadable) {
1137                 control_status_header_machine();
1138         } else if (pnn_mode == CTDB_BROADCAST_ALL) {
1139                 printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
1140         }
1141
1142         mypnn = getpnn(ctdb);
1143
1144         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1145         if (ret != 0) {
1146                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1147                 talloc_free(tmp_ctx);
1148                 return -1;
1149         }
1150
1151         ret = 0;
1152
1153         for (i = 0; i < talloc_array_length(nodes); i++) {
1154                 if (options.machinereadable) {
1155                         ret |= control_status_1_machine(ctdb, mypnn,
1156                                                         &nodemap->nodes[nodes[i]]);
1157                 } else {
1158                         ret |= control_status_1_human(ctdb, mypnn,
1159                                                       &nodemap->nodes[nodes[i]]);
1160                 }
1161         }
1162
1163         talloc_free(tmp_ctx);
1164         return ret;
1165 }
1166
1167 static struct pnn_node *read_natgw_nodes_file(struct ctdb_context *ctdb,
1168                                               TALLOC_CTX *mem_ctx)
1169 {
1170         const char *natgw_list;
1171         struct pnn_node *natgw_nodes = NULL;
1172
1173         natgw_list = getenv("CTDB_NATGW_NODES");
1174         if (natgw_list == NULL) {
1175                 natgw_list = talloc_asprintf(mem_ctx, "%s/natgw_nodes",
1176                                              getenv("CTDB_BASE"));
1177                 if (natgw_list == NULL) {
1178                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
1179                         exit(1);
1180                 }
1181         }
1182         /* The PNNs will be junk but they're not used */
1183         natgw_nodes = read_pnn_node_file(mem_ctx, natgw_list);
1184         if (natgw_nodes == NULL) {
1185                 DEBUG(DEBUG_ERR,
1186                       ("Failed to load natgw node list '%s'\n", natgw_list));
1187         }
1188         return natgw_nodes;
1189 }
1190
1191
1192 /* talloc off the existing nodemap... */
1193 static struct ctdb_node_map *talloc_nodemap(struct ctdb_node_map *nodemap)
1194 {
1195         return talloc_zero_size(nodemap,
1196                                 offsetof(struct ctdb_node_map, nodes) +
1197                                 nodemap->num * sizeof(struct ctdb_node_and_flags));
1198 }
1199
1200 static struct ctdb_node_map *
1201 filter_nodemap_by_addrs(struct ctdb_context *ctdb,
1202                         struct ctdb_node_map *nodemap,
1203                         struct pnn_node *nodes)
1204 {
1205         int i;
1206         struct pnn_node *n;
1207         struct ctdb_node_map *ret;
1208
1209         ret = talloc_nodemap(nodemap);
1210         CTDB_NO_MEMORY_NULL(ctdb, ret);
1211
1212         ret->num = 0;
1213
1214         for (i = 0; i < nodemap->num; i++) {
1215                 for(n = nodes; n != NULL ; n = n->next) {
1216                         if (ctdb_same_ip(&n->addr,
1217                                          &nodemap->nodes[i].addr)) {
1218                                 break;
1219                         }
1220                 }
1221                 if (n == NULL) {
1222                         continue;
1223                 }
1224
1225                 ret->nodes[ret->num] = nodemap->nodes[i];
1226                 ret->num++;
1227         }
1228
1229         return ret;
1230 }
1231
1232 static struct ctdb_node_map *
1233 filter_nodemap_by_capabilities(struct ctdb_context *ctdb,
1234                                struct ctdb_node_map *nodemap,
1235                                uint32_t required_capabilities,
1236                                bool first_only)
1237 {
1238         int i;
1239         uint32_t capabilities;
1240         struct ctdb_node_map *ret;
1241
1242         ret = talloc_nodemap(nodemap);
1243         CTDB_NO_MEMORY_NULL(ctdb, ret);
1244
1245         ret->num = 0;
1246
1247         for (i = 0; i < nodemap->num; i++) {
1248                 int res;
1249
1250                 /* Disconnected nodes have no capabilities! */
1251                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1252                         continue;
1253                 }
1254
1255                 res = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(),
1256                                                 nodemap->nodes[i].pnn,
1257                                                 &capabilities);
1258                 if (res != 0) {
1259                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n",
1260                                           nodemap->nodes[i].pnn));
1261                         talloc_free(ret);
1262                         return NULL;
1263                 }
1264                 if (!(capabilities & required_capabilities)) {
1265                         continue;
1266                 }
1267
1268                 ret->nodes[ret->num] = nodemap->nodes[i];
1269                 ret->num++;
1270                 if (first_only) {
1271                         break;
1272                 }
1273         }
1274
1275         return ret;
1276 }
1277
1278 static struct ctdb_node_map *
1279 filter_nodemap_by_flags(struct ctdb_context *ctdb,
1280                         struct ctdb_node_map *nodemap,
1281                         uint32_t flags_mask)
1282 {
1283         int i;
1284         struct ctdb_node_map *ret;
1285
1286         ret = talloc_nodemap(nodemap);
1287         CTDB_NO_MEMORY_NULL(ctdb, ret);
1288
1289         ret->num = 0;
1290
1291         for (i = 0; i < nodemap->num; i++) {
1292                 if (nodemap->nodes[i].flags & flags_mask) {
1293                         continue;
1294                 }
1295
1296                 ret->nodes[ret->num] = nodemap->nodes[i];
1297                 ret->num++;
1298         }
1299
1300         return ret;
1301 }
1302
1303 /*
1304   display the list of nodes belonging to this natgw configuration
1305  */
1306 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
1307 {
1308         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1309         int i, ret;
1310         struct pnn_node *natgw_nodes = NULL;
1311         struct ctdb_node_map *orig_nodemap=NULL;
1312         struct ctdb_node_map *nodemap;
1313         uint32_t mypnn, pnn;
1314         const char *ip;
1315
1316         /* When we have some nodes that could be the NATGW, make a
1317          * series of attempts to find the first node that doesn't have
1318          * certain status flags set.
1319          */
1320         uint32_t exclude_flags[] = {
1321                 /* Look for a nice healthy node */
1322                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY,
1323                 /* If not found, an UNHEALTHY/BANNED node will do */
1324                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED,
1325                 /* If not found, a STOPPED node will do */
1326                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED,
1327                 0,
1328         };
1329
1330         /* read the natgw nodes file into a linked list */
1331         natgw_nodes = read_natgw_nodes_file(ctdb, tmp_ctx);
1332         if (natgw_nodes == NULL) {
1333                 ret = -1;
1334                 goto done;
1335         }
1336
1337         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
1338                                    tmp_ctx, &orig_nodemap);
1339         if (ret != 0) {
1340                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
1341                 talloc_free(tmp_ctx);
1342                 return -1;
1343         }
1344
1345         /* Get a nodemap that includes only the nodes in the NATGW
1346          * group */
1347         nodemap = filter_nodemap_by_addrs(ctdb, orig_nodemap, natgw_nodes);
1348         if (nodemap == NULL) {
1349                 ret = -1;
1350                 goto done;
1351         }
1352
1353         ret = 2; /* matches ENOENT */
1354         pnn = -1;
1355         ip = "0.0.0.0";
1356         /* For each flag mask... */
1357         for (i = 0; exclude_flags[i] != 0; i++) {
1358                 /* ... get a nodemap that excludes nodes with with
1359                  * masked flags... */
1360                 struct ctdb_node_map *t =
1361                         filter_nodemap_by_flags(ctdb, nodemap,
1362                                                 exclude_flags[i]);
1363                 if (t == NULL) {
1364                         /* No memory */
1365                         ret = -1;
1366                         goto done;
1367                 }
1368                 if (t->num > 0) {
1369                         /* ... and find the first node with the NATGW
1370                          * capability */
1371                         struct ctdb_node_map *n;
1372                         n = filter_nodemap_by_capabilities(ctdb, t,
1373                                                            CTDB_CAP_NATGW,
1374                                                            true);
1375                         if (n == NULL) {
1376                                 /* No memory */
1377                                 ret = -1;
1378                                 goto done;
1379                         }
1380                         if (n->num > 0) {
1381                                 ret = 0;
1382                                 pnn = n->nodes[0].pnn;
1383                                 ip = ctdb_addr_to_str(&n->nodes[0].addr);
1384                                 break;
1385                         }
1386                 }
1387                 talloc_free(t);
1388         }
1389
1390         if (options.machinereadable) {
1391                 printm(":Node:IP:\n");
1392                 printm(":%d:%s:\n", pnn, ip);
1393         } else {
1394                 printf("%d %s\n", pnn, ip);
1395         }
1396
1397         /* print the pruned list of nodes belonging to this natgw list */
1398         mypnn = getpnn(ctdb);
1399         if (options.machinereadable) {
1400                 control_status_header_machine();
1401         } else {
1402                 printf("Number of nodes:%d\n", nodemap->num);
1403         }
1404         for(i=0;i<nodemap->num;i++){
1405                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1406                         continue;
1407                 }
1408                 if (options.machinereadable) {
1409                         control_status_1_machine(ctdb, mypnn, &(nodemap->nodes[i]));
1410                 } else {
1411                         control_status_1_human(ctdb, mypnn, &(nodemap->nodes[i]));
1412                 }
1413         }
1414
1415 done:
1416         talloc_free(tmp_ctx);
1417         return ret;
1418 }
1419
1420 /*
1421   display the status of the scripts for monitoring (or other events)
1422  */
1423 static int control_one_scriptstatus(struct ctdb_context *ctdb,
1424                                     enum ctdb_eventscript_call type)
1425 {
1426         struct ctdb_scripts_wire *script_status;
1427         int ret, i;
1428
1429         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
1430         if (ret != 0) {
1431                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
1432                 return ret;
1433         }
1434
1435         if (script_status == NULL) {
1436                 if (!options.machinereadable) {
1437                         printf("%s cycle never run\n",
1438                                ctdb_eventscript_call_names[type]);
1439                 }
1440                 return 0;
1441         }
1442
1443         if (!options.machinereadable) {
1444                 int num_run = 0;
1445                 for (i=0; i<script_status->num_scripts; i++) {
1446                         if (script_status->scripts[i].status != -ENOEXEC) {
1447                                 num_run++;
1448                         }
1449                 }
1450                 printf("%d scripts were executed last %s cycle\n",
1451                        num_run,
1452                        ctdb_eventscript_call_names[type]);
1453         }
1454         for (i=0; i<script_status->num_scripts; i++) {
1455                 const char *status = NULL;
1456
1457                 switch (script_status->scripts[i].status) {
1458                 case -ETIME:
1459                         status = "TIMEDOUT";
1460                         break;
1461                 case -ENOEXEC:
1462                         status = "DISABLED";
1463                         break;
1464                 case 0:
1465                         status = "OK";
1466                         break;
1467                 default:
1468                         if (script_status->scripts[i].status > 0)
1469                                 status = "ERROR";
1470                         break;
1471                 }
1472                 if (options.machinereadable) {
1473                         printm(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
1474                                ctdb_eventscript_call_names[type],
1475                                script_status->scripts[i].name,
1476                                script_status->scripts[i].status,
1477                                status,
1478                                (long)script_status->scripts[i].start.tv_sec,
1479                                (long)script_status->scripts[i].start.tv_usec,
1480                                (long)script_status->scripts[i].finished.tv_sec,
1481                                (long)script_status->scripts[i].finished.tv_usec,
1482                                script_status->scripts[i].output);
1483                         continue;
1484                 }
1485                 if (status)
1486                         printf("%-20s Status:%s    ",
1487                                script_status->scripts[i].name, status);
1488                 else
1489                         /* Some other error, eg from stat. */
1490                         printf("%-20s Status:CANNOT RUN (%s)",
1491                                script_status->scripts[i].name,
1492                                strerror(-script_status->scripts[i].status));
1493
1494                 if (script_status->scripts[i].status >= 0) {
1495                         printf("Duration:%.3lf ",
1496                         timeval_delta(&script_status->scripts[i].finished,
1497                               &script_status->scripts[i].start));
1498                 }
1499                 if (script_status->scripts[i].status != -ENOEXEC) {
1500                         printf("%s",
1501                                ctime(&script_status->scripts[i].start.tv_sec));
1502                         if (script_status->scripts[i].status != 0) {
1503                                 printf("   OUTPUT:%s\n",
1504                                        script_status->scripts[i].output);
1505                         }
1506                 } else {
1507                         printf("\n");
1508                 }
1509         }
1510         return 0;
1511 }
1512
1513
1514 static int control_scriptstatus(struct ctdb_context *ctdb,
1515                                 int argc, const char **argv)
1516 {
1517         int ret;
1518         enum ctdb_eventscript_call type, min, max;
1519         const char *arg;
1520
1521         if (argc > 1) {
1522                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
1523                 return -1;
1524         }
1525
1526         if (argc == 0)
1527                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
1528         else
1529                 arg = argv[0];
1530
1531         for (type = 0; type < CTDB_EVENT_MAX; type++) {
1532                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
1533                         min = type;
1534                         max = type+1;
1535                         break;
1536                 }
1537         }
1538         if (type == CTDB_EVENT_MAX) {
1539                 if (strcmp(arg, "all") == 0) {
1540                         min = 0;
1541                         max = CTDB_EVENT_MAX;
1542                 } else {
1543                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
1544                         return -1;
1545                 }
1546         }
1547
1548         if (options.machinereadable) {
1549                 printm(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1550         }
1551
1552         for (type = min; type < max; type++) {
1553                 ret = control_one_scriptstatus(ctdb, type);
1554                 if (ret != 0) {
1555                         return ret;
1556                 }
1557         }
1558
1559         return 0;
1560 }
1561
1562 /*
1563   enable an eventscript
1564  */
1565 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1566 {
1567         int ret;
1568
1569         if (argc < 1) {
1570                 usage();
1571         }
1572
1573         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1574         if (ret != 0) {
1575           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1576                 return ret;
1577         }
1578
1579         return 0;
1580 }
1581
1582 /*
1583   disable an eventscript
1584  */
1585 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1586 {
1587         int ret;
1588
1589         if (argc < 1) {
1590                 usage();
1591         }
1592
1593         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1594         if (ret != 0) {
1595           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1596                 return ret;
1597         }
1598
1599         return 0;
1600 }
1601
1602 /*
1603   display the pnn of the recovery master
1604  */
1605 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1606 {
1607         uint32_t recmaster;
1608         int ret;
1609
1610         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1611         if (ret != 0) {
1612                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1613                 return -1;
1614         }
1615         printf("%d\n",recmaster);
1616
1617         return 0;
1618 }
1619
1620 /*
1621   add a tickle to a public address
1622  */
1623 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1624 {
1625         struct ctdb_tcp_connection t;
1626         TDB_DATA data;
1627         int ret;
1628
1629         assert_single_node_only();
1630
1631         if (argc < 2) {
1632                 usage();
1633         }
1634
1635         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1636                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1637                 return -1;
1638         }
1639         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1640                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1641                 return -1;
1642         }
1643
1644         data.dptr = (uint8_t *)&t;
1645         data.dsize = sizeof(t);
1646
1647         /* tell all nodes about this tcp connection */
1648         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1649                            0, data, ctdb, NULL, NULL, NULL, NULL);
1650         if (ret != 0) {
1651                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1652                 return -1;
1653         }
1654         
1655         return 0;
1656 }
1657
1658
1659 /*
1660   delete a tickle from a node
1661  */
1662 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1663 {
1664         struct ctdb_tcp_connection t;
1665         TDB_DATA data;
1666         int ret;
1667
1668         assert_single_node_only();
1669
1670         if (argc < 2) {
1671                 usage();
1672         }
1673
1674         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1675                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1676                 return -1;
1677         }
1678         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1679                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1680                 return -1;
1681         }
1682
1683         data.dptr = (uint8_t *)&t;
1684         data.dsize = sizeof(t);
1685
1686         /* tell all nodes about this tcp connection */
1687         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1688                            0, data, ctdb, NULL, NULL, NULL, NULL);
1689         if (ret != 0) {
1690                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1691                 return -1;
1692         }
1693         
1694         return 0;
1695 }
1696
1697
1698 /*
1699   get a list of all tickles for this pnn
1700  */
1701 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1702 {
1703         struct ctdb_control_tcp_tickle_list *list;
1704         ctdb_sock_addr addr;
1705         int i, ret;
1706         unsigned port = 0;
1707
1708         assert_single_node_only();
1709
1710         if (argc < 1) {
1711                 usage();
1712         }
1713
1714         if (argc == 2) {
1715                 port = atoi(argv[1]);
1716         }
1717
1718         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1719                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1720                 return -1;
1721         }
1722
1723         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1724         if (ret == -1) {
1725                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1726                 return -1;
1727         }
1728
1729         if (options.machinereadable){
1730                 printm(":source ip:port:destination ip:port:\n");
1731                 for (i=0;i<list->tickles.num;i++) {
1732                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1733                                 continue;
1734                         }
1735                         printm(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1736                         printm(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1737                 }
1738         } else {
1739                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1740                 printf("Num tickles:%u\n", list->tickles.num);
1741                 for (i=0;i<list->tickles.num;i++) {
1742                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1743                                 continue;
1744                         }
1745                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1746                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1747                 }
1748         }
1749
1750         talloc_free(list);
1751         
1752         return 0;
1753 }
1754
1755
1756 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1757 {
1758         struct ctdb_all_public_ips *ips;
1759         struct ctdb_public_ip ip;
1760         int i, ret;
1761         uint32_t *nodes;
1762         uint32_t disable_time;
1763         TDB_DATA data;
1764         struct ctdb_node_map *nodemap=NULL;
1765         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1766
1767         disable_time = 30;
1768         data.dptr  = (uint8_t*)&disable_time;
1769         data.dsize = sizeof(disable_time);
1770         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1771         if (ret != 0) {
1772                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1773                 return -1;
1774         }
1775
1776
1777
1778         /* read the public ip list from the node */
1779         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1780         if (ret != 0) {
1781                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1782                 talloc_free(tmp_ctx);
1783                 return -1;
1784         }
1785
1786         for (i=0;i<ips->num;i++) {
1787                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1788                         break;
1789                 }
1790         }
1791         if (i==ips->num) {
1792                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1793                         pnn, ctdb_addr_to_str(addr)));
1794                 talloc_free(tmp_ctx);
1795                 return -1;
1796         }
1797
1798         ip.pnn  = pnn;
1799         ip.addr = *addr;
1800
1801         data.dptr  = (uint8_t *)&ip;
1802         data.dsize = sizeof(ip);
1803
1804         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1805         if (ret != 0) {
1806                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1807                 talloc_free(tmp_ctx);
1808                 return ret;
1809         }
1810
1811         nodes = list_of_nodes(ctdb, nodemap, tmp_ctx, NODE_FLAGS_INACTIVE, pnn);
1812         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1813                                         nodes, 0,
1814                                         LONGTIMELIMIT(),
1815                                         false, data,
1816                                         NULL, NULL,
1817                                         NULL);
1818         if (ret != 0) {
1819                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1820                 talloc_free(tmp_ctx);
1821                 return -1;
1822         }
1823
1824         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1825         if (ret != 0) {
1826                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1827                 talloc_free(tmp_ctx);
1828                 return -1;
1829         }
1830
1831         /* update the recovery daemon so it now knows to expect the new
1832            node assignment for this ip.
1833         */
1834         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1835         if (ret != 0) {
1836                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1837                 return -1;
1838         }
1839
1840         talloc_free(tmp_ctx);
1841         return 0;
1842 }
1843
1844
1845 /* 
1846  * scans all other nodes and returns a pnn for another node that can host this 
1847  * ip address or -1
1848  */
1849 static int
1850 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1851 {
1852         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1853         struct ctdb_all_public_ips *ips;
1854         struct ctdb_node_map *nodemap=NULL;
1855         int i, j, ret;
1856         int pnn;
1857
1858         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1859         if (ret != 0) {
1860                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1861                 talloc_free(tmp_ctx);
1862                 return ret;
1863         }
1864
1865         for(i=0;i<nodemap->num;i++){
1866                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1867                         continue;
1868                 }
1869                 if (nodemap->nodes[i].pnn == options.pnn) {
1870                         continue;
1871                 }
1872
1873                 /* read the public ip list from this node */
1874                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1875                 if (ret != 0) {
1876                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1877                         return -1;
1878                 }
1879
1880                 for (j=0;j<ips->num;j++) {
1881                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1882                                 pnn = nodemap->nodes[i].pnn;
1883                                 talloc_free(tmp_ctx);
1884                                 return pnn;
1885                         }
1886                 }
1887                 talloc_free(ips);
1888         }
1889
1890         talloc_free(tmp_ctx);
1891         return -1;
1892 }
1893
1894 /* If pnn is -1 then try to find a node to move IP to... */
1895 static bool try_moveip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1896 {
1897         bool pnn_specified = (pnn == -1 ? false : true);
1898         int retries = 0;
1899
1900         while (retries < 5) {
1901                 if (!pnn_specified) {
1902                         pnn = find_other_host_for_public_ip(ctdb, addr);
1903                         if (pnn == -1) {
1904                                 return false;
1905                         }
1906                         DEBUG(DEBUG_NOTICE,
1907                               ("Trying to move public IP to node %u\n", pnn));
1908                 }
1909
1910                 if (move_ip(ctdb, addr, pnn) == 0) {
1911                         return true;
1912                 }
1913
1914                 sleep(3);
1915                 retries++;
1916         }
1917
1918         return false;
1919 }
1920
1921
1922 /*
1923   move/failover an ip address to a specific node
1924  */
1925 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1926 {
1927         uint32_t pnn;
1928         ctdb_sock_addr addr;
1929
1930         assert_single_node_only();
1931
1932         if (argc < 2) {
1933                 usage();
1934                 return -1;
1935         }
1936
1937         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1938                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1939                 return -1;
1940         }
1941
1942
1943         if (sscanf(argv[1], "%u", &pnn) != 1) {
1944                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1945                 return -1;
1946         }
1947
1948         if (!try_moveip(ctdb, &addr, pnn)) {
1949                 DEBUG(DEBUG_ERR,("Failed to move IP to node %d.\n", pnn));
1950                 return -1;
1951         }
1952
1953         return 0;
1954 }
1955
1956 static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
1957 {
1958         TDB_DATA data;
1959
1960         data.dptr  = (uint8_t *)&pnn;
1961         data.dsize = sizeof(uint32_t);
1962         if (ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
1963                 DEBUG(DEBUG_ERR,
1964                       ("Failed to send message to force node %u to be a rebalancing target\n",
1965                        pnn));
1966                 return -1;
1967         }
1968
1969         return 0;
1970 }
1971
1972
1973 /*
1974   rebalance a node by setting it to allow failback and triggering a
1975   takeover run
1976  */
1977 static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char **argv)
1978 {
1979         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1980         uint32_t *nodes;
1981         uint32_t pnn_mode;
1982         int i, ret;
1983
1984         assert_single_node_only();
1985
1986         if (argc > 1) {
1987                 usage();
1988         }
1989
1990         /* Determine the nodes where IPs need to be reloaded */
1991         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1992                               options.pnn, true, &nodes, &pnn_mode)) {
1993                 ret = -1;
1994                 goto done;
1995         }
1996
1997         for (i = 0; i < talloc_array_length(nodes); i++) {
1998                 if (!rebalance_node(ctdb, nodes[i])) {
1999                         ret = -1;
2000                 }
2001         }
2002
2003 done:
2004         talloc_free(tmp_ctx);
2005         return ret;
2006 }
2007
2008 static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
2009 {
2010         struct ctdb_public_ip ip;
2011         int ret;
2012         uint32_t *nodes;
2013         uint32_t disable_time;
2014         TDB_DATA data;
2015         struct ctdb_node_map *nodemap=NULL;
2016         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2017
2018         disable_time = 30;
2019         data.dptr  = (uint8_t*)&disable_time;
2020         data.dsize = sizeof(disable_time);
2021         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
2022         if (ret != 0) {
2023                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
2024                 return -1;
2025         }
2026
2027         ip.pnn  = -1;
2028         ip.addr = *addr;
2029
2030         data.dptr  = (uint8_t *)&ip;
2031         data.dsize = sizeof(ip);
2032
2033         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
2034         if (ret != 0) {
2035                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2036                 talloc_free(tmp_ctx);
2037                 return ret;
2038         }
2039
2040         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
2041         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
2042                                         nodes, 0,
2043                                         LONGTIMELIMIT(),
2044                                         false, data,
2045                                         NULL, NULL,
2046                                         NULL);
2047         if (ret != 0) {
2048                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
2049                 talloc_free(tmp_ctx);
2050                 return -1;
2051         }
2052
2053         talloc_free(tmp_ctx);
2054         return 0;
2055 }
2056
2057 /*
2058   release an ip form all nodes and have it re-assigned by recd
2059  */
2060 static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char **argv)
2061 {
2062         ctdb_sock_addr addr;
2063
2064         assert_single_node_only();
2065
2066         if (argc < 1) {
2067                 usage();
2068                 return -1;
2069         }
2070
2071         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2072                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2073                 return -1;
2074         }
2075
2076         if (rebalance_ip(ctdb, &addr) != 0) {
2077                 DEBUG(DEBUG_ERR,("Error when trying to reassign ip\n"));
2078                 return -1;
2079         }
2080
2081         return 0;
2082 }
2083
2084 static int getips_store_callback(void *param, void *data)
2085 {
2086         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
2087         struct ctdb_all_public_ips *ips = param;
2088         int i;
2089
2090         i = ips->num++;
2091         ips->ips[i].pnn  = node_ip->pnn;
2092         ips->ips[i].addr = node_ip->addr;
2093         return 0;
2094 }
2095
/*
  tree traversal callback: count entries

  param points to a uint32_t counter which is incremented by one on
  every call; data is ignored.  Always returns 0.
 */
static int getips_count_callback(void *param, void *data)
{
	uint32_t *counter = param;

	*counter += 1;

	return 0;
}
2103
2104 #define IP_KEYLEN       4
2105 static uint32_t *ip_key(ctdb_sock_addr *ip)
2106 {
2107         static uint32_t key[IP_KEYLEN];
2108
2109         bzero(key, sizeof(key));
2110
2111         switch (ip->sa.sa_family) {
2112         case AF_INET:
2113                 key[0]  = ip->ip.sin_addr.s_addr;
2114                 break;
2115         case AF_INET6: {
2116                 uint32_t *s6_a32 = (uint32_t *)&(ip->ip6.sin6_addr.s6_addr);
2117                 key[0]  = s6_a32[3];
2118                 key[1]  = s6_a32[2];
2119                 key[2]  = s6_a32[1];
2120                 key[3]  = s6_a32[0];
2121                 break;
2122         }
2123         default:
2124                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
2125                 return key;
2126         }
2127
2128         return key;
2129 }
2130
/*
  insert callback passed to trbt_insertarray32_callback()

  Returns parm unchanged and ignores data.
  NOTE(review): presumably the returned pointer is what the tree
  stores for the key - confirm against the rb_tree implementation.
 */
static void *add_ip_callback(void *parm, void *data)
{
	void *entry = parm;

	return entry;
}
2135
2136 static int
2137 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
2138 {
2139         struct ctdb_all_public_ips *tmp_ips;
2140         struct ctdb_node_map *nodemap=NULL;
2141         trbt_tree_t *ip_tree;
2142         int i, j, len, ret;
2143         uint32_t count;
2144
2145         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2146         if (ret != 0) {
2147                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2148                 return ret;
2149         }
2150
2151         ip_tree = trbt_create(tmp_ctx, 0);
2152
2153         for(i=0;i<nodemap->num;i++){
2154                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
2155                         continue;
2156                 }
2157                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2158                         continue;
2159                 }
2160
2161                 /* read the public ip list from this node */
2162                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
2163                 if (ret != 0) {
2164                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
2165                         return -1;
2166                 }
2167         
2168                 for (j=0; j<tmp_ips->num;j++) {
2169                         struct ctdb_public_ip *node_ip;
2170
2171                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
2172                         node_ip->pnn  = tmp_ips->ips[j].pnn;
2173                         node_ip->addr = tmp_ips->ips[j].addr;
2174
2175                         trbt_insertarray32_callback(ip_tree,
2176                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
2177                                 add_ip_callback,
2178                                 node_ip);
2179                 }
2180                 talloc_free(tmp_ips);
2181         }
2182
2183         /* traverse */
2184         count = 0;
2185         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
2186
2187         len = offsetof(struct ctdb_all_public_ips, ips) + 
2188                 count*sizeof(struct ctdb_public_ip);
2189         tmp_ips = talloc_zero_size(tmp_ctx, len);
2190         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
2191
2192         *ips = tmp_ips;
2193
2194         return 0;
2195 }
2196
2197
2198 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2199 {
2200         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2201
2202         event_add_timed(ctdb->ev, ctdb, 
2203                                 timeval_current_ofs(1, 0),
2204                                 ctdb_every_second, ctdb);
2205 }
2206
/*
  state shared between srvid_broadcast() and
  srvid_broadcast_reply_handler()
 */
struct srvid_reply_handler_data {
	bool done;		/* set by the handler once enough replies arrived */
	bool wait_for_all;	/* false: the first successful reply completes */
	uint32_t *nodes;	/* talloc array of pnns still awaited; an entry
				   is set to -1 when that node has replied */
	const char *srvid_str;	/* name of the request, used in log messages */
};
2213
2214 static void srvid_broadcast_reply_handler(struct ctdb_context *ctdb,
2215                                          uint64_t srvid,
2216                                          TDB_DATA data,
2217                                          void *private_data)
2218 {
2219         struct srvid_reply_handler_data *d =
2220                 (struct srvid_reply_handler_data *)private_data;
2221         int i;
2222         int32_t ret;
2223
2224         if (data.dsize != sizeof(ret)) {
2225                 DEBUG(DEBUG_ERR, (__location__ " Wrong reply size\n"));
2226                 return;
2227         }
2228
2229         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
2230         ret = *(int32_t *)data.dptr;
2231         if (ret < 0) {
2232                 DEBUG(DEBUG_ERR,
2233                       ("%s failed with result %d\n", d->srvid_str, ret));
2234                 return;
2235         }
2236
2237         if (!d->wait_for_all) {
2238                 d->done = true;
2239                 return;
2240         }
2241
2242         /* Wait for all replies */
2243         d->done = true;
2244         for (i = 0; i < talloc_array_length(d->nodes); i++) {
2245                 if (d->nodes[i] == ret) {
2246                         DEBUG(DEBUG_DEBUG,
2247                               ("%s reply received from node %u\n",
2248                                d->srvid_str, ret));
2249                         d->nodes[i] = -1;
2250                 }
2251                 if (d->nodes[i] != -1) {
2252                         /* Found a node that hasn't yet replied */
2253                         d->done = false;
2254                 }
2255         }
2256 }
2257
2258 /* Broadcast the given SRVID to all connected nodes.  Wait for 1 reply
2259  * or replies from all connected nodes.  arg is the data argument to
2260  * pass in the srvid_request structure - pass 0 if this isn't needed.
2261  */
2262 static int srvid_broadcast(struct ctdb_context *ctdb,
2263                            uint64_t srvid, uint32_t *arg,
2264                            const char *srvid_str, bool wait_for_all)
2265 {
2266         int ret;
2267         TDB_DATA data;
2268         uint32_t pnn;
2269         uint64_t reply_srvid;
2270         struct srvid_request request;
2271         struct srvid_request_data request_data;
2272         struct srvid_reply_handler_data reply_data;
2273         struct timeval tv;
2274
2275         ZERO_STRUCT(request);
2276
2277         /* Time ticks to enable timeouts to be processed */
2278         event_add_timed(ctdb->ev, ctdb, 
2279                                 timeval_current_ofs(1, 0),
2280                                 ctdb_every_second, ctdb);
2281
2282         pnn = ctdb_get_pnn(ctdb);
2283         reply_srvid = getpid();
2284
2285         if (arg == NULL) {
2286                 request.pnn = pnn;
2287                 request.srvid = reply_srvid;
2288
2289                 data.dptr = (uint8_t *)&request;
2290                 data.dsize = sizeof(request);
2291         } else {
2292                 request_data.pnn = pnn;
2293                 request_data.srvid = reply_srvid;
2294                 request_data.data = *arg;
2295
2296                 data.dptr = (uint8_t *)&request_data;
2297                 data.dsize = sizeof(request_data);
2298         }
2299
2300         /* Register message port for reply from recovery master */
2301         ctdb_client_set_message_handler(ctdb, reply_srvid,
2302                                         srvid_broadcast_reply_handler,
2303                                         &reply_data);
2304
2305         reply_data.wait_for_all = wait_for_all;
2306         reply_data.nodes = NULL;
2307         reply_data.srvid_str = srvid_str;
2308
2309 again:
2310         reply_data.done = false;
2311
2312         if (wait_for_all) {
2313                 struct ctdb_node_map *nodemap;
2314
2315                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
2316                                            CTDB_CURRENT_NODE, ctdb, &nodemap);
2317                 if (ret != 0) {
2318                         DEBUG(DEBUG_ERR,
2319                               ("Unable to get nodemap from current node, try again\n"));
2320                         sleep(1);
2321                         goto again;
2322                 }
2323
2324                 if (reply_data.nodes != NULL) {
2325                         talloc_free(reply_data.nodes);
2326                 }
2327                 reply_data.nodes = list_of_connected_nodes(ctdb, nodemap,
2328                                                            NULL, true);
2329
2330                 talloc_free(nodemap);
2331         }
2332
2333         /* Send to all connected nodes. Only recmaster replies */
2334         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2335                                        srvid, data);
2336         if (ret != 0) {
2337                 /* This can only happen if the socket is closed and
2338                  * there's no way to recover from that, so don't try
2339                  * again.
2340                  */
2341                 DEBUG(DEBUG_ERR,
2342                       ("Failed to send %s request to connected nodes\n",
2343                        srvid_str));
2344                 return -1;
2345         }
2346
2347         tv = timeval_current();
2348         /* This loop terminates the reply is received */
2349         while (timeval_elapsed(&tv) < 5.0 && !reply_data.done) {
2350                 event_loop_once(ctdb->ev);
2351         }
2352
2353         if (!reply_data.done) {
2354                 DEBUG(DEBUG_NOTICE,
2355                       ("Still waiting for confirmation of %s\n", srvid_str));
2356                 sleep(1);
2357                 goto again;
2358         }
2359
2360         ctdb_client_remove_message_handler(ctdb, reply_srvid, &reply_data);
2361
2362         talloc_free(reply_data.nodes);
2363
2364         return 0;
2365 }
2366
2367 static int ipreallocate(struct ctdb_context *ctdb)
2368 {
2369         return srvid_broadcast(ctdb, CTDB_SRVID_TAKEOVER_RUN, NULL,
2370                                "IP reallocation", false);
2371 }
2372
2373
/*
  command wrapper around ipreallocate(); argc and argv are ignored
 */
static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
{
	int status = ipreallocate(ctdb);

	return status;
}
2378
2379 /*
2380   add a public ip address to a node
2381  */
2382 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
2383 {
2384         int i, ret;
2385         int len, retries = 0;
2386         unsigned mask;
2387         ctdb_sock_addr addr;
2388         struct ctdb_control_ip_iface *pub;
2389         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2390         struct ctdb_all_public_ips *ips;
2391
2392
2393         if (argc != 2) {
2394                 talloc_free(tmp_ctx);
2395                 usage();
2396         }
2397
2398         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
2399                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
2400                 talloc_free(tmp_ctx);
2401                 return -1;
2402         }
2403
2404         /* read the public ip list from the node */
2405         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2406         if (ret != 0) {
2407                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", options.pnn));
2408                 talloc_free(tmp_ctx);
2409                 return -1;
2410         }
2411         for (i=0;i<ips->num;i++) {
2412                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2413                         DEBUG(DEBUG_ERR,("Can not add ip to node. Node already hosts this ip\n"));
2414                         return 0;
2415                 }
2416         }
2417
2418
2419
2420         /* Dont timeout. This command waits for an ip reallocation
2421            which sometimes can take wuite a while if there has
2422            been a recent recovery
2423         */
2424         alarm(0);
2425
2426         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
2427         pub = talloc_size(tmp_ctx, len); 
2428         CTDB_NO_MEMORY(ctdb, pub);
2429
2430         pub->addr  = addr;
2431         pub->mask  = mask;
2432         pub->len   = strlen(argv[1])+1;
2433         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
2434
2435         do {
2436                 ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
2437                 if (ret != 0) {
2438                         DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Wait 3 seconds and try again.\n", options.pnn));
2439                         sleep(3);
2440                         retries++;
2441                 }
2442         } while (retries < 5 && ret != 0);
2443         if (ret != 0) {
2444                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Giving up.\n", options.pnn));
2445                 talloc_free(tmp_ctx);
2446                 return ret;
2447         }
2448
2449         if (rebalance_node(ctdb, options.pnn) != 0) {
2450                 DEBUG(DEBUG_ERR,("Error when trying to rebalance node\n"));
2451                 return ret;
2452         }
2453
2454         talloc_free(tmp_ctx);
2455         return 0;
2456 }
2457
2458 /*
2459   add a public ip address to a node
2460  */
2461 static int control_ipiface(struct ctdb_context *ctdb, int argc, const char **argv)
2462 {
2463         ctdb_sock_addr addr;
2464
2465         if (argc != 1) {
2466                 usage();
2467         }
2468
2469         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2470                 printf("Badly formed ip : %s\n", argv[0]);
2471                 return -1;
2472         }
2473
2474         printf("IP on interface %s\n", ctdb_sys_find_ifname(&addr));
2475
2476         return 0;
2477 }
2478
2479 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
2480
2481 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
2482 {
2483         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2484         struct ctdb_node_map *nodemap=NULL;
2485         struct ctdb_all_public_ips *ips;
2486         int ret, i, j;
2487
2488         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2489         if (ret != 0) {
2490                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
2491                 return ret;
2492         }
2493
2494         /* remove it from the nodes that are not hosting the ip currently */
2495         for(i=0;i<nodemap->num;i++){
2496                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2497                         continue;
2498                 }
2499                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2500                 if (ret != 0) {
2501                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2502                         continue;
2503                 }
2504
2505                 for (j=0;j<ips->num;j++) {
2506                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2507                                 break;
2508                         }
2509                 }
2510                 if (j==ips->num) {
2511                         continue;
2512                 }
2513
2514                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
2515                         continue;
2516                 }
2517
2518                 options.pnn = nodemap->nodes[i].pnn;
2519                 control_delip(ctdb, argc, argv);
2520         }
2521
2522
2523         /* remove it from every node (also the one hosting it) */
2524         for(i=0;i<nodemap->num;i++){
2525                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2526                         continue;
2527                 }
2528                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2529                 if (ret != 0) {
2530                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2531                         continue;
2532                 }
2533
2534                 for (j=0;j<ips->num;j++) {
2535                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2536                                 break;
2537                         }
2538                 }
2539                 if (j==ips->num) {
2540                         continue;
2541                 }
2542
2543                 options.pnn = nodemap->nodes[i].pnn;
2544                 control_delip(ctdb, argc, argv);
2545         }
2546
2547         talloc_free(tmp_ctx);
2548         return 0;
2549 }
2550         
2551 /*
2552   delete a public ip address from a node
2553  */
2554 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
2555 {
2556         int i, ret;
2557         ctdb_sock_addr addr;
2558         struct ctdb_control_ip_iface pub;
2559         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2560         struct ctdb_all_public_ips *ips;
2561
2562         if (argc != 1) {
2563                 talloc_free(tmp_ctx);
2564                 usage();
2565         }
2566
2567         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2568                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2569                 return -1;
2570         }
2571
2572         if (options.pnn == CTDB_BROADCAST_ALL) {
2573                 return control_delip_all(ctdb, argc, argv, &addr);
2574         }
2575
2576         pub.addr  = addr;
2577         pub.mask  = 0;
2578         pub.len   = 0;
2579
2580         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2581         if (ret != 0) {
2582                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
2583                 talloc_free(tmp_ctx);
2584                 return ret;
2585         }
2586         
2587         for (i=0;i<ips->num;i++) {
2588                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2589                         break;
2590                 }
2591         }
2592
2593         if (i==ips->num) {
2594                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
2595                         ctdb_addr_to_str(&addr)));
2596                 talloc_free(tmp_ctx);
2597                 return -1;
2598         }
2599
2600         /* This is an optimisation.  If this node is hosting the IP
2601          * then try to move it somewhere else without invoking a full
2602          * takeover run.  We don't care if this doesn't work!
2603          */
2604         if (ips->ips[i].pnn == options.pnn) {
2605                 (void) try_moveip(ctdb, &addr, -1);
2606         }
2607
2608         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
2609         if (ret != 0) {
2610                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
2611                 talloc_free(tmp_ctx);
2612                 return ret;
2613         }
2614
2615         talloc_free(tmp_ctx);
2616         return 0;
2617 }
2618
2619 static int kill_tcp_from_file(struct ctdb_context *ctdb,
2620                               int argc, const char **argv)
2621 {
2622         struct ctdb_control_killtcp *killtcp;
2623         int max_entries, current, i;
2624         struct timeval timeout;
2625         char line[128], src[128], dst[128];
2626         int linenum;
2627         TDB_DATA data;
2628         struct client_async_data *async_data;
2629         struct ctdb_client_control_state *state;
2630
2631         if (argc != 0) {
2632                 usage();
2633         }
2634
2635         linenum = 1;
2636         killtcp = NULL;
2637         max_entries = 0;
2638         current = 0;
2639         while (!feof(stdin)) {
2640                 if (fgets(line, sizeof(line), stdin) == NULL) {
2641                         continue;
2642                 }
2643
2644                 /* Silently skip empty lines */
2645                 if (line[0] == '\n') {
2646                         continue;
2647                 }
2648
2649                 if (sscanf(line, "%s %s\n", src, dst) != 2) {
2650                         DEBUG(DEBUG_ERR, ("Bad line [%d]: '%s'\n",
2651                                           linenum, line));
2652                         talloc_free(killtcp);
2653                         return -1;
2654                 }
2655
2656                 if (current >= max_entries) {
2657                         max_entries += 1024;
2658                         killtcp = talloc_realloc(ctdb, killtcp,
2659                                                  struct ctdb_control_killtcp,
2660                                                  max_entries);
2661                         CTDB_NO_MEMORY(ctdb, killtcp);
2662                 }
2663
2664                 if (!parse_ip_port(src, &killtcp[current].src_addr)) {
2665                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2666                                           linenum, src));
2667                         talloc_free(killtcp);
2668                         return -1;
2669                 }
2670
2671                 if (!parse_ip_port(dst, &killtcp[current].dst_addr)) {
2672                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2673                                           linenum, dst));
2674                         talloc_free(killtcp);
2675                         return -1;
2676                 }
2677
2678                 current++;
2679         }
2680
2681         async_data = talloc_zero(ctdb, struct client_async_data);
2682         if (async_data == NULL) {
2683                 talloc_free(killtcp);
2684                 return -1;
2685         }
2686
2687         for (i = 0; i < current; i++) {
2688
2689                 data.dsize = sizeof(struct ctdb_control_killtcp);
2690                 data.dptr  = (unsigned char *)&killtcp[i];
2691
2692                 timeout = TIMELIMIT();
2693                 state = ctdb_control_send(ctdb, options.pnn, 0,
2694                                           CTDB_CONTROL_KILL_TCP, 0, data,
2695                                           async_data, &timeout, NULL);
2696
2697                 if (state == NULL) {
2698                         DEBUG(DEBUG_ERR,
2699                               ("Failed to call async killtcp control to node %u\n",
2700                                options.pnn));
2701                         talloc_free(killtcp);
2702                         return -1;
2703                 }
2704                 
2705                 ctdb_client_async_add(async_data, state);
2706         }
2707
2708         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2709                 DEBUG(DEBUG_ERR,("killtcp failed\n"));
2710                 talloc_free(killtcp);
2711                 return -1;
2712         }
2713
2714         talloc_free(killtcp);
2715         return 0;
2716 }
2717
2718
2719 /*
2720   kill a tcp connection
2721  */
2722 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2723 {
2724         int ret;
2725         struct ctdb_control_killtcp killtcp;
2726
2727         assert_single_node_only();
2728
2729         if (argc == 0) {
2730                 return kill_tcp_from_file(ctdb, argc, argv);
2731         }
2732
2733         if (argc < 2) {
2734                 usage();
2735         }
2736
2737         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
2738                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2739                 return -1;
2740         }
2741
2742         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
2743                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2744                 return -1;
2745         }
2746
2747         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
2748         if (ret != 0) {
2749                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
2750                 return ret;
2751         }
2752
2753         return 0;
2754 }
2755
2756
2757 /*
2758   send a gratious arp
2759  */
2760 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
2761 {
2762         int ret;
2763         ctdb_sock_addr addr;
2764
2765         assert_single_node_only();
2766
2767         if (argc < 2) {
2768                 usage();
2769         }
2770
2771         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2772                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
2773                 return -1;
2774         }
2775
2776         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
2777         if (ret != 0) {
2778                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
2779                 return ret;
2780         }
2781
2782         return 0;
2783 }
2784
2785 /*
2786   register a server id
2787  */
2788 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2789 {
2790         int ret;
2791         struct ctdb_server_id server_id;
2792
2793         if (argc < 3) {
2794                 usage();
2795         }
2796
2797         server_id.pnn       = strtoul(argv[0], NULL, 0);
2798         server_id.type      = strtoul(argv[1], NULL, 0);
2799         server_id.server_id = strtoul(argv[2], NULL, 0);
2800
2801         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
2802         if (ret != 0) {
2803                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
2804                 return ret;
2805         }
2806         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
2807         sleep(999);
2808         return -1;
2809 }
2810
2811 /*
2812   unregister a server id
2813  */
2814 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2815 {
2816         int ret;
2817         struct ctdb_server_id server_id;
2818
2819         if (argc < 3) {
2820                 usage();
2821         }
2822
2823         server_id.pnn       = strtoul(argv[0], NULL, 0);
2824         server_id.type      = strtoul(argv[1], NULL, 0);
2825         server_id.server_id = strtoul(argv[2], NULL, 0);
2826
2827         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
2828         if (ret != 0) {
2829                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
2830                 return ret;
2831         }
2832         return -1;
2833 }
2834
2835 /*
2836   check if a server id exists
2837  */
2838 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2839 {
2840         uint32_t status;
2841         int ret;
2842         struct ctdb_server_id server_id;
2843
2844         if (argc < 3) {
2845                 usage();
2846         }
2847
2848         server_id.pnn       = strtoul(argv[0], NULL, 0);
2849         server_id.type      = strtoul(argv[1], NULL, 0);
2850         server_id.server_id = strtoul(argv[2], NULL, 0);
2851
2852         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
2853         if (ret != 0) {
2854                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
2855                 return ret;
2856         }
2857
2858         if (status) {
2859                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
2860         } else {
2861                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
2862         }
2863         return 0;
2864 }
2865
2866 /*
2867   get a list of all server ids that are registered on a node
2868  */
2869 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
2870 {
2871         int i, ret;
2872         struct ctdb_server_id_list *server_ids;
2873
2874         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
2875         if (ret != 0) {
2876                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
2877                 return ret;
2878         }
2879
2880         for (i=0; i<server_ids->num; i++) {
2881                 printf("Server id %d:%d:%d\n", 
2882                         server_ids->server_ids[i].pnn, 
2883                         server_ids->server_ids[i].type, 
2884                         server_ids->server_ids[i].server_id); 
2885         }
2886
2887         return -1;
2888 }
2889
2890 /*
2891   check if a server id exists
2892  */
2893 static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
2894 {
2895         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2896         uint64_t *ids;
2897         uint8_t *result;
2898         int i;
2899
2900         if (argc < 1) {
2901                 talloc_free(tmp_ctx);
2902                 usage();
2903         }
2904
2905         ids    = talloc_array(tmp_ctx, uint64_t, argc);
2906         result = talloc_array(tmp_ctx, uint8_t, argc);
2907
2908         for (i = 0; i < argc; i++) {
2909                 ids[i] = strtoull(argv[i], NULL, 0);
2910         }
2911
2912         if (!ctdb_client_check_message_handlers(ctdb, ids, argc, result)) {
2913                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
2914                                   options.pnn));
2915                 talloc_free(tmp_ctx);
2916                 return -1;
2917         }
2918
2919         for (i=0; i < argc; i++) {
2920                 printf("Server id %d:%llu %s\n", options.pnn, (long long)ids[i],
2921                        result[i] ? "exists" : "does not exist");
2922         }
2923
2924         talloc_free(tmp_ctx);
2925         return 0;
2926 }
2927
2928 /*
2929   send a tcp tickle ack
2930  */
2931 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2932 {
2933         int ret;
2934         ctdb_sock_addr  src, dst;
2935
2936         if (argc < 2) {
2937                 usage();
2938         }
2939
2940         if (!parse_ip_port(argv[0], &src)) {
2941                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2942                 return -1;
2943         }
2944
2945         if (!parse_ip_port(argv[1], &dst)) {
2946                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2947                 return -1;
2948         }
2949
2950         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
2951         if (ret==0) {
2952                 return 0;
2953         }
2954         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
2955
2956         return -1;
2957 }
2958
2959
2960 /*
2961   display public ip status
2962  */
2963 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
2964 {
2965         int i, ret;
2966         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2967         struct ctdb_all_public_ips *ips;
2968
2969         if (options.pnn == CTDB_BROADCAST_ALL) {
2970                 /* read the list of public ips from all nodes */
2971                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
2972         } else {
2973                 /* read the public ip list from this node */
2974                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2975         }
2976         if (ret != 0) {
2977                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
2978                 talloc_free(tmp_ctx);
2979                 return ret;
2980         }
2981
2982         if (options.machinereadable){
2983                 printm(":Public IP:Node:");
2984                 if (options.verbose){
2985                         printm("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
2986                 }
2987                 printm("\n");
2988         } else {
2989                 if (options.pnn == CTDB_BROADCAST_ALL) {
2990                         printf("Public IPs on ALL nodes\n");
2991                 } else {
2992                         printf("Public IPs on node %u\n", options.pnn);
2993                 }
2994         }
2995
2996         for (i=1;i<=ips->num;i++) {
2997                 struct ctdb_control_public_ip_info *info = NULL;
2998                 int32_t pnn;
2999                 char *aciface = NULL;
3000                 char *avifaces = NULL;
3001                 char *cifaces = NULL;
3002
3003                 if (options.pnn == CTDB_BROADCAST_ALL) {
3004                         pnn = ips->ips[ips->num-i].pnn;
3005                 } else {
3006                         pnn = options.pnn;
3007                 }
3008
3009                 if (pnn != -1) {
3010                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
3011                                                    &ips->ips[ips->num-i].addr, &info);
3012                 } else {
3013                         ret = -1;
3014                 }
3015
3016                 if (ret == 0) {
3017                         int j;
3018                         for (j=0; j < info->num; j++) {
3019                                 if (cifaces == NULL) {
3020                                         cifaces = talloc_strdup(info,
3021                                                                 info->ifaces[j].name);
3022                                 } else {
3023                                         cifaces = talloc_asprintf_append(cifaces,
3024                                                                          ",%s",
3025                                                                          info->ifaces[j].name);
3026                                 }
3027
3028                                 if (info->active_idx == j) {
3029                                         aciface = info->ifaces[j].name;
3030                                 }
3031
3032                                 if (info->ifaces[j].link_state == 0) {
3033                                         continue;
3034                                 }
3035
3036                                 if (avifaces == NULL) {
3037                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
3038                                 } else {
3039                                         avifaces = talloc_asprintf_append(avifaces,
3040                                                                           ",%s",
3041                                                                           info->ifaces[j].name);
3042                                 }
3043                         }
3044                 }
3045
3046                 if (options.machinereadable){
3047                         printm(":%s:%d:",
3048                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3049                                 ips->ips[ips->num-i].pnn);
3050                         if (options.verbose){
3051                                 printm("%s:%s:%s:",
3052                                         aciface?aciface:"",
3053                                         avifaces?avifaces:"",
3054                                         cifaces?cifaces:"");
3055                         }
3056                         printf("\n");
3057                 } else {
3058                         if (options.verbose) {
3059                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
3060                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3061                                         ips->ips[ips->num-i].pnn,
3062                                         aciface?aciface:"",
3063                                         avifaces?avifaces:"",
3064                                         cifaces?cifaces:"");
3065                         } else {
3066                                 printf("%s %d\n",
3067                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3068                                         ips->ips[ips->num-i].pnn);
3069                         }
3070                 }
3071                 talloc_free(info);
3072         }
3073
3074         talloc_free(tmp_ctx);
3075         return 0;
3076 }
3077
3078 /*
3079   public ip info
3080  */
3081 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
3082 {
3083         int i, ret;
3084         ctdb_sock_addr addr;
3085         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3086         struct ctdb_control_public_ip_info *info;
3087
3088         if (argc != 1) {
3089                 talloc_free(tmp_ctx);
3090                 usage();
3091         }
3092
3093         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
3094                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
3095                 return -1;
3096         }
3097
3098         /* read the public ip info from this node */
3099         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
3100                                            tmp_ctx, &addr, &info);
3101         if (ret != 0) {
3102                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
3103                                   argv[0], options.pnn));
3104                 talloc_free(tmp_ctx);
3105                 return ret;
3106         }
3107
3108         printf("Public IP[%s] info on node %u\n",
3109                ctdb_addr_to_str(&info->ip.addr),
3110                options.pnn);
3111
3112         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
3113                ctdb_addr_to_str(&info->ip.addr),
3114                info->ip.pnn, info->num);
3115
3116         for (i=0; i<info->num; i++) {
3117                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
3118
3119                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
3120                        i+1, info->ifaces[i].name,
3121                        info->ifaces[i].link_state?"up":"down",
3122                        (unsigned int)info->ifaces[i].references,
3123                        (i==info->active_idx)?" (active)":"");
3124         }
3125
3126         talloc_free(tmp_ctx);
3127         return 0;
3128 }
3129
3130 /*
3131   display interfaces status
3132  */
3133 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
3134 {
3135         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3136         int i;
3137         struct ctdb_control_get_ifaces *ifaces;
3138         int ret;
3139
3140         /* read the public ip list from this node */
3141         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ifaces);
3142         if (ret != 0) {
3143                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
3144                                   options.pnn));
3145                 talloc_free(tmp_ctx);
3146                 return -1;
3147         }
3148
3149         if (options.machinereadable){
3150                 printm(":Name:LinkStatus:References:\n");
3151         } else {
3152                 printf("Interfaces on node %u\n", options.pnn);
3153         }
3154
3155         for (i=0; i<ifaces->num; i++) {
3156                 if (options.machinereadable){
3157                         printm(":%s:%s:%u:\n",
3158                                ifaces->ifaces[i].name,
3159                                ifaces->ifaces[i].link_state?"1":"0",
3160                                (unsigned int)ifaces->ifaces[i].references);
3161                 } else {
3162                         printf("name:%s link:%s references:%u\n",
3163                                ifaces->ifaces[i].name,
3164                                ifaces->ifaces[i].link_state?"up":"down",
3165                                (unsigned int)ifaces->ifaces[i].references);
3166                 }
3167         }
3168
3169         talloc_free(tmp_ctx);
3170         return 0;
3171 }
3172
3173
3174 /*
3175   set link status of an interface
3176  */
3177 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
3178 {
3179         int ret;
3180         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3181         struct ctdb_control_iface_info info;
3182
3183         ZERO_STRUCT(info);
3184
3185         if (argc != 2) {
3186                 usage();
3187         }
3188
3189         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
3190                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
3191                                   argv[0]));
3192                 talloc_free(tmp_ctx);
3193                 return -1;
3194         }
3195         strcpy(info.name, argv[0]);
3196
3197         if (strcmp(argv[1], "up") == 0) {
3198                 info.link_state = 1;
3199         } else if (strcmp(argv[1], "down") == 0) {
3200                 info.link_state = 0;
3201         } else {
3202                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
3203                                   argv[1]));
3204                 talloc_free(tmp_ctx);
3205                 return -1;
3206         }
3207
3208         /* read the public ip list from this node */
3209         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
3210                                    tmp_ctx, &info);
3211         if (ret != 0) {
3212                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
3213                                   argv[0], options.pnn));
3214                 talloc_free(tmp_ctx);
3215                 return ret;
3216         }
3217
3218         talloc_free(tmp_ctx);
3219         return 0;
3220 }
3221
3222 /*
3223   display pid of a ctdb daemon
3224  */
3225 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
3226 {
3227         uint32_t pid;
3228         int ret;
3229
3230         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
3231         if (ret != 0) {
3232                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
3233                 return ret;
3234         }
3235         printf("Pid:%d\n", pid);
3236
3237         return 0;
3238 }
3239
3240 typedef bool update_flags_handler_t(struct ctdb_context *ctdb, void *data);
3241
3242 static int update_flags_and_ipreallocate(struct ctdb_context *ctdb,
3243                                               void *data,
3244                                               update_flags_handler_t handler,
3245                                               uint32_t flag,
3246                                               const char *desc,
3247                                               bool set_flag)
3248 {
3249         struct ctdb_node_map *nodemap = NULL;
3250         bool flag_is_set;
3251         int ret;
3252
3253         /* Check if the node is already in the desired state */
3254         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3255         if (ret != 0) {
3256                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3257                 exit(10);
3258         }
3259         flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3260         if (set_flag == flag_is_set) {
3261                 DEBUG(DEBUG_NOTICE, ("Node %d is %s %s\n", options.pnn,
3262                                      (set_flag ? "already" : "not"), desc));
3263                 return 0;
3264         }
3265
3266         do {
3267                 if (!handler(ctdb, data)) {
3268                         DEBUG(DEBUG_WARNING,
3269                               ("Failed to send control to set state %s on node %u, try again\n",
3270                                desc, options.pnn));
3271                 }
3272
3273                 sleep(1);
3274
3275                 /* Read the nodemap and verify the change took effect.
3276                  * Even if the above control/hanlder timed out then it
3277                  * could still have worked!
3278                  */
3279                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
3280                                          ctdb, &nodemap);
3281                 if (ret != 0) {
3282                         DEBUG(DEBUG_WARNING,
3283                               ("Unable to get nodemap from local node, try again\n"));
3284                 }
3285                 flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3286         } while (nodemap == NULL || (set_flag != flag_is_set));
3287
3288         return ipreallocate(ctdb);
3289 }
3290
3291 /* Administratively disable a node */
3292 static bool update_flags_disabled(struct ctdb_context *ctdb, void *data)
3293 {
3294         int ret;
3295
3296         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3297                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
3298         return ret == 0;
3299 }
3300
3301 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
3302 {
3303         return update_flags_and_ipreallocate(ctdb, NULL,
3304                                                   update_flags_disabled,
3305                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3306                                                   "disabled",
3307                                                   true /* set_flag*/);
3308 }
3309
3310 /* Administratively re-enable a node */
3311 static bool update_flags_not_disabled(struct ctdb_context *ctdb, void *data)
3312 {
3313         int ret;
3314
3315         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3316                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
3317         return ret == 0;
3318 }
3319
3320 static int control_enable(struct ctdb_context *ctdb,  int argc, const char **argv)
3321 {
3322         return update_flags_and_ipreallocate(ctdb, NULL,
3323                                                   update_flags_not_disabled,
3324                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3325                                                   "disabled",
3326                                                   false /* set_flag*/);
3327 }
3328
3329 /* Stop a node */
3330 static bool update_flags_stopped(struct ctdb_context *ctdb, void *data)
3331 {
3332         int ret;
3333
3334         ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
3335
3336         return ret == 0;
3337 }
3338
3339 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
3340 {
3341         return update_flags_and_ipreallocate(ctdb, NULL,
3342                                                   update_flags_stopped,
3343                                                   NODE_FLAGS_STOPPED,
3344                                                   "stopped",
3345                                                   true /* set_flag*/);
3346 }
3347
3348 /* Continue a stopped node */
3349 static bool update_flags_not_stopped(struct ctdb_context *ctdb, void *data)
3350 {
3351         int ret;
3352
3353         ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
3354
3355         return ret == 0;
3356 }
3357
3358 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
3359 {
3360         return update_flags_and_ipreallocate(ctdb, NULL,
3361                                                   update_flags_not_stopped,
3362                                                   NODE_FLAGS_STOPPED,
3363                                                   "stopped",
3364                                                   false /* set_flag */);
3365 }
3366
3367 static uint32_t get_generation(struct ctdb_context *ctdb)
3368 {
3369         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3370         struct ctdb_vnn_map *vnnmap=NULL;
3371         int ret;
3372         uint32_t generation;
3373
3374         /* wait until the recmaster is not in recovery mode */
3375         while (1) {
3376                 uint32_t recmode, recmaster;
3377                 
3378                 if (vnnmap != NULL) {
3379                         talloc_free(vnnmap);
3380                         vnnmap = NULL;
3381                 }
3382
3383                 /* get the recmaster */
3384                 ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
3385                 if (ret != 0) {
3386                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
3387                         talloc_free(tmp_ctx);
3388                         exit(10);
3389                 }
3390
3391                 /* get recovery mode */
3392                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), recmaster, &recmode);
3393                 if (ret != 0) {
3394                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
3395                         talloc_free(tmp_ctx);
3396                         exit(10);
3397                 }
3398
3399                 /* get the current generation number */
3400                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, tmp_ctx, &vnnmap);
3401                 if (ret != 0) {
3402                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
3403                         talloc_free(tmp_ctx);
3404                         exit(10);
3405                 }
3406
3407                 if ((recmode == CTDB_RECOVERY_NORMAL) && (vnnmap->generation != 1)) {
3408                         generation = vnnmap->generation;
3409                         talloc_free(tmp_ctx);
3410                         return generation;
3411                 }
3412                 sleep(1);
3413         }
3414 }
3415
3416 /* Ban a node */
3417 static bool update_state_banned(struct ctdb_context *ctdb, void *data)
3418 {
3419         struct ctdb_ban_time *bantime = (struct ctdb_ban_time *)data;
3420         int ret;
3421
3422         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, bantime);
3423
3424         return ret == 0;
3425 }
3426
3427 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
3428 {
3429         struct ctdb_ban_time bantime;
3430
3431         if (argc < 1) {
3432                 usage();
3433         }
3434         
3435         bantime.pnn  = options.pnn;
3436         bantime.time = strtoul(argv[0], NULL, 0);
3437
3438         if (bantime.time == 0) {
3439                 DEBUG(DEBUG_ERR, ("Invalid ban time specified - must be >0\n"));
3440                 return -1;
3441         }
3442
3443         return update_flags_and_ipreallocate(ctdb, &bantime,
3444                                                   update_state_banned,
3445                                                   NODE_FLAGS_BANNED,
3446                                                   "banned",
3447                                                   true /* set_flag*/);
3448 }
3449
3450
3451 /* Unban a node */
3452 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
3453 {
3454         struct ctdb_ban_time bantime;
3455
3456         bantime.pnn  = options.pnn;
3457         bantime.time = 0;
3458
3459         return update_flags_and_ipreallocate(ctdb, &bantime,
3460                                                   update_state_banned,
3461                                                   NODE_FLAGS_BANNED,
3462                                                   "banned",
3463                                                   false /* set_flag*/);
3464 }
3465
3466 /*
3467   show ban information for a node
3468  */
3469 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
3470 {
3471         int ret;
3472         struct ctdb_node_map *nodemap=NULL;
3473         struct ctdb_ban_time *bantime;
3474
3475         /* verify the node exists */
3476         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3477         if (ret != 0) {
3478                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3479                 return ret;
3480         }
3481
3482         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
3483         if (ret != 0) {
3484                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
3485                 return -1;
3486         }       
3487
3488         if (bantime->time == 0) {
3489                 printf("Node %u is not banned\n", bantime->pnn);
3490         } else {
3491                 printf("Node %u is banned, %d seconds remaining\n",
3492                        bantime->pnn, bantime->time);
3493         }
3494
3495         return 0;
3496 }
3497
3498 /*
3499   shutdown a daemon
3500  */
3501 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
3502 {
3503         int ret;
3504
3505         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
3506         if (ret != 0) {
3507                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
3508                 return ret;
3509         }
3510
3511         return 0;
3512 }
3513
3514 /*
3515   trigger a recovery
3516  */
3517 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
3518 {
3519         int ret;
3520         uint32_t generation, next_generation;
3521
3522         /* record the current generation number */
3523         generation = get_generation(ctdb);
3524
3525         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3526         if (ret != 0) {
3527                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
3528                 return ret;
3529         }
3530
3531         /* wait until we are in a new generation */
3532         while (1) {
3533                 next_generation = get_generation(ctdb);
3534                 if (next_generation != generation) {
3535                         return 0;
3536                 }
3537                 sleep(1);
3538         }
3539
3540         return 0;
3541 }
3542
3543
3544 /*
3545   display monitoring mode of a remote node
3546  */
3547 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
3548 {
3549         uint32_t monmode;
3550         int ret;
3551
3552         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
3553         if (ret != 0) {
3554                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
3555                 return ret;
3556         }
3557         if (!options.machinereadable){
3558                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
3559         } else {
3560                 printm(":mode:\n");
3561                 printm(":%d:\n",monmode);
3562         }
3563         return 0;
3564 }
3565
3566
3567 /*
3568   display capabilities of a remote node
3569  */
3570 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
3571 {
3572         uint32_t capabilities;
3573         int ret;
3574
3575         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
3576         if (ret != 0) {
3577                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
3578                 return -1;
3579         }
3580         
3581         if (!options.machinereadable){
3582                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
3583                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
3584                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
3585                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
3586         } else {
3587                 printm(":RECMASTER:LMASTER:LVS:NATGW:\n");
3588                 printm(":%d:%d:%d:%d:\n",
3589                         !!(capabilities&CTDB_CAP_RECMASTER),
3590                         !!(capabilities&CTDB_CAP_LMASTER),
3591                         !!(capabilities&CTDB_CAP_LVS),
3592                         !!(capabilities&CTDB_CAP_NATGW));
3593         }
3594         return 0;
3595 }
3596
3597 /*
3598   display lvs configuration
3599  */
3600
3601 static uint32_t lvs_exclude_flags[] = {
3602         /* Look for a nice healthy node */
3603         NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED,
3604         /* If not found, an UNHEALTHY node will do */
3605         NODE_FLAGS_INACTIVE|NODE_FLAGS_PERMANENTLY_DISABLED,
3606         0,
3607 };
3608
3609 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
3610 {
3611         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3612         struct ctdb_node_map *orig_nodemap=NULL;
3613         struct ctdb_node_map *nodemap;
3614         int i, ret;
3615
3616         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn,
3617                                    tmp_ctx, &orig_nodemap);
3618         if (ret != 0) {
3619                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3620                 talloc_free(tmp_ctx);
3621                 return -1;
3622         }
3623
3624         nodemap = filter_nodemap_by_capabilities(ctdb, orig_nodemap,
3625                                                  CTDB_CAP_LVS, false);
3626         if (nodemap == NULL) {
3627                 /* No memory */
3628                 ret = -1;
3629                 goto done;
3630         }
3631
3632         ret = 0;
3633
3634         for (i = 0; lvs_exclude_flags[i] != 0; i++) {
3635                 struct ctdb_node_map *t =
3636                         filter_nodemap_by_flags(ctdb, nodemap,
3637                                                 lvs_exclude_flags[i]);
3638                 if (t == NULL) {
3639                         /* No memory */
3640                         ret = -1;
3641                         goto done;
3642                 }
3643                 if (t->num > 0) {
3644                         /* At least 1 node without excluded flags */
3645                         int j;
3646                         for (j = 0; j < t->num; j++) {
3647                                 printf("%d:%s\n", t->nodes[j].pnn, 
3648                                        ctdb_addr_to_str(&t->nodes[j].addr));
3649                         }
3650                         goto done;
3651                 }
3652                 talloc_free(t);
3653         }
3654 done:
3655         talloc_free(tmp_ctx);
3656         return ret;
3657 }
3658
3659 /*
3660   display who is the lvs master
3661  */
3662 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3663 {
3664         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3665         struct ctdb_node_map *nodemap=NULL;
3666         int i, ret;
3667
3668         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn,
3669                                    tmp_ctx, &nodemap);
3670         if (ret != 0) {
3671                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3672                 talloc_free(tmp_ctx);
3673                 return -1;
3674         }
3675
3676         for (i = 0; lvs_exclude_flags[i] != 0; i++) {
3677                 struct ctdb_node_map *t =
3678                         filter_nodemap_by_flags(ctdb, nodemap,
3679                                                 lvs_exclude_flags[i]);
3680                 if (t == NULL) {
3681                         /* No memory */
3682                         ret = -1;
3683                         goto done;
3684                 }
3685                 if (t->num > 0) {
3686                         struct ctdb_node_map *n;
3687                         n = filter_nodemap_by_capabilities(ctdb,
3688                                                            t,
3689                                                            CTDB_CAP_LVS,
3690                                                            true);
3691                         if (n == NULL) {
3692                                 /* No memory */
3693                                 ret = -1;
3694                                 goto done;
3695                         }
3696                         if (n->num > 0) {
3697                                 ret = 0;
3698                                 if (options.machinereadable) {
3699                                         printm("%d\n", n->nodes[0].pnn);
3700                                 } else {
3701                                         printf("Node %d is LVS master\n", n->nodes[0].pnn);
3702                                 }
3703                                 goto done;
3704                         }
3705                 }
3706                 talloc_free(t);
3707         }
3708
3709         printf("There is no LVS master\n");
3710         ret = 255;
3711 done:
3712         talloc_free(tmp_ctx);
3713         return ret;
3714 }
3715
3716 /*
3717   disable monitoring on a  node
3718  */
3719 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3720 {
3721         
3722         int ret;
3723
3724         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
3725         if (ret != 0) {
3726                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
3727                 return ret;
3728         }
3729         printf("Monitoring mode:%s\n","DISABLED");
3730
3731         return 0;
3732 }
3733
3734 /*
3735   enable monitoring on a  node
3736  */
3737 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3738 {
3739         
3740         int ret;
3741
3742         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
3743         if (ret != 0) {
3744                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
3745                 return ret;
3746         }
3747         printf("Monitoring mode:%s\n","ACTIVE");
3748
3749         return 0;
3750 }
3751
3752 /*
3753   display remote list of keys/data for a db
3754  */
3755 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
3756 {
3757         const char *db_name;
3758         struct ctdb_db_context *ctdb_db;
3759         int ret;
3760         struct ctdb_dump_db_context c;
3761         uint8_t flags;
3762
3763         if (argc < 1) {
3764                 usage();
3765         }
3766
3767         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3768                 return -1;
3769         }
3770
3771         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3772         if (ctdb_db == NULL) {
3773                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3774                 return -1;
3775         }
3776
3777         if (options.printlmaster) {
3778                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn,
3779                                           ctdb, &ctdb->vnn_map);
3780                 if (ret != 0) {
3781                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3782                                           options.pnn));
3783                         return ret;
3784                 }
3785         }
3786
3787         ZERO_STRUCT(c);
3788         c.f = stdout;
3789         c.printemptyrecords = (bool)options.printemptyrecords;
3790         c.printdatasize = (bool)options.printdatasize;
3791         c.printlmaster = (bool)options.printlmaster;
3792         c.printhash = (bool)options.printhash;
3793         c.printrecordflags = (bool)options.printrecordflags;
3794
3795         /* traverse and dump the cluster tdb */
3796         ret = ctdb_dump_db(ctdb_db, &c);
3797         if (ret == -1) {
3798                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
3799                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
3800                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
3801                                   db_name));
3802                 return -1;
3803         }
3804         talloc_free(ctdb_db);
3805
3806         printf("Dumped %d records\n", ret);
3807         return 0;
3808 }
3809
/*
 * State handed to cattdb_traverse() as its private_data pointer.
 */
struct cattdb_data {
	/* context passed on to ctdb_dumpdb_record() for every record */
	struct ctdb_context *ctdb;

	/* number of records visited so far */
	uint32_t count;
};
3814
3815 static int cattdb_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
3816 {
3817         struct cattdb_data *d = private_data;
3818         struct ctdb_dump_db_context c;
3819
3820         d->count++;
3821
3822         ZERO_STRUCT(c);
3823         c.f = stdout;
3824         c.printemptyrecords = (bool)options.printemptyrecords;
3825         c.printdatasize = (bool)options.printdatasize;
3826         c.printlmaster = false;
3827         c.printhash = (bool)options.printhash;
3828         c.printrecordflags = true;
3829
3830         return ctdb_dumpdb_record(d->ctdb, key, data, &c);
3831 }
3832
3833 /*
3834   cat the local tdb database using same format as catdb
3835  */
3836 static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv)
3837 {
3838         const char *db_name;
3839         struct ctdb_db_context *ctdb_db;
3840         struct cattdb_data d;
3841         uint8_t flags;
3842
3843         if (argc < 1) {
3844                 usage();
3845         }
3846
3847         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3848                 return -1;
3849         }
3850
3851         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3852         if (ctdb_db == NULL) {
3853                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3854                 return -1;
3855         }
3856
3857         /* traverse the local tdb */
3858         d.count = 0;
3859         d.ctdb  = ctdb;
3860         if (tdb_traverse_read(ctdb_db->ltdb->tdb, cattdb_traverse, &d) == -1) {
3861                 printf("Failed to cattdb data\n");
3862                 exit(10);
3863         }
3864         talloc_free(ctdb_db);
3865
3866         printf("Dumped %d records\n", d.count);
3867         return 0;
3868 }
3869
3870 /*
3871   display the content of a database key
3872  */
3873 static int control_readkey(struct ctdb_context *ctdb, int argc, const char **argv)
3874 {
3875         const char *db_name;
3876         struct ctdb_db_context *ctdb_db;
3877         struct ctdb_record_handle *h;
3878         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3879         TDB_DATA key, data;
3880         uint8_t flags;
3881
3882         if (argc < 2) {
3883                 usage();
3884         }
3885
3886         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3887                 return -1;
3888         }
3889
3890         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3891         if (ctdb_db == NULL) {
3892                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3893                 return -1;
3894         }
3895
3896         key.dptr  = discard_const(argv[1]);
3897         key.dsize = strlen((char *)key.dptr);
3898
3899         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3900         if (h == NULL) {
3901                 printf("Failed to fetch record '%s' on node %d\n", 
3902                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3903                 talloc_free(tmp_ctx);
3904                 exit(10);
3905         }
3906
3907         printf("Data: size:%d ptr:[%.*s]\n", (int)data.dsize, (int)data.dsize, data.dptr);
3908
3909         talloc_free(tmp_ctx);
3910         talloc_free(ctdb_db);
3911         return 0;
3912 }
3913
3914 /*
3915   display the content of a database key
3916  */
3917 static int control_writekey(struct ctdb_context *ctdb, int argc, const char **argv)
3918 {
3919         const char *db_name;
3920         struct ctdb_db_context *ctdb_db;
3921         struct ctdb_record_handle *h;
3922         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3923         TDB_DATA key, data;
3924         uint8_t flags;
3925
3926         if (argc < 3) {
3927                 usage();
3928         }
3929
3930         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3931                 return -1;
3932         }
3933
3934         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3935         if (ctdb_db == NULL) {
3936                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3937                 return -1;
3938         }
3939
3940         key.dptr  = discard_const(argv[1]);
3941         key.dsize = strlen((char *)key.dptr);
3942
3943         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3944         if (h == NULL) {
3945                 printf("Failed to fetch record '%s' on node %d\n", 
3946                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3947                 talloc_free(tmp_ctx);
3948                 exit(10);
3949         }
3950
3951         data.dptr  = discard_const(argv[2]);
3952         data.dsize = strlen((char *)data.dptr);
3953
3954         if (ctdb_record_store(h, data) != 0) {
3955                 printf("Failed to store record\n");
3956         }
3957
3958         talloc_free(h);
3959         talloc_free(tmp_ctx);
3960         talloc_free(ctdb_db);
3961         return 0;
3962 }
3963
3964 /*
3965   fetch a record from a persistent database
3966  */
3967 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3968 {
3969         const char *db_name;
3970         struct ctdb_db_context *ctdb_db;
3971         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3972         struct ctdb_transaction_handle *h;
3973         TDB_DATA key, data;
3974         int fd, ret;
3975         bool persistent;
3976         uint8_t flags;
3977
3978         if (argc < 2) {
3979                 talloc_free(tmp_ctx);
3980                 usage();
3981         }
3982
3983         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3984                 talloc_free(tmp_ctx);
3985                 return -1;
3986         }
3987
3988         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
3989         if (!persistent) {
3990                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
3991                 talloc_free(tmp_ctx);
3992                 return -1;
3993         }
3994
3995         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3996         if (ctdb_db == NULL) {
3997                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3998                 talloc_free(tmp_ctx);
3999                 return -1;
4000         }
4001
4002         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4003         if (h == NULL) {
4004                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4005                 talloc_free(tmp_ctx);
4006                 return -1;
4007         }
4008
4009         key.dptr  = discard_const(argv[1]);
4010         key.dsize = strlen(argv[1]);
4011         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
4012         if (ret != 0) {
4013                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
4014                 talloc_free(tmp_ctx);
4015                 return -1;
4016         }
4017
4018         if (data.dsize == 0 || data.dptr == NULL) {
4019                 DEBUG(DEBUG_ERR,("Record is empty\n"));
4020                 talloc_free(tmp_ctx);
4021                 return -1;
4022         }
4023
4024         if (argc == 3) {
4025           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
4026                 if (fd == -1) {
4027                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
4028                         talloc_free(tmp_ctx);
4029                         return -1;
4030                 }
4031                 sys_write(fd, data.dptr, data.dsize);
4032                 close(fd);
4033         } else {
4034                 sys_write(1, data.dptr, data.dsize);
4035         }
4036
4037         /* abort the transaction */
4038         talloc_free(h);
4039
4040
4041         talloc_free(tmp_ctx);
4042         return 0;
4043 }
4044
4045 /*
4046   fetch a record from a tdb-file
4047  */
4048 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
4049 {
4050         const char *tdb_file;
4051         TDB_CONTEXT *tdb;
4052         TDB_DATA key, data;
4053         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4054         int fd;
4055
4056         if (argc < 2) {
4057                 usage();
4058         }
4059
4060         tdb_file = argv[0];
4061
4062         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
4063         if (tdb == NULL) {
4064                 printf("Failed to open TDB file %s\n", tdb_file);
4065                 return -1;
4066         }
4067
4068         if (!strncmp(argv[1], "0x", 2)) {
4069                 key = hextodata(tmp_ctx, argv[1] + 2);
4070                 if (key.dsize == 0) {
4071                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4072                         return -1;
4073                 }
4074         } else {
4075                 key.dptr  = discard_const(argv[1]);
4076                 key.dsize = strlen(argv[1]);
4077         }
4078
4079         data = tdb_fetch(tdb, key);
4080         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
4081                 printf("Failed to read record %s from tdb %s\n", argv[1], tdb_file);
4082                 tdb_close(tdb);
4083                 return -1;
4084         }
4085
4086         tdb_close(tdb);
4087
4088         if (argc == 3) {
4089           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
4090                 if (fd == -1) {
4091                         printf("Failed to open output file %s\n", argv[2]);
4092                         return -1;
4093                 }
4094                 if (options.verbose){
4095                         sys_write(fd, data.dptr, data.dsize);
4096                 } else {
4097                         sys_write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4098                 }
4099                 close(fd);
4100         } else {
4101                 if (options.verbose){
4102                         sys_write(1, data.dptr, data.dsize);
4103                 } else {
4104                         sys_write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4105                 }
4106         }
4107
4108         talloc_free(tmp_ctx);
4109         return 0;
4110 }
4111
4112 /*
4113   store a record and header to a tdb-file
4114  */
4115 static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv)
4116 {
4117         const char *tdb_file;
4118         TDB_CONTEXT *tdb;
4119         TDB_DATA key, value, data;
4120         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4121         struct ctdb_ltdb_header header;
4122
4123         if (argc < 3) {
4124                 usage();
4125         }
4126
4127         tdb_file = argv[0];
4128
4129         tdb = tdb_open(tdb_file, 0, 0, O_RDWR, 0);
4130         if (tdb == NULL) {
4131                 printf("Failed to open TDB file %s\n", tdb_file);
4132                 return -1;
4133         }
4134
4135         if (!strncmp(argv[1], "0x", 2)) {
4136                 key = hextodata(tmp_ctx, argv[1] + 2);
4137                 if (key.dsize == 0) {
4138                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4139                         return -1;
4140                 }
4141         } else {
4142                 key.dptr  = discard_const(argv[1]);
4143                 key.dsize = strlen(argv[1]);
4144         }
4145
4146         if (!strncmp(argv[2], "0x", 2)) {
4147                 value = hextodata(tmp_ctx, argv[2] + 2);
4148                 if (value.dsize == 0) {
4149                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
4150                         return -1;
4151                 }
4152         } else {
4153                 value.dptr  = discard_const(argv[2]);
4154                 value.dsize = strlen(argv[2]);
4155         }
4156
4157         ZERO_STRUCT(header);
4158         if (argc > 3) {
4159                 header.rsn = atoll(argv[3]);
4160         }
4161         if (argc > 4) {
4162                 header.dmaster = atoi(argv[4]);
4163         }
4164         if (argc > 5) {
4165                 header.flags = atoi(argv[5]);
4166         }
4167
4168         data.dsize = sizeof(struct ctdb_ltdb_header) + value.dsize;
4169         data.dptr = talloc_size(tmp_ctx, data.dsize);
4170         if (data.dptr == NULL) {
4171                 printf("Failed to allocate header+value\n");
4172                 return -1;
4173         }
4174
4175         *(struct ctdb_ltdb_header *)data.dptr = header;
4176         memcpy(data.dptr + sizeof(struct ctdb_ltdb_header), value.dptr, value.dsize);
4177
4178         if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
4179                 printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
4180                 tdb_close(tdb);
4181                 return -1;
4182         }
4183
4184         tdb_close(tdb);
4185
4186         talloc_free(tmp_ctx);
4187         return 0;
4188 }
4189
4190 /*
4191   write a record to a persistent database
4192  */
4193 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
4194 {
4195         const char *db_name;
4196         struct ctdb_db_context *ctdb_db;
4197         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4198         struct ctdb_transaction_handle *h;
4199         struct stat st;
4200         TDB_DATA key, data;
4201         int fd, ret;
4202
4203         if (argc < 3) {
4204                 talloc_free(tmp_ctx);
4205                 usage();
4206         }
4207
4208         fd = open(argv[2], O_RDONLY);
4209         if (fd == -1) {
4210                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
4211                 talloc_free(tmp_ctx);
4212                 return -1;
4213         }
4214         
4215         ret = fstat(fd, &st);
4216         if (ret == -1) {
4217                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
4218                 close(fd);
4219                 talloc_free(tmp_ctx);
4220                 return -1;
4221         }
4222
4223         if (!S_ISREG(st.st_mode)) {
4224                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
4225                 close(fd);
4226                 talloc_free(tmp_ctx);
4227                 return -1;
4228         }
4229
4230         data.dsize = st.st_size;
4231         if (data.dsize == 0) {
4232                 data.dptr  = NULL;
4233         } else {
4234                 data.dptr = talloc_size(tmp_ctx, data.dsize);
4235                 if (data.dptr == NULL) {
4236                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
4237                         close(fd);
4238                         talloc_free(tmp_ctx);
4239                         return -1;
4240                 }
4241                 ret = sys_read(fd, data.dptr, data.dsize);
4242                 if (ret != data.dsize) {
4243                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
4244                         close(fd);
4245                         talloc_free(tmp_ctx);
4246                         return -1;
4247                 }
4248         }
4249         close(fd);
4250
4251
4252         db_name = argv[0];
4253
4254         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
4255         if (ctdb_db == NULL) {
4256                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4257                 talloc_free(tmp_ctx);
4258                 return -1;
4259         }
4260
4261         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4262         if (h == NULL) {
4263                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4264                 talloc_free(tmp_ctx);
4265                 return -1;
4266         }
4267
4268         key.dptr  = discard_const(argv[1]);
4269         key.dsize = strlen(argv[1]);
4270         ret = ctdb_transaction_store(h, key, data);
4271         if (ret != 0) {
4272                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4273                 talloc_free(tmp_ctx);
4274                 return -1;
4275         }
4276
4277         ret = ctdb_transaction_commit(h);
4278         if (ret != 0) {
4279                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4280                 talloc_free(tmp_ctx);
4281                 return -1;
4282         }
4283
4284
4285         talloc_free(tmp_ctx);
4286         return 0;
4287 }
4288
4289 /*
4290  * delete a record from a persistent database
4291  */
4292 static int control_pdelete(struct ctdb_context *ctdb, int argc, const char **argv)
4293 {
4294         const char *db_name;
4295         struct ctdb_db_context *ctdb_db;
4296         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4297         struct ctdb_transaction_handle *h;
4298         TDB_DATA key;
4299         int ret;
4300         bool persistent;
4301         uint8_t flags;
4302
4303         if (argc < 2) {
4304                 talloc_free(tmp_ctx);
4305                 usage();
4306         }
4307
4308         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
4309                 talloc_free(tmp_ctx);
4310                 return -1;
4311         }
4312
4313         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
4314         if (!persistent) {
4315                 DEBUG(DEBUG_ERR, ("Database '%s' is not persistent\n", db_name));
4316                 talloc_free(tmp_ctx);
4317                 return -1;
4318         }
4319
4320         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4321         if (ctdb_db == NULL) {
4322                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n", db_name));
4323                 talloc_free(tmp_ctx);
4324                 return -1;
4325         }
4326
4327         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4328         if (h == NULL) {
4329                 DEBUG(DEBUG_ERR, ("Failed to start transaction on database %s\n", db_name));
4330                 talloc_free(tmp_ctx);
4331                 return -1;
4332         }
4333
4334         key.dptr = discard_const(argv[1]);
4335         key.dsize = strlen(argv[1]);
4336         ret = ctdb_transaction_store(h, key, tdb_null);
4337         if (ret != 0) {
4338                 DEBUG(DEBUG_ERR, ("Failed to delete record\n"));
4339                 talloc_free(tmp_ctx);
4340                 return -1;
4341         }
4342
4343         ret = ctdb_transaction_commit(h);
4344         if (ret != 0) {
4345                 DEBUG(DEBUG_ERR, ("Failed to commit transaction\n"));
4346                 talloc_free(tmp_ctx);
4347                 return -1;
4348         }
4349
4350         talloc_free(tmp_ctx);
4351         return 0;
4352 }
4353
4354 static const char *ptrans_parse_string(TALLOC_CTX *mem_ctx, const char *s,
4355                                        TDB_DATA *data)
4356 {
4357         const char *t;
4358         size_t n;
4359         const char *ret; /* Next byte after successfully parsed value */
4360
4361         /* Error, unless someone says otherwise */
4362         ret = NULL;
4363         /* Indicates no value to parse */
4364         *data = tdb_null;
4365
4366         /* Skip whitespace */
4367         n = strspn(s, " \t");
4368         t = s + n;
4369
4370         if (t[0] == '"') {
4371                 /* Quoted ASCII string - no wide characters! */
4372                 t++;
4373                 n = strcspn(t, "\"");
4374                 if (t[n] == '"') {
4375                         if (n > 0) {
4376                                 data->dsize = n;
4377                                 data->dptr = talloc_memdup(mem_ctx, t, n);
4378                                 CTDB_NOMEM_ABORT(data->dptr);
4379                         }
4380                         ret = t + n + 1;
4381                 } else {
4382                         DEBUG(DEBUG_WARNING,("Unmatched \" in input %s\n", s));
4383                 }
4384         } else {
4385                 DEBUG(DEBUG_WARNING,("Unsupported input format in %s\n", s));
4386         }
4387
4388         return ret;
4389 }
4390
4391 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
4392                                  TDB_DATA *key, TDB_DATA *value)
4393 {
4394         char line [1024]; /* FIXME: make this more flexible? */
4395         const char *t;
4396         char *ptr;
4397
4398         ptr = fgets(line, sizeof(line), file);
4399
4400         if (ptr == NULL) {
4401                 return false;
4402         }
4403
4404         /* Get key */
4405         t = ptrans_parse_string(mem_ctx, line, key);
4406         if (t == NULL || key->dptr == NULL) {
4407                 /* Line Ignored but not EOF */
4408                 return true;
4409         }
4410
4411         /* Get value */
4412         t = ptrans_parse_string(mem_ctx, t, value);
4413         if (t == NULL) {
4414                 /* Line Ignored but not EOF */
4415                 talloc_free(key->dptr);
4416                 *key = tdb_null;
4417                 return true;
4418         }
4419
4420         return true;
4421 }
4422
4423 /*
4424  * Update a persistent database as per file/stdin
4425  */
4426 static int control_ptrans(struct ctdb_context *ctdb,
4427                           int argc, const char **argv)
4428 {
4429         const char *db_name;
4430         struct ctdb_db_context *ctdb_db;
4431         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4432         struct ctdb_transaction_handle *h;
4433         TDB_DATA key, value;
4434         FILE *file;
4435         int ret;
4436
4437         if (argc < 1) {
4438                 talloc_free(tmp_ctx);
4439                 usage();
4440         }
4441
4442         file = stdin;
4443         if (argc == 2) {
4444                 file = fopen(argv[1], "r");
4445                 if (file == NULL) {
4446                         DEBUG(DEBUG_ERR,("Unable to open file for reading '%s'\n", argv[1]));
4447                         talloc_free(tmp_ctx);
4448                         return -1;
4449                 }
4450         }
4451
4452         db_name = argv[0];
4453
4454         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
4455         if (ctdb_db == NULL) {
4456                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4457                 goto error;
4458         }
4459
4460         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4461         if (h == NULL) {
4462                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4463                 goto error;
4464         }
4465
4466         while (ptrans_get_key_value(tmp_ctx, file, &key, &value)) {
4467                 if (key.dsize != 0) {
4468                         ret = ctdb_transaction_store(h, key, value);
4469                         /* Minimise memory use */
4470                         talloc_free(key.dptr);
4471                         if (value.dptr != NULL) {
4472                                 talloc_free(value.dptr);
4473                         }
4474                         if (ret != 0) {
4475                                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4476                                 ctdb_transaction_cancel(h);
4477                                 goto error;
4478                         }
4479                 }
4480         }
4481
4482         ret = ctdb_transaction_commit(h);
4483         if (ret != 0) {
4484                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4485                 goto error;
4486         }
4487
4488         if (file != stdin) {
4489                 fclose(file);
4490         }
4491         talloc_free(tmp_ctx);
4492         return 0;
4493
4494 error:
4495         if (file != stdin) {
4496                 fclose(file);
4497         }
4498
4499         talloc_free(tmp_ctx);
4500         return -1;
4501 }
4502
/*
 * Check whether a local TCP port is free by trying to bind to it.
 *
 * argv[0] is the decimal port number (0-65535).
 *
 * Returns 0 if the bind succeeded, EINVAL for a missing or malformed
 * argument, or the errno value of the failing socket()/bind() call
 * (e.g. when the port is already in use).
 */
static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **argv)
{
        int s, ret;
        int v;
        int saved_errno;
        long port;
        char *end = NULL;
        struct sockaddr_in sin;

        if (argc != 1) {
                printf("Use: ctdb chktcpport <port>\n");
                return EINVAL;
        }

        /*
         * Reject anything that is not a complete, in-range number.
         * Garbage would otherwise become port 0, which always binds.
         */
        errno = 0;
        port = strtol(argv[0], &end, 10);
        if (errno != 0 || end == argv[0] || *end != '\0' ||
            port < 0 || port > 65535) {
                printf("Invalid port: %s\n", argv[0]);
                return EINVAL;
        }

        s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (s == -1) {
                /* printf() may modify errno, so capture it first */
                saved_errno = errno;
                printf("Failed to open local socket\n");
                return saved_errno;
        }

        /* Failure to go non-blocking is reported but not fatal */
        v = fcntl(s, F_GETFL, 0);
        if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK) != 0) {
                printf("Unable to set socket non-blocking: %s\n", strerror(errno));
        }

        memset(&sin, 0, sizeof(sin));
        sin.sin_family = AF_INET;
        sin.sin_port   = htons((uint16_t)port);
        ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
        /* Keep bind()'s errno: close() and printf() may overwrite it */
        saved_errno = errno;
        close(s);
        if (ret == -1) {
                printf("Failed to bind to local socket: %d %s\n",
                       saved_errno, strerror(saved_errno));
                return saved_errno;
        }

        return 0;
}
4543
4544
4545 /* Reload public IPs on a specified nodes */
4546 static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
4547 {
4548         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4549         uint32_t *nodes;
4550         uint32_t pnn_mode;
4551         uint32_t timeout;
4552         int ret;
4553
4554         assert_single_node_only();
4555
4556         if (argc > 1) {
4557                 usage();
4558         }
4559
4560         /* Determine the nodes where IPs need to be reloaded */
4561         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
4562                               options.pnn, true, &nodes, &pnn_mode)) {
4563                 ret = -1;
4564                 goto done;
4565         }
4566
4567 again:
4568         /* Disable takeover runs on all connected nodes.  A reply
4569          * indicating success is needed from each node so all nodes
4570          * will need to be active.  This will retry until maxruntime
4571          * is exceeded, hence no error handling.
4572          * 
4573          * A check could be added to not allow reloading of IPs when
4574          * there are disconnected nodes.  However, this should
4575          * probably be left up to the administrator.
4576          */
4577         timeout = LONGTIMEOUT;
4578         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4579                         "Disable takeover runs", true);
4580
4581         /* Now tell all the desired nodes to reload their public IPs.
4582          * Keep trying this until it succeeds.  This assumes all
4583          * failures are transient, which might not be true...
4584          */
4585         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
4586                                       nodes, 0, LONGTIMELIMIT(),
4587                                       false, tdb_null,
4588                                       NULL, NULL, NULL) != 0) {
4589                 DEBUG(DEBUG_ERR,
4590                       ("Unable to reload IPs on some nodes, try again.\n"));
4591                 goto again;
4592         }
4593
4594         /* It isn't strictly necessary to wait until takeover runs are
4595          * re-enabled but doing so can't hurt.
4596          */
4597         timeout = 0;
4598         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4599                         "Enable takeover runs", true);
4600
4601         ipreallocate(ctdb);
4602
4603         ret = 0;
4604 done:
4605         talloc_free(tmp_ctx);
4606         return ret;
4607 }
4608
4609 /*
4610   display a list of the databases on a remote ctdb
4611  */
4612 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
4613 {
4614         int i, ret;
4615         struct ctdb_dbid_map *dbmap=NULL;
4616
4617         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4618         if (ret != 0) {
4619                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4620                 return ret;
4621         }
4622
4623         if(options.machinereadable){
4624                 printm(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
4625                 for(i=0;i<dbmap->num;i++){
4626                         const char *path;
4627                         const char *name;
4628                         const char *health;
4629                         bool persistent;
4630                         bool readonly;
4631                         bool sticky;
4632
4633                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
4634                                             dbmap->dbs[i].dbid, ctdb, &path);
4635                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4636                                             dbmap->dbs[i].dbid, ctdb, &name);
4637                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4638                                               dbmap->dbs[i].dbid, ctdb, &health);
4639                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4640                         readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4641                         sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4642                         printm(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
4643                                dbmap->dbs[i].dbid, name, path,
4644                                !!(persistent), !!(sticky),
4645                                !!(health), !!(readonly));
4646                 }
4647                 return 0;
4648         }
4649
4650         printf("Number of databases:%d\n", dbmap->num);
4651         for(i=0;i<dbmap->num;i++){
4652                 const char *path;
4653                 const char *name;
4654                 const char *health;
4655                 bool persistent;
4656                 bool readonly;
4657                 bool sticky;
4658
4659                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4660                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4661                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4662                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4663                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4664                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4665                 printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
4666                        dbmap->dbs[i].dbid, name, path,
4667                        persistent?" PERSISTENT":"",
4668                        sticky?" STICKY":"",
4669                        readonly?" READONLY":"",
4670                        health?" UNHEALTHY":"");
4671         }
4672
4673         return 0;
4674 }
4675
4676 /*
4677   display the status of a database on a remote ctdb
4678  */
4679 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
4680 {
4681         const char *db_name;
4682         uint32_t db_id;
4683         uint8_t flags;
4684         const char *path;
4685         const char *health;
4686
4687         if (argc < 1) {
4688                 usage();
4689         }
4690
4691         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
4692                 return -1;
4693         }
4694
4695         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &path);
4696         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &health);
4697         printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
4698                db_id, db_name, path,
4699                (flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
4700                (flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
4701                (flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
4702                (health ? health : "OK"));
4703
4704         return 0;
4705 }
4706
4707 /*
4708   check if the local node is recmaster or not
4709   it will return 1 if this node is the recmaster and 0 if it is not
4710   or if the local ctdb daemon could not be contacted
4711  */
4712 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
4713 {
4714         uint32_t mypnn, recmaster;
4715         int ret;
4716
4717         assert_single_node_only();
4718
4719         mypnn = getpnn(ctdb);
4720
4721         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
4722         if (ret != 0) {
4723                 printf("Failed to get the recmaster\n");
4724                 return 1;
4725         }
4726
4727         if (recmaster != mypnn) {
4728                 printf("this node is not the recmaster\n");
4729                 return 1;
4730         }
4731
4732         printf("this node is the recmaster\n");
4733         return 0;
4734 }
4735
4736 /*
4737   ping a node
4738  */
4739 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
4740 {
4741         int ret;
4742         struct timeval tv = timeval_current();
4743         ret = ctdb_ctrl_ping(ctdb, options.pnn);
4744         if (ret == -1) {
4745                 printf("Unable to get ping response from node %u\n", options.pnn);
4746                 return -1;
4747         } else {
4748                 printf("response from %u time=%.6f sec  (%d clients)\n", 
4749                        options.pnn, timeval_elapsed(&tv), ret);
4750         }
4751         return 0;
4752 }
4753
4754
4755 /*
4756   get a node's runstate
4757  */
4758 static int control_runstate(struct ctdb_context *ctdb, int argc, const char **argv)
4759 {
4760         int ret;
4761         enum ctdb_runstate runstate;
4762
4763         ret = ctdb_ctrl_get_runstate(ctdb, TIMELIMIT(), options.pnn, &runstate);
4764         if (ret == -1) {
4765                 printf("Unable to get runstate response from node %u\n",
4766                        options.pnn);
4767                 return -1;
4768         } else {
4769                 bool found = true;
4770                 enum ctdb_runstate t;
4771                 int i;
4772                 for (i=0; i<argc; i++) {
4773                         found = false;
4774                         t = runstate_from_string(argv[i]);
4775                         if (t == CTDB_RUNSTATE_UNKNOWN) {
4776                                 printf("Invalid run state (%s)\n", argv[i]);
4777                                 return -1;
4778                         }
4779
4780                         if (t == runstate) {
4781                                 found = true;
4782                                 break;
4783                         }
4784                 }
4785
4786                 if (!found) {
4787                         printf("CTDB not in required run state (got %s)\n", 
4788                                runstate_to_string((enum ctdb_runstate)runstate));
4789                         return -1;
4790                 }
4791         }
4792
4793         printf("%s\n", runstate_to_string(runstate));
4794         return 0;
4795 }
4796
4797
4798 /*
4799   get a tunable
4800  */
4801 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
4802 {
4803         const char *name;
4804         uint32_t value;
4805         int ret;
4806
4807         if (argc < 1) {
4808                 usage();
4809         }
4810
4811         name = argv[0];
4812         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
4813         if (ret != 0) {
4814                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
4815                 return -1;
4816         }
4817
4818         printf("%-23s = %u\n", name, value);
4819         return 0;
4820 }
4821
4822 /*
4823   set a tunable
4824  */
4825 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
4826 {
4827         const char *name;
4828         uint32_t value;
4829         int ret;
4830
4831         if (argc < 2) {
4832                 usage();
4833         }
4834
4835         name = argv[0];
4836         value = strtoul(argv[1], NULL, 0);
4837
4838         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
4839         if (ret == -1) {
4840                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
4841                 return -1;
4842         }
4843         if (ret == 1) {
4844                 DEBUG(DEBUG_WARNING,
4845                       ("Setting obsolete tunable variable '%s'\n",
4846                        name));
4847         }
4848         return 0;
4849 }
4850
4851 /*
4852   list all tunables
4853  */
4854 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
4855 {
4856         uint32_t count;
4857         const char **list;
4858         int ret, i;
4859
4860         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
4861         if (ret == -1) {
4862                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
4863                 return -1;
4864         }
4865
4866         for (i=0;i<count;i++) {
4867                 control_getvar(ctdb, 1, &list[i]);
4868         }
4869
4870         talloc_free(list);
4871         
4872         return 0;
4873 }
4874
4875 /*
4876   display debug level on a node
4877  */
4878 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4879 {
4880         int ret;
4881         int32_t level;
4882
4883         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
4884         if (ret != 0) {
4885                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
4886                 return ret;
4887         } else {
4888                 const char *desc = get_debug_by_level(level);
4889                 if (desc == NULL) {
4890                         /* This should never happen */
4891                         desc = "Unknown";
4892                 }
4893                 if (options.machinereadable){
4894                         printm(":Name:Level:\n");
4895                         printm(":%s:%d:\n", desc, level);
4896                 } else {
4897                         printf("Node %u is at debug level %s (%d)\n",
4898                                options.pnn, desc, level);
4899                 }
4900         }
4901         return 0;
4902 }
4903
4904 /*
4905   display reclock file of a node
4906  */
4907 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4908 {
4909         int ret;
4910         const char *reclock;
4911
4912         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
4913         if (ret != 0) {
4914                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4915                 return ret;
4916         } else {
4917                 if (options.machinereadable){
4918                         if (reclock != NULL) {
4919                                 printm("%s", reclock);
4920                         }
4921                 } else {
4922                         if (reclock == NULL) {
4923                                 printf("No reclock file used.\n");
4924                         } else {
4925                                 printf("Reclock file:%s\n", reclock);
4926                         }
4927                 }
4928         }
4929         return 0;
4930 }
4931
4932 /*
4933   set the reclock file of a node
4934  */
4935 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4936 {
4937         int ret;
4938         const char *reclock;
4939
4940         if (argc == 0) {
4941                 reclock = NULL;
4942         } else if (argc == 1) {
4943                 reclock = argv[0];
4944         } else {
4945                 usage();
4946         }
4947
4948         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
4949         if (ret != 0) {
4950                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4951                 return ret;
4952         }
4953         return 0;
4954 }
4955
4956 /*
4957   set the natgw state on/off
4958  */
4959 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
4960 {
4961         int ret;
4962         uint32_t natgwstate;
4963
4964         if (argc == 0) {
4965                 usage();
4966         }
4967
4968         if (!strcmp(argv[0], "on")) {
4969                 natgwstate = 1;
4970         } else if (!strcmp(argv[0], "off")) {
4971                 natgwstate = 0;
4972         } else {
4973                 usage();
4974         }
4975
4976         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
4977         if (ret != 0) {
4978                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
4979                 return ret;
4980         }
4981
4982         return 0;
4983 }
4984
4985 /*
4986   set the lmaster role on/off
4987  */
4988 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4989 {
4990         int ret;
4991         uint32_t lmasterrole;
4992
4993         if (argc == 0) {
4994                 usage();
4995         }
4996
4997         if (!strcmp(argv[0], "on")) {
4998                 lmasterrole = 1;
4999         } else if (!strcmp(argv[0], "off")) {
5000                 lmasterrole = 0;
5001         } else {
5002                 usage();
5003         }
5004
5005         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
5006         if (ret != 0) {
5007                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
5008                 return ret;
5009         }
5010
5011         return 0;
5012 }
5013
5014 /*
5015   set the recmaster role on/off
5016  */
5017 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
5018 {
5019         int ret;
5020         uint32_t recmasterrole;
5021
5022         if (argc == 0) {
5023                 usage();
5024         }
5025
5026         if (!strcmp(argv[0], "on")) {
5027                 recmasterrole = 1;
5028         } else if (!strcmp(argv[0], "off")) {
5029                 recmasterrole = 0;
5030         } else {
5031                 usage();
5032         }
5033
5034         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
5035         if (ret != 0) {
5036                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
5037                 return ret;
5038         }
5039
5040         return 0;
5041 }
5042
5043 /*
5044   set debug level on a node or all nodes
5045  */
5046 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
5047 {
5048         int ret;
5049         int32_t level;
5050
5051         if (argc == 0) {
5052                 printf("You must specify the debug level. Valid levels are:\n");
5053                 print_debug_levels(stdout);
5054                 return 0;
5055         }
5056
5057         if (!parse_debug(argv[0], &level)) {
5058                 printf("Invalid debug level, must be one of\n");
5059                 print_debug_levels(stdout);
5060                 return -1;
5061         }
5062
5063         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
5064         if (ret != 0) {
5065                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
5066         }
5067         return 0;
5068 }
5069
5070
5071 /*
5072   thaw a node
5073  */
5074 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
5075 {
5076         int ret;
5077         uint32_t priority;
5078         
5079         if (argc == 1) {
5080                 priority = strtol(argv[0], NULL, 0);
5081         } else {
5082                 priority = 0;
5083         }
5084         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
5085
5086         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
5087         if (ret != 0) {
5088                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
5089         }               
5090         return 0;
5091 }
5092
5093
5094 /*
5095   attach to a database
5096  */
5097 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
5098 {
5099         const char *db_name;
5100         struct ctdb_db_context *ctdb_db;
5101         bool persistent = false;
5102
5103         if (argc < 1) {
5104                 usage();
5105         }
5106         db_name = argv[0];
5107         if (argc > 2) {
5108                 usage();
5109         }
5110         if (argc == 2) {
5111                 if (strcmp(argv[1], "persistent") != 0) {
5112                         usage();
5113                 }
5114                 persistent = true;
5115         }
5116
5117         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
5118         if (ctdb_db == NULL) {
5119                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
5120                 return -1;
5121         }
5122
5123         return 0;
5124 }
5125
5126 /*
5127  * detach from a database
5128  */
5129 static int control_detach(struct ctdb_context *ctdb, int argc,
5130                           const char **argv)
5131 {
5132         uint32_t db_id;
5133         uint8_t flags;
5134         int ret, i, status = 0;
5135         struct ctdb_node_map *nodemap = NULL;
5136         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5137         uint32_t recmode;
5138
5139         if (argc < 1) {
5140                 usage();
5141         }
5142
5143         assert_single_node_only();
5144
5145         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn,
5146                                     &recmode);
5147         if (ret != 0) {
5148                 DEBUG(DEBUG_ERR, ("Database cannot be detached "
5149                                   "when recovery is active\n"));
5150                 talloc_free(tmp_ctx);
5151                 return -1;
5152         }
5153
5154         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5155                                    &nodemap);
5156         if (ret != 0) {
5157                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5158                                   options.pnn));
5159                 talloc_free(tmp_ctx);
5160                 return -1;
5161         }
5162
5163         for (i=0; i<nodemap->num; i++) {
5164                 uint32_t value;
5165
5166                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
5167                         continue;
5168                 }
5169
5170                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
5171                         continue;
5172                 }
5173
5174                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
5175                         DEBUG(DEBUG_ERR, ("Database cannot be detached on "
5176                                           "inactive (stopped or banned) node "
5177                                           "%u\n", nodemap->nodes[i].pnn));
5178                         talloc_free(tmp_ctx);
5179                         return -1;
5180                 }
5181
5182                 ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(),
5183                                             nodemap->nodes[i].pnn,
5184                                             "AllowClientDBAttach",
5185                                             &value);
5186                 if (ret != 0) {
5187                         DEBUG(DEBUG_ERR, ("Unable to get tunable "
5188                                           "AllowClientDBAttach from node %u\n",
5189                                            nodemap->nodes[i].pnn));
5190                         talloc_free(tmp_ctx);
5191                         return -1;
5192                 }
5193
5194                 if (value == 1) {
5195                         DEBUG(DEBUG_ERR, ("Database access is still active on "
5196                                           "node %u. Set AllowClientDBAttach=0 "
5197                                           "on all nodes.\n",
5198                                           nodemap->nodes[i].pnn));
5199                         talloc_free(tmp_ctx);
5200                         return -1;
5201                 }
5202         }
5203
5204         talloc_free(tmp_ctx);
5205
5206         for (i=0; i<argc; i++) {
5207                 if (!db_exists(ctdb, argv[i], &db_id, NULL, &flags)) {
5208                         continue;
5209                 }
5210
5211                 if (flags & CTDB_DB_FLAGS_PERSISTENT) {
5212                         DEBUG(DEBUG_ERR, ("Persistent database '%s' "
5213                                           "cannot be detached\n", argv[i]));
5214                         status = -1;
5215                         continue;
5216                 }
5217
5218                 ret = ctdb_detach(ctdb, db_id);
5219                 if (ret != 0) {
5220                         DEBUG(DEBUG_ERR, ("Database '%s' detach failed\n",
5221                                           argv[i]));
5222                         status = ret;
5223                 }
5224         }
5225
5226         return status;
5227 }
5228
5229 /*
5230   set db priority
5231  */
5232 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5233 {
5234         struct ctdb_db_priority db_prio;
5235         int ret;
5236
5237         if (argc < 2) {
5238                 usage();
5239         }
5240
5241         db_prio.db_id    = strtoul(argv[0], NULL, 0);
5242         db_prio.priority = strtoul(argv[1], NULL, 0);
5243
5244         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
5245         if (ret != 0) {
5246                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
5247                 return -1;
5248         }
5249
5250         return 0;
5251 }
5252
5253 /*
5254   get db priority
5255  */
5256 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5257 {
5258         uint32_t db_id, priority;
5259         int ret;
5260
5261         if (argc < 1) {
5262                 usage();
5263         }
5264
5265         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5266                 return -1;
5267         }
5268
5269         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
5270         if (ret != 0) {
5271                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
5272                 return -1;
5273         }
5274
5275         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
5276
5277         return 0;
5278 }
5279
5280 /*
5281   set the sticky records capability for a database
5282  */
5283 static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
5284 {
5285         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5286         uint32_t db_id;
5287         int ret;
5288
5289         if (argc < 1) {
5290                 usage();
5291         }
5292
5293         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5294                 return -1;
5295         }
5296
5297         ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
5298         if (ret != 0) {
5299                 DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
5300                 talloc_free(tmp_ctx);
5301                 return -1;
5302         }
5303
5304         talloc_free(tmp_ctx);
5305         return 0;
5306 }
5307
5308 /*
5309   set the readonly capability for a database
5310  */
5311 static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char **argv)
5312 {
5313         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5314         uint32_t db_id;
5315         int ret;
5316
5317         if (argc < 1) {
5318                 usage();
5319         }
5320
5321         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5322                 return -1;
5323         }
5324
5325         ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
5326         if (ret != 0) {
5327                 DEBUG(DEBUG_ERR,("Unable to set db to support readonly\n"));
5328                 talloc_free(tmp_ctx);
5329                 return -1;
5330         }
5331
5332         talloc_free(tmp_ctx);
5333         return 0;
5334 }
5335
5336 /*
5337   get db seqnum
5338  */
5339 static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
5340 {
5341         uint32_t db_id;
5342         uint64_t seqnum;
5343         int ret;
5344
5345         if (argc < 1) {
5346                 usage();
5347         }
5348
5349         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5350                 return -1;
5351         }
5352
5353         ret = ctdb_ctrl_getdbseqnum(ctdb, TIMELIMIT(), options.pnn, db_id, &seqnum);
5354         if (ret != 0) {
5355                 DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
5356                 return -1;
5357         }
5358
5359         printf("Sequence number:%lld\n", (long long)seqnum);
5360
5361         return 0;
5362 }
5363
5364 /*
5365   run an eventscript on a node
5366  */
5367 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
5368 {
5369         TDB_DATA data;
5370         int ret;
5371         int32_t res;
5372         char *errmsg;
5373         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5374
5375         if (argc != 1) {
5376                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5377                 return -1;
5378         }
5379
5380         data.dptr = (unsigned char *)discard_const(argv[0]);
5381         data.dsize = strlen((char *)data.dptr) + 1;
5382
5383         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
5384
5385         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
5386                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
5387         if (ret != 0 || res != 0) {
5388                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
5389                 talloc_free(tmp_ctx);
5390                 return -1;
5391         }
5392         talloc_free(tmp_ctx);
5393         return 0;
5394 }
5395
/* Version stamped into, and required from, every database backup file. */
#define DB_VERSION 1
/* Size of the name field in the backup header, including the NUL. */
#define MAX_DB_NAME 64

/*
 * Header at the start of a database backup file.
 *
 * The struct is written to and read from the file as raw bytes, so the
 * file layout follows the in-memory layout of this struct.
 */
struct db_file_header {
        unsigned long version;          /* must equal DB_VERSION */
        time_t timestamp;               /* time the backup was taken */
        unsigned long persistent;       /* non-zero for a persistent database */
        unsigned long size;             /* bytes of record data after the header */
        /*
         * Database name, NUL terminated.  Not const-qualified: the field
         * is filled in after the header has been zeroed.
         */
        char name[MAX_DB_NAME];
};
5405
/*
 * State shared between control_backupdb() and its traverse callback
 * backup_traverse().
 */
struct backup_data {
        /* marshall buffer the traversed records are appended to */
        struct ctdb_marshall_buffer *records;
        /* number of bytes of 'records' in use; new records are copied here */
        uint32_t len;
        /* number of records appended so far */
        uint32_t total;
        /* set by the callback when marshalling or growing the buffer fails */
        bool traverse_error;
};
5412
5413 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
5414 {
5415         struct backup_data *bd = talloc_get_type(private, struct backup_data);
5416         struct ctdb_rec_data *rec;
5417
5418         /* add the record */
5419         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
5420         if (rec == NULL) {
5421                 bd->traverse_error = true;
5422                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
5423                 return -1;
5424         }
5425         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
5426         if (bd->records == NULL) {
5427                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
5428                 bd->traverse_error = true;
5429                 return -1;
5430         }
5431         bd->records->count++;
5432         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
5433         bd->len += rec->length;
5434         talloc_free(rec);
5435
5436         bd->total++;
5437         return 0;
5438 }
5439
5440 /*
5441  * backup a database to a file 
5442  */
5443 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
5444 {
5445         const char *db_name;
5446         int ret;
5447         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5448         struct db_file_header dbhdr;
5449         struct ctdb_db_context *ctdb_db;
5450         struct backup_data *bd;
5451         int fh = -1;
5452         int status = -1;
5453         const char *reason = NULL;
5454         uint32_t db_id;
5455         uint8_t flags;
5456
5457         assert_single_node_only();
5458
5459         if (argc != 2) {
5460                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5461                 return -1;
5462         }
5463
5464         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
5465                 return -1;
5466         }
5467
5468         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
5469                                     db_id, tmp_ctx, &reason);
5470         if (ret != 0) {
5471                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
5472                                  argv[0]));
5473                 talloc_free(tmp_ctx);
5474                 return -1;
5475         }
5476         if (reason) {
5477                 uint32_t allow_unhealthy = 0;
5478
5479                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
5480                                       "AllowUnhealthyDBRead",
5481                                       &allow_unhealthy);
5482
5483                 if (allow_unhealthy != 1) {
5484                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
5485                                          argv[0], reason));
5486
5487                         DEBUG(DEBUG_ERR,("disallow backup : tunable AllowUnhealthyDBRead = %u\n",
5488                                          allow_unhealthy));
5489                         talloc_free(tmp_ctx);
5490                         return -1;
5491                 }
5492
5493                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
5494                                      argv[0], argv[0]));
5495                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
5496                                      "tunnable AllowUnhealthyDBRead = %u\n",
5497                                      allow_unhealthy));
5498         }
5499
5500         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5501         if (ctdb_db == NULL) {
5502                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
5503                 talloc_free(tmp_ctx);
5504                 return -1;
5505         }
5506
5507
5508         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
5509         if (ret == -1) {
5510                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
5511                 talloc_free(tmp_ctx);
5512                 return -1;
5513         }
5514
5515
5516         bd = talloc_zero(tmp_ctx, struct backup_data);
5517         if (bd == NULL) {
5518                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
5519                 talloc_free(tmp_ctx);
5520                 return -1;
5521         }
5522
5523         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
5524         if (bd->records == NULL) {
5525                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
5526                 talloc_free(tmp_ctx);
5527                 return -1;
5528         }
5529
5530         bd->len = offsetof(struct ctdb_marshall_buffer, data);
5531         bd->records->db_id = ctdb_db->db_id;
5532         /* traverse the database collecting all records */
5533         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
5534             bd->traverse_error) {
5535                 DEBUG(DEBUG_ERR,("Traverse error\n"));
5536                 talloc_free(tmp_ctx);
5537                 return -1;              
5538         }
5539
5540         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
5541
5542
5543         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
5544         if (fh == -1) {
5545                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
5546                 talloc_free(tmp_ctx);
5547                 return -1;
5548         }
5549
5550         ZERO_STRUCT(dbhdr);
5551         dbhdr.version = DB_VERSION;
5552         dbhdr.timestamp = time(NULL);
5553         dbhdr.persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
5554         dbhdr.size = bd->len;
5555         if (strlen(argv[0]) >= MAX_DB_NAME) {
5556                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
5557                 goto done;
5558         }
5559         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME-1);
5560         ret = sys_write(fh, &dbhdr, sizeof(dbhdr));
5561         if (ret == -1) {
5562                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5563                 goto done;
5564         }
5565         ret = sys_write(fh, bd->records, bd->len);
5566         if (ret == -1) {
5567                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5568                 goto done;
5569         }
5570
5571         status = 0;
5572 done:
5573         if (fh != -1) {
5574                 ret = close(fh);
5575                 if (ret == -1) {
5576                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
5577                 }
5578         }
5579
5580         DEBUG(DEBUG_ERR,("Database backed up to %s\n", argv[1]));
5581
5582         talloc_free(tmp_ctx);
5583         return status;
5584 }
5585
5586 /*
5587  * restore a database from a file 
5588  */
5589 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
5590 {
5591         int ret;
5592         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5593         TDB_DATA outdata;
5594         TDB_DATA data;
5595         struct db_file_header dbhdr;
5596         struct ctdb_db_context *ctdb_db;
5597         struct ctdb_node_map *nodemap=NULL;
5598         struct ctdb_vnn_map *vnnmap=NULL;
5599         int i, fh;
5600         struct ctdb_control_wipe_database w;
5601         uint32_t *nodes;
5602         uint32_t generation;
5603         struct tm *tm;
5604         char tbuf[100];
5605         char *dbname;
5606
5607         assert_single_node_only();
5608
5609         if (argc < 1 || argc > 2) {
5610                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5611                 return -1;
5612         }
5613
5614         fh = open(argv[0], O_RDONLY);
5615         if (fh == -1) {
5616                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5617                 talloc_free(tmp_ctx);
5618                 return -1;
5619         }
5620
5621         sys_read(fh, &dbhdr, sizeof(dbhdr));
5622         if (dbhdr.version != DB_VERSION) {
5623                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5624                 close(fh);
5625                 talloc_free(tmp_ctx);
5626                 return -1;
5627         }
5628
5629         dbname = discard_const(dbhdr.name);
5630         if (argc == 2) {
5631                 dbname = discard_const(argv[1]);
5632         }
5633
5634         outdata.dsize = dbhdr.size;
5635         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5636         if (outdata.dptr == NULL) {
5637                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5638                 close(fh);
5639                 talloc_free(tmp_ctx);
5640                 return -1;
5641         }               
5642         sys_read(fh, outdata.dptr, outdata.dsize);
5643         close(fh);
5644
5645         tm = localtime(&dbhdr.timestamp);
5646         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5647         printf("Restoring database '%s' from backup @ %s\n",
5648                 dbname, tbuf);
5649
5650
5651         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), dbname, dbhdr.persistent, 0);
5652         if (ctdb_db == NULL) {
5653                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
5654                 talloc_free(tmp_ctx);
5655                 return -1;
5656         }
5657
5658         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
5659         if (ret != 0) {
5660                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
5661                 talloc_free(tmp_ctx);
5662                 return ret;
5663         }
5664
5665
5666         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
5667         if (ret != 0) {
5668                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
5669                 talloc_free(tmp_ctx);
5670                 return ret;
5671         }
5672
5673         /* freeze all nodes */
5674         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5675         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5676                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5677                                         nodes, i,
5678                                         TIMELIMIT(),
5679                                         false, tdb_null,
5680                                         NULL, NULL,
5681                                         NULL) != 0) {
5682                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5683                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5684                         talloc_free(tmp_ctx);
5685                         return -1;
5686                 }
5687         }
5688
5689         generation = vnnmap->generation;
5690         data.dptr = (void *)&generation;
5691         data.dsize = sizeof(generation);
5692
5693         /* start a cluster wide transaction */
5694         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5695         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5696                                         nodes, 0,
5697                                         TIMELIMIT(), false, data,
5698                                         NULL, NULL,
5699                                         NULL) != 0) {
5700                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
5701                 return -1;
5702         }
5703
5704
5705         w.db_id = ctdb_db->db_id;
5706         w.transaction_id = generation;
5707
5708         data.dptr = (void *)&w;
5709         data.dsize = sizeof(w);
5710
5711         /* wipe all the remote databases. */
5712         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5713         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5714                                         nodes, 0,
5715                                         TIMELIMIT(), false, data,
5716                                         NULL, NULL,
5717                                         NULL) != 0) {
5718                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5719                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5720                 talloc_free(tmp_ctx);
5721                 return -1;
5722         }
5723         
5724         /* push the database */
5725         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5726         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
5727                                         nodes, 0,
5728                                         TIMELIMIT(), false, outdata,
5729                                         NULL, NULL,
5730                                         NULL) != 0) {
5731                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
5732                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5733                 talloc_free(tmp_ctx);
5734                 return -1;
5735         }
5736
5737         data.dptr = (void *)&ctdb_db->db_id;
5738         data.dsize = sizeof(ctdb_db->db_id);
5739
5740         /* mark the database as healthy */
5741         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5742         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5743                                         nodes, 0,
5744                                         TIMELIMIT(), false, data,
5745                                         NULL, NULL,
5746                                         NULL) != 0) {
5747                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5748                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5749                 talloc_free(tmp_ctx);
5750                 return -1;
5751         }
5752
5753         data.dptr = (void *)&generation;
5754         data.dsize = sizeof(generation);
5755
5756         /* commit all the changes */
5757         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5758                                         nodes, 0,
5759                                         TIMELIMIT(), false, data,
5760                                         NULL, NULL,
5761                                         NULL) != 0) {
5762                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5763                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5764                 talloc_free(tmp_ctx);
5765                 return -1;
5766         }
5767
5768
5769         /* thaw all nodes */
5770         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5771         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5772                                         nodes, 0,
5773                                         TIMELIMIT(),
5774                                         false, tdb_null,
5775                                         NULL, NULL,
5776                                         NULL) != 0) {
5777                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5778                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5779                 talloc_free(tmp_ctx);
5780                 return -1;
5781         }
5782
5783
5784         talloc_free(tmp_ctx);
5785         return 0;
5786 }
5787
5788 /*
5789  * dump a database backup from a file
5790  */
5791 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
5792 {
5793         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5794         TDB_DATA outdata;
5795         struct db_file_header dbhdr;
5796         int i, fh;
5797         struct tm *tm;
5798         char tbuf[100];
5799         struct ctdb_rec_data *rec = NULL;
5800         struct ctdb_marshall_buffer *m;
5801         struct ctdb_dump_db_context c;
5802
5803         assert_single_node_only();
5804
5805         if (argc != 1) {
5806                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5807                 return -1;
5808         }
5809
5810         fh = open(argv[0], O_RDONLY);
5811         if (fh == -1) {
5812                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5813                 talloc_free(tmp_ctx);
5814                 return -1;
5815         }
5816
5817         sys_read(fh, &dbhdr, sizeof(dbhdr));
5818         if (dbhdr.version != DB_VERSION) {
5819                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5820                 close(fh);
5821                 talloc_free(tmp_ctx);
5822                 return -1;
5823         }
5824
5825         outdata.dsize = dbhdr.size;
5826         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5827         if (outdata.dptr == NULL) {
5828                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5829                 close(fh);
5830                 talloc_free(tmp_ctx);
5831                 return -1;
5832         }
5833         sys_read(fh, outdata.dptr, outdata.dsize);
5834         close(fh);
5835         m = (struct ctdb_marshall_buffer *)outdata.dptr;
5836
5837         tm = localtime(&dbhdr.timestamp);
5838         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5839         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
5840                 dbhdr.name, m->db_id, tbuf);
5841
5842         ZERO_STRUCT(c);
5843         c.f = stdout;
5844         c.printemptyrecords = (bool)options.printemptyrecords;
5845         c.printdatasize = (bool)options.printdatasize;
5846         c.printlmaster = false;
5847         c.printhash = (bool)options.printhash;
5848         c.printrecordflags = (bool)options.printrecordflags;
5849
5850         for (i=0; i < m->count; i++) {
5851                 uint32_t reqid = 0;
5852                 TDB_DATA key, data;
5853
5854                 /* we do not want the header splitted, so we pass NULL*/
5855                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
5856                                               NULL, &key, &data);
5857
5858                 ctdb_dumpdb_record(ctdb, key, data, &c);
5859         }
5860
5861         printf("Dumped %d records\n", i);
5862         talloc_free(tmp_ctx);
5863         return 0;
5864 }
5865
5866 /*
 * wipe a database on all active nodes
5868  */
5869 static int control_wipedb(struct ctdb_context *ctdb, int argc,
5870                           const char **argv)
5871 {
5872         const char *db_name;
5873         int ret;
5874         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5875         TDB_DATA data;
5876         struct ctdb_db_context *ctdb_db;
5877         struct ctdb_node_map *nodemap = NULL;
5878         struct ctdb_vnn_map *vnnmap = NULL;
5879         int i;
5880         struct ctdb_control_wipe_database w;
5881         uint32_t *nodes;
5882         uint32_t generation;
5883         uint8_t flags;
5884
5885         assert_single_node_only();
5886
5887         if (argc != 1) {
5888                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5889                 return -1;
5890         }
5891
5892         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
5893                 return -1;
5894         }
5895
5896         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5897         if (ctdb_db == NULL) {
5898                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
5899                                   argv[0]));
5900                 talloc_free(tmp_ctx);
5901                 return -1;
5902         }
5903
5904         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
5905                                    &nodemap);
5906         if (ret != 0) {
5907                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5908                                   options.pnn));
5909                 talloc_free(tmp_ctx);
5910                 return ret;
5911         }
5912
5913         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5914                                   &vnnmap);
5915         if (ret != 0) {
5916                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
5917                                   options.pnn));
5918                 talloc_free(tmp_ctx);
5919                 return ret;
5920         }
5921
5922         /* freeze all nodes */
5923         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5924         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5925                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5926                                                 nodes, i,
5927                                                 TIMELIMIT(),
5928                                                 false, tdb_null,
5929                                                 NULL, NULL,
5930                                                 NULL);
5931                 if (ret != 0) {
5932                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5933                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
5934                                              CTDB_RECOVERY_ACTIVE);
5935                         talloc_free(tmp_ctx);
5936                         return -1;
5937                 }
5938         }
5939
5940         generation = vnnmap->generation;
5941         data.dptr = (void *)&generation;
5942         data.dsize = sizeof(generation);
5943
5944         /* start a cluster wide transaction */
5945         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5946         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5947                                         nodes, 0,
5948                                         TIMELIMIT(), false, data,
5949                                         NULL, NULL,
5950                                         NULL);
5951         if (ret!= 0) {
5952                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
5953                                   "transactions.\n"));
5954                 return -1;
5955         }
5956
5957         w.db_id = ctdb_db->db_id;
5958         w.transaction_id = generation;
5959
5960         data.dptr = (void *)&w;
5961         data.dsize = sizeof(w);
5962
5963         /* wipe all the remote databases. */
5964         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5965         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5966                                         nodes, 0,
5967                                         TIMELIMIT(), false, data,
5968                                         NULL, NULL,
5969                                         NULL) != 0) {
5970                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5971                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5972                 talloc_free(tmp_ctx);
5973                 return -1;
5974         }
5975
5976         data.dptr = (void *)&ctdb_db->db_id;
5977         data.dsize = sizeof(ctdb_db->db_id);
5978
5979         /* mark the database as healthy */
5980         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5981         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5982                                         nodes, 0,
5983                                         TIMELIMIT(), false, data,
5984                                         NULL, NULL,
5985                                         NULL) != 0) {
5986                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5987                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5988                 talloc_free(tmp_ctx);
5989                 return -1;
5990         }
5991
5992         data.dptr = (void *)&generation;
5993         data.dsize = sizeof(generation);
5994
5995         /* commit all the changes */
5996         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5997                                         nodes, 0,
5998                                         TIMELIMIT(), false, data,
5999                                         NULL, NULL,
6000                                         NULL) != 0) {
6001                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
6002                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
6003                 talloc_free(tmp_ctx);
6004                 return -1;
6005         }
6006
6007         /* thaw all nodes */
6008         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
6009         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
6010                                         nodes, 0,
6011                                         TIMELIMIT(),
6012                                         false, tdb_null,
6013                                         NULL, NULL,
6014                                         NULL) != 0) {
6015                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
6016                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
6017                 talloc_free(tmp_ctx);
6018                 return -1;
6019         }
6020
6021         DEBUG(DEBUG_ERR, ("Database wiped.\n"));
6022
6023         talloc_free(tmp_ctx);
6024         return 0;
6025 }
6026
6027 /*
6028   dump memory usage
6029  */
6030 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
6031 {
6032         TDB_DATA data;
6033         int ret;
6034         int32_t res;
6035         char *errmsg;
6036         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
6037         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
6038                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
6039         if (ret != 0 || res != 0) {
6040                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
6041                 talloc_free(tmp_ctx);
6042                 return -1;
6043         }
6044         sys_write(1, data.dptr, data.dsize);
6045         talloc_free(tmp_ctx);
6046         return 0;
6047 }
6048
6049 /*
6050   handler for memory dumps
6051 */
6052 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
6053                              TDB_DATA data, void *private_data)
6054 {
6055         sys_write(1, data.dptr, data.dsize);
6056         exit(0);
6057 }
6058
6059 /*
6060   dump memory usage on the recovery daemon
6061  */
6062 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
6063 {
6064         int ret;
6065         TDB_DATA data;
6066         struct srvid_request rd;
6067
6068         rd.pnn = ctdb_get_pnn(ctdb);
6069         rd.srvid = getpid();
6070
6071         /* register a message port for receiveing the reply so that we
6072            can receive the reply
6073         */
6074         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
6075
6076
6077         data.dptr = (uint8_t *)&rd;
6078         data.dsize = sizeof(rd);
6079
6080         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
6081         if (ret != 0) {
6082                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
6083                 return -1;
6084         }
6085
6086         /* this loop will terminate when we have received the reply */
6087         while (1) {     
6088                 event_loop_once(ctdb->ev);
6089         }
6090
6091         return 0;
6092 }
6093
6094 /*
6095   send a message to a srvid
6096  */
6097 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
6098 {
6099         unsigned long srvid;
6100         int ret;
6101         TDB_DATA data;
6102
6103         if (argc < 2) {
6104                 usage();
6105         }
6106
6107         srvid      = strtoul(argv[0], NULL, 0);
6108
6109         data.dptr = (uint8_t *)discard_const(argv[1]);
6110         data.dsize= strlen(argv[1]);
6111
6112         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
6113         if (ret != 0) {
6114                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
6115                 return -1;
6116         }
6117
6118         return 0;
6119 }
6120
6121 /*
6122   handler for msglisten
6123 */
6124 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
6125                              TDB_DATA data, void *private_data)
6126 {
6127         int i;
6128
6129         printf("Message received: ");
6130         for (i=0;i<data.dsize;i++) {
6131                 printf("%c", data.dptr[i]);
6132         }
6133         printf("\n");
6134 }
6135
6136 /*
6137   listen for messages on a messageport
6138  */
6139 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
6140 {
6141         uint64_t srvid;
6142
6143         srvid = getpid();
6144
6145         /* register a message port and listen for messages
6146         */
6147         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
6148         printf("Listening for messages on srvid:%d\n", (int)srvid);
6149
6150         while (1) {     
6151                 event_loop_once(ctdb->ev);
6152         }
6153
6154         return 0;
6155 }
6156
6157 /*
6158   list all nodes in the cluster
6159   we parse the nodes file directly
6160  */
6161 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
6162 {
6163         TALLOC_CTX *mem_ctx = talloc_new(NULL);
6164         struct ctdb_node_map *node_map;
6165         int i;
6166
6167         assert_single_node_only();
6168
6169         node_map = read_nodes_file(mem_ctx);
6170         if (node_map == NULL) {
6171                 talloc_free(mem_ctx);
6172                 return -1;
6173         }
6174
6175         for (i = 0; i < node_map->num; i++) {
6176                 const char *addr;
6177
6178                 if (node_map->nodes[i].flags & NODE_FLAGS_DELETED) {
6179                         continue;
6180                 }
6181                 addr = ctdb_addr_to_str(&node_map->nodes[i].addr);
6182                 if (options.machinereadable){
6183                         printm(":%d:%s:\n", node_map->nodes[i].pnn, addr);
6184                 } else {
6185                         printf("%s\n", addr);
6186                 }
6187         }
6188         talloc_free(mem_ctx);
6189
6190         return 0;
6191 }
6192
6193 /*
6194   reload the nodes file on the local node
6195  */
6196 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
6197 {
6198         int i, ret;
6199         int mypnn;
6200         struct ctdb_node_map *nodemap=NULL;
6201
6202         assert_single_node_only();
6203
6204         mypnn = ctdb_get_pnn(ctdb);
6205
6206         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
6207         if (ret != 0) {
6208                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
6209                 return ret;
6210         }
6211
6212         /* reload the nodes file on all remote nodes */
6213         for (i=0;i<nodemap->num;i++) {
6214                 if (nodemap->nodes[i].pnn == mypnn) {
6215                         continue;
6216                 }
6217                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
6218                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
6219                         nodemap->nodes[i].pnn);
6220                 if (ret != 0) {
6221                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
6222                 }
6223         }
6224
6225         /* reload the nodes file on the local node */
6226         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
6227         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
6228         if (ret != 0) {
6229                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
6230         }
6231
6232         /* initiate a recovery */
6233         control_recover(ctdb, argc, argv);
6234
6235         return 0;
6236 }
6237
6238
6239 static const struct {
6240         const char *name;
6241         int (*fn)(struct ctdb_context *, int, const char **);
6242         bool auto_all;
6243         bool without_daemon; /* can be run without daemon running ? */
6244         const char *msg;
6245         const char *args;
6246 } ctdb_commands[] = {
6247         { "version",         control_version,           true,   true,   "show version of ctdb" },
6248         { "status",          control_status,            true,   false,  "show node status" },
6249         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
6250         { "ping",            control_ping,              true,   false,  "ping all nodes" },
6251         { "runstate",        control_runstate,          true,   false,  "get/check runstate of a node", "[setup|first_recovery|startup|running]" },
6252         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
6253         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
6254         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
6255         { "statistics",      control_statistics,        false,  false, "show statistics" },
6256         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
6257         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
6258         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
6259         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
6260         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
6261         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
6262         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
6263         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
6264         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname|dbid>" },
6265         { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname|dbid>"},
6266         { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname|dbid>"},
6267         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
6268         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
6269         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
6270         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
6271         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
6272         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
6273         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
6274         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
6275         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
6276         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
6277         { "detach",          control_detach,            false,  false,  "detach from a database",                 "<dbname|dbid> [<dbname|dbid> ...]" },
6278         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
6279         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
6280         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
6281         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
6282         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
6283         { "stop",            control_stop,              true,   false,  "stop a node" },
6284         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
6285         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime>"},
6286         { "unban",           control_unban,             true,   false,  "unban a node" },
6287         { "showban",         control_showban,           true,   false,  "show ban information"},
6288         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
6289         { "recover",         control_recover,           true,   false,  "force recovery" },
6290         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
6291         { "ipreallocate",    control_ipreallocate,      false,  false,  "force the recovery daemon to perform a ip reallocation procedure" },
6292         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
6293         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
6294         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "[<srcip:port> <dstip:port>]" },
6295         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
6296         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
6297         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
6298         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
6299
6300         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
6301
6302         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
6303         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
6304         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
6305         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
6306         { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
6307         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
6308         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
6309         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
6310         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
6311         { "rebalanceip",     control_rebalanceip,       false,  false, "release an ip from the node and let recd rebalance it", "<ip>"},
6312         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
6313         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
6314         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
6315         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<dbname|dbid> <file>"},
6316         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
6317         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
6318         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname|dbid>"},
6319         { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
6320         { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
6321         { "enablescript",     control_enablescript,  true,      false, "enable an eventscript", "<script>"},
6322         { "disablescript",    control_disablescript,  true,     false, "disable an eventscript", "<script>"},
6323         { "natgwlist",        control_natgwlist,        true,   false, "show the nodes belonging to this natgw configuration"},
6324         { "xpnn",             control_xpnn,             false,  true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
6325         { "getreclock",       control_getreclock,       true,   false, "Show the reclock file of a node"},
6326         { "setreclock",       control_setreclock,       true,   false, "Set/clear the reclock file of a node", "[filename]"},
6327         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
6328         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
6329         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
6330         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbname|dbid> <prio:1-3>"},
6331         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbname|dbid>"},
6332         { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbname|dbid>"},
6333         { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbname|dbid>"},
6334         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
6335         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
6336         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<dbname|dbid> <key> [<file>]" },
6337         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<dbname|dbid> <key> <file containing record>" },
6338         { "pdelete",         control_pdelete,           false,  false,  "delete a record from a persistent database", "<dbname|dbid> <key>" },
6339         { "ptrans",          control_ptrans,            false,  false,  "update a persistent database (from stdin)", "<dbname|dbid>" },
6340         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
6341         { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6342         { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<dbname|dbid> <key>" },
6343         { "writekey",        control_writekey,          true,   false,  "write to a database key", "<dbname|dbid> <key> <value>" },
6344         { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
6345         { "rebalancenode",     control_rebalancenode,   false,  false, "mark nodes as forced IP rebalancing targets", "[<pnn-list>]"},
6346         { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbname|dbid>" },
6347         { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status", "[<pnn-list>]" },
6348         { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<dbname|dbid>" },
6349         { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on specified nodes" , "[<pnn-list>]" },
6350         { "ipiface",         control_ipiface,           false,  true,  "Find which interface an ip address is hosted on", "<ip>" },
6351 };
6352
6353 /*
6354   show usage message
6355  */
6356 static void usage(void)
6357 {
6358         int i;
6359         printf(
6360 "Usage: ctdb [options] <control>\n" \
6361 "Options:\n" \
6362 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
6363 "   -Y                 generate machine readable output\n"
6364 "   -x <char>          specify delimiter for machine readable output\n"
6365 "   -v                 generate verbose output\n"
6366 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
6367         printf("Controls:\n");
6368         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6369                 printf("  %-15s %-27s  %s\n", 
6370                        ctdb_commands[i].name, 
6371                        ctdb_commands[i].args?ctdb_commands[i].args:"",
6372                        ctdb_commands[i].msg);
6373         }
6374         exit(1);
6375 }
6376
6377
6378 static void ctdb_alarm(int sig)
6379 {
6380         printf("Maximum runtime exceeded - exiting\n");
6381         _exit(ERR_TIMEOUT);
6382 }
6383
6384 /*
6385   main program
6386 */
6387 int main(int argc, const char *argv[])
6388 {
6389         struct ctdb_context *ctdb;
6390         char *nodestring = NULL;
6391         int machineparsable = 0;
6392         struct poptOption popt_options[] = {
6393                 POPT_AUTOHELP
6394                 POPT_CTDB_CMDLINE
6395                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
6396                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
6397                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machine readable output", NULL },
6398                 { NULL, 'x', POPT_ARG_STRING, &options.machineseparator, 0, "specify separator for machine readable output", "char" },
6399                 { NULL, 'X', POPT_ARG_NONE, &machineparsable, 0, "enable machine parsable output with separator |", NULL },
6400                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
6401                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
6402                 { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
6403                 { "print-datasize", 0, POPT_ARG_NONE, &options.printdatasize, 0, "do not print record data when dumping databases, only the data size", NULL },
6404                 { "print-lmaster", 0, POPT_ARG_NONE, &options.printlmaster, 0, "print the record's lmaster in catdb", NULL },
6405                 { "print-hash", 0, POPT_ARG_NONE, &options.printhash, 0, "print the record's hash when dumping databases", NULL },
6406                 { "print-recordflags", 0, POPT_ARG_NONE, &options.printrecordflags, 0, "print the record flags in catdb and dumpdbbackup", NULL },
6407                 POPT_TABLEEND
6408         };
6409         int opt;
6410         const char **extra_argv;
6411         int extra_argc = 0;
6412         int ret=-1, i;
6413         poptContext pc;
6414         struct event_context *ev;
6415         const char *control;
6416
6417         setlinebuf(stdout);
6418         
6419         /* set some defaults */
6420         options.maxruntime = 0;
6421         options.timelimit = 10;
6422         options.pnn = CTDB_CURRENT_NODE;
6423
6424         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
6425
6426         while ((opt = poptGetNextOpt(pc)) != -1) {
6427                 switch (opt) {
6428                 default:
6429                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
6430                                 poptBadOption(pc, 0), poptStrerror(opt)));
6431                         exit(1);
6432                 }
6433         }
6434
6435         /* setup the remaining options for the main program to use */
6436         extra_argv = poptGetArgs(pc);
6437         if (extra_argv) {
6438                 extra_argv++;
6439                 while (extra_argv[extra_argc]) extra_argc++;
6440         }
6441
6442         if (extra_argc < 1) {
6443                 usage();
6444         }
6445
6446         if (options.maxruntime == 0) {
6447                 const char *ctdb_timeout;
6448                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6449                 if (ctdb_timeout != NULL) {
6450                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6451                 } else {
6452                         /* default timeout is 120 seconds */
6453                         options.maxruntime = 120;
6454                 }
6455         }
6456
6457         if (machineparsable) {
6458                 options.machineseparator = "|";
6459         }
6460         if (options.machineseparator != NULL) {
6461                 if (strlen(options.machineseparator) != 1) {
6462                         printf("Invalid separator \"%s\" - "
6463                                "must be single character\n",
6464                                options.machineseparator);
6465                         exit(1);
6466                 }
6467
6468                 /* -x implies -Y */
6469                 options.machinereadable = true;
6470         } else if (options.machinereadable) {
6471                 options.machineseparator = ":";
6472         }
6473
6474         signal(SIGALRM, ctdb_alarm);
6475         alarm(options.maxruntime);
6476
6477         control = extra_argv[0];
6478
6479         /* Default value for CTDB_BASE - don't override */
6480         setenv("CTDB_BASE", CTDB_ETCDIR, 0);
6481
6482         ev = event_context_init(NULL);
6483         if (!ev) {
6484                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
6485                 exit(1);
6486         }
6487
6488         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6489                 if (strcmp(control, ctdb_commands[i].name) == 0) {
6490                         break;
6491                 }
6492         }
6493
6494         if (i == ARRAY_SIZE(ctdb_commands)) {
6495                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
6496                 exit(1);
6497         }
6498
6499         if (ctdb_commands[i].without_daemon == true) {
6500                 if (nodestring != NULL) {
6501                         DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
6502                         exit(1);
6503                 }
6504                 return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
6505         }
6506
6507         /* initialise ctdb */
6508         ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
6509
6510         if (ctdb == NULL) {
6511                 uint32_t pnn;
6512                 DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
6513
6514                 pnn = find_node_xpnn();
6515                 if (pnn == -1) {
6516                         DEBUG(DEBUG_ERR,
6517                               ("Is this node part of a CTDB cluster?\n"));
6518                 }
6519                 exit(1);
6520         }
6521
6522         /* setup the node number(s) to contact */
6523         if (!parse_nodestring(ctdb, ctdb, nodestring, CTDB_CURRENT_NODE, false,
6524                               &options.nodes, &options.pnn)) {
6525                 usage();
6526         }
6527
6528         if (options.pnn == CTDB_CURRENT_NODE) {
6529                 options.pnn = options.nodes[0];
6530         }
6531
6532         if (ctdb_commands[i].auto_all && 
6533             ((options.pnn == CTDB_BROADCAST_ALL) ||
6534              (options.pnn == CTDB_MULTICAST))) {
6535                 int j;
6536
6537                 ret = 0;
6538                 for (j = 0; j < talloc_array_length(options.nodes); j++) {
6539                         options.pnn = options.nodes[j];
6540                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6541                 }
6542         } else {
6543                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6544         }
6545
6546         talloc_free(ctdb);
6547         talloc_free(ev);
6548         (void)poptFreeContext(pc);
6549
6550         return ret;
6551
6552 }