ctdb-build: Calculate correct version when building from tarball
[amitay/samba.git] / ctdb / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "replace.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24 #include "system/network.h"
25 #include "system/locale.h"
26
27 #include <popt.h>
28 #include <talloc.h>
29 /* Allow use of deprecated function tevent_loop_allow_nesting() */
30 #define TEVENT_DEPRECATED
31 #include <tevent.h>
32 #include <tdb.h>
33
34 #include "lib/tdb_wrap/tdb_wrap.h"
35 #include "lib/util/dlinklist.h"
36 #include "lib/util/debug.h"
37 #include "lib/util/substitute.h"
38 #include "lib/util/time.h"
39
40 #include "ctdb_logging.h"
41 #include "ctdb_version.h"
42 #include "ctdb_private.h"
43 #include "ctdb_client.h"
44
45 #include "common/cmdline.h"
46 #include "common/rb_tree.h"
47 #include "common/system.h"
48
49 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
50 #define ERR_NONODE      21      /* node does not exist */
51 #define ERR_DISNODE     22      /* node is disconnected */
52
53 static void usage(void);
54
55 static struct {
56         int timelimit;
57         uint32_t pnn;
58         uint32_t *nodes;
59         int machinereadable;
60         const char *machineseparator;
61         int verbose;
62         int maxruntime;
63         int printemptyrecords;
64         int printdatasize;
65         int printlmaster;
66         int printhash;
67         int printrecordflags;
68 } options;
69
70 #define LONGTIMEOUT options.timelimit*10
71
72 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
73 #define LONGTIMELIMIT() timeval_current_ofs(LONGTIMEOUT, 0)
74
75 static double timeval_delta(struct timeval *tv2, struct timeval *tv)
76 {
77         return (tv2->tv_sec - tv->tv_sec) +
78                 (tv2->tv_usec - tv->tv_usec)*1.0e-6;
79 }
80
81 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
82 {
83         printf("CTDB version: %s\n", CTDB_VERSION_STRING);
84         return 0;
85 }
86
87 /* Like printf(3) but substitute for separator in format */
88 static int printm(const char *format, ...) PRINTF_ATTRIBUTE(1,2);
89 static int printm(const char *format, ...)
90 {
91         va_list ap;
92         int ret;
93         size_t len = strlen(format);
94         char new_format[len+1];
95
96         strcpy(new_format, format);
97
98         if (options.machineseparator[0] != ':') {
99                 all_string_sub(new_format,
100                                ":", options.machineseparator, len + 1);
101         }
102
103         va_start(ap, format);
104         ret = vprintf(new_format, ap);
105         va_end(ap);
106
107         return ret;
108 }
109
110 #define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                            \
111                 DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
112                                    "Out of memory in " __location__ )); \
113                 abort();                                                \
114         }} while (0)
115
116 static uint32_t getpnn(struct ctdb_context *ctdb)
117 {
118         if ((options.pnn == CTDB_BROADCAST_ALL) ||
119             (options.pnn == CTDB_MULTICAST)) {
120                 DEBUG(DEBUG_ERR,
121                       ("Cannot get PNN for node %u\n", options.pnn));
122                 exit(1);
123         }
124
125         if (options.pnn == CTDB_CURRENT_NODE) {
126                 return ctdb_get_pnn(ctdb);
127         } else {
128                 return options.pnn;
129         }
130 }
131
132 static void assert_single_node_only(void)
133 {
134         if ((options.pnn == CTDB_BROADCAST_ALL) ||
135             (options.pnn == CTDB_MULTICAST)) {
136                 DEBUG(DEBUG_ERR,
137                       ("This control can not be applied to multiple PNNs\n"));
138                 exit(1);
139         }
140 }
141
142 static void assert_current_node_only(struct ctdb_context *ctdb)
143 {
144         if (options.pnn != ctdb_get_pnn(ctdb)) {
145                 DEBUG(DEBUG_ERR,
146                       ("This control can only be applied to the current node\n"));
147                 exit(1);
148         }
149 }
150
151 /* Pretty print the flags to a static buffer in human-readable format.
152  * This never returns NULL!
153  */
154 static const char *pretty_print_flags(uint32_t flags)
155 {
156         int j;
157         static const struct {
158                 uint32_t flag;
159                 const char *name;
160         } flag_names[] = {
161                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
162                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
163                 { NODE_FLAGS_BANNED,                "BANNED" },
164                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
165                 { NODE_FLAGS_DELETED,               "DELETED" },
166                 { NODE_FLAGS_STOPPED,               "STOPPED" },
167                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
168         };
169         static char flags_str[512]; /* Big enough to contain all flag names */
170
171         flags_str[0] = '\0';
172         for (j=0;j<ARRAY_SIZE(flag_names);j++) {
173                 if (flags & flag_names[j].flag) {
174                         if (flags_str[0] == '\0') {
175                                 (void) strcpy(flags_str, flag_names[j].name);
176                         } else {
177                                 (void) strncat(flags_str, "|", sizeof(flags_str)-1);
178                                 (void) strncat(flags_str, flag_names[j].name,
179                                                sizeof(flags_str)-1);
180                         }
181                 }
182         }
183         if (flags_str[0] == '\0') {
184                 (void) strcpy(flags_str, "OK");
185         }
186
187         return flags_str;
188 }
189
190 static int h2i(char h)
191 {
192         if (h >= 'a' && h <= 'f') return h - 'a' + 10;
193         if (h >= 'A' && h <= 'F') return h - 'f' + 10;
194         return h - '0';
195 }
196
197 static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str, size_t len)
198 {
199         int i;
200         TDB_DATA key = {NULL, 0};
201
202         if (len & 0x01) {
203                 DEBUG(DEBUG_ERR,("Key specified with odd number of hexadecimal digits\n"));
204                 return key;
205         }
206
207         key.dsize = len>>1;
208         key.dptr  = talloc_size(mem_ctx, key.dsize);
209
210         for (i=0; i < len/2; i++) {
211                 key.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
212         }
213         return key;
214 }
215
216 static TDB_DATA strtodata(TALLOC_CTX *mem_ctx, const char *str, size_t len)
217 {
218         TDB_DATA key;
219
220         if (!strncmp(str, "0x", 2)) {
221                 key = hextodata(mem_ctx, str + 2, len - 2);
222         } else {
223                 key.dptr  = talloc_memdup(mem_ctx, str, len);
224                 key.dsize = len;
225         }
226
227         return key;
228 }
229
230 /* Parse a nodestring.  Parameter dd_ok controls what happens to nodes
231  * that are disconnected or deleted.  If dd_ok is true those nodes are
232  * included in the output list of nodes.  If dd_ok is false, those
233  * nodes are filtered from the "all" case and cause an error if
234  * explicitly specified.
235  */
236 static bool parse_nodestring(struct ctdb_context *ctdb,
237                              TALLOC_CTX *mem_ctx,
238                              const char * nodestring,
239                              uint32_t current_pnn,
240                              bool dd_ok,
241                              uint32_t **nodes,
242                              uint32_t *pnn_mode)
243 {
244         TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
245         int n;
246         uint32_t i;
247         struct ctdb_node_map *nodemap;
248         int ret;
249
250         *nodes = NULL;
251
252         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
253         if (ret != 0) {
254                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
255                 talloc_free(tmp_ctx);
256                 exit(10);
257         }
258
259         if (nodestring != NULL) {
260                 *nodes = talloc_array(mem_ctx, uint32_t, 0);
261                 if (*nodes == NULL) {
262                         goto failed;
263                 }
264
265                 n = 0;
266
267                 if (strcmp(nodestring, "all") == 0) {
268                         *pnn_mode = CTDB_BROADCAST_ALL;
269
270                         /* all */
271                         for (i = 0; i < nodemap->num; i++) {
272                                 if ((nodemap->nodes[i].flags &
273                                      (NODE_FLAGS_DISCONNECTED |
274                                       NODE_FLAGS_DELETED)) && !dd_ok) {
275                                         continue;
276                                 }
277                                 *nodes = talloc_realloc(mem_ctx, *nodes,
278                                                         uint32_t, n+1);
279                                 if (*nodes == NULL) {
280                                         goto failed;
281                                 }
282                                 (*nodes)[n] = i;
283                                 n++;
284                         }
285                 } else {
286                         /* x{,y...} */
287                         char *ns, *tok;
288
289                         ns = talloc_strdup(tmp_ctx, nodestring);
290                         tok = strtok(ns, ",");
291                         while (tok != NULL) {
292                                 uint32_t pnn;
293                                 char *endptr;
294                                 i = (uint32_t)strtoul(tok, &endptr, 0);
295                                 if (i == 0 && tok == endptr) {
296                                         DEBUG(DEBUG_ERR,
297                                               ("Invalid node %s\n", tok));
298                                         talloc_free(tmp_ctx);
299                                         exit(ERR_NONODE);
300                                 }
301                                 if (i >= nodemap->num) {
302                                         DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
303                                         talloc_free(tmp_ctx);
304                                         exit(ERR_NONODE);
305                                 }
306                                 if ((nodemap->nodes[i].flags & 
307                                      (NODE_FLAGS_DISCONNECTED |
308                                       NODE_FLAGS_DELETED)) && !dd_ok) {
309                                         DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
310                                         talloc_free(tmp_ctx);
311                                         exit(ERR_DISNODE);
312                                 }
313                                 if ((pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), i)) < 0) {
314                                         DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
315                                         talloc_free(tmp_ctx);
316                                         exit(10);
317                                 }
318
319                                 *nodes = talloc_realloc(mem_ctx, *nodes,
320                                                         uint32_t, n+1);
321                                 if (*nodes == NULL) {
322                                         goto failed;
323                                 }
324
325                                 (*nodes)[n] = i;
326                                 n++;
327
328                                 tok = strtok(NULL, ",");
329                         }
330                         talloc_free(ns);
331
332                         if (n == 1) {
333                                 *pnn_mode = (*nodes)[0];
334                         } else {
335                                 *pnn_mode = CTDB_MULTICAST;
336                         }
337                 }
338         } else {
339                 /* default - no nodes specified */
340                 *nodes = talloc_array(mem_ctx, uint32_t, 1);
341                 if (*nodes == NULL) {
342                         goto failed;
343                 }
344                 *pnn_mode = CTDB_CURRENT_NODE;
345
346                 if (((*nodes)[0] = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), current_pnn)) < 0) {
347                         goto failed;
348                 }
349         }
350
351         talloc_free(tmp_ctx);
352         return true;
353
354 failed:
355         talloc_free(tmp_ctx);
356         return false;
357 }
358
359 /*
360  check if a database exists
361 */
362 static bool db_exists(struct ctdb_context *ctdb, const char *dbarg,
363                       uint32_t *dbid, const char **dbname, uint8_t *flags)
364 {
365         int i, ret;
366         struct ctdb_dbid_map *dbmap=NULL;
367         bool dbid_given = false, found = false;
368         uint32_t id;
369         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
370         const char *name = NULL;
371
372         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
373         if (ret != 0) {
374                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
375                 goto fail;
376         }
377
378         if (strncmp(dbarg, "0x", 2) == 0) {
379                 id = strtoul(dbarg, NULL, 0);
380                 dbid_given = true;
381         }
382
383         for(i=0; i<dbmap->num; i++) {
384                 if (dbid_given) {
385                         if (id == dbmap->dbs[i].dbid) {
386                                 found = true;
387                                 break;
388                         }
389                 } else {
390                         ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
391                         if (ret != 0) {
392                                 DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
393                                 goto fail;
394                         }
395
396                         if (strcmp(name, dbarg) == 0) {
397                                 id = dbmap->dbs[i].dbid;
398                                 found = true;
399                                 break;
400                         }
401                 }
402         }
403
404         if (found && dbid_given && dbname != NULL) {
405                 ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
406                 if (ret != 0) {
407                         DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
408                         found = false;
409                         goto fail;
410                 }
411         }
412
413         if (found) {
414                 if (dbid) *dbid = id;
415                 if (dbname) *dbname = talloc_strdup(ctdb, name);
416                 if (flags) *flags = dbmap->dbs[i].flags;
417         } else {
418                 DEBUG(DEBUG_ERR,("No database matching '%s' found\n", dbarg));
419         }
420
421 fail:
422         talloc_free(tmp_ctx);
423         return found;
424 }
425
426 /*
427   see if a process exists
428  */
429 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
430 {
431         uint32_t pnn, pid;
432         int ret;
433         if (argc < 1) {
434                 usage();
435         }
436
437         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
438                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
439                 return -1;
440         }
441
442         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
443         if (ret == 0) {
444                 printf("%u:%u exists\n", pnn, pid);
445         } else {
446                 printf("%u:%u does not exist\n", pnn, pid);
447         }
448         return ret;
449 }
450
451 /*
452   display statistics structure
453  */
454 static void show_statistics(struct ctdb_statistics *s, int show_header)
455 {
456         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
457         int i;
458         const char *prefix=NULL;
459         int preflen=0;
460         int tmp, days, hours, minutes, seconds;
461         const struct {
462                 const char *name;
463                 uint32_t offset;
464         } fields[] = {
465 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
466                 STATISTICS_FIELD(num_clients),
467                 STATISTICS_FIELD(frozen),
468                 STATISTICS_FIELD(recovering),
469                 STATISTICS_FIELD(num_recoveries),
470                 STATISTICS_FIELD(client_packets_sent),
471                 STATISTICS_FIELD(client_packets_recv),
472                 STATISTICS_FIELD(node_packets_sent),
473                 STATISTICS_FIELD(node_packets_recv),
474                 STATISTICS_FIELD(keepalive_packets_sent),
475                 STATISTICS_FIELD(keepalive_packets_recv),
476                 STATISTICS_FIELD(node.req_call),
477                 STATISTICS_FIELD(node.reply_call),
478                 STATISTICS_FIELD(node.req_dmaster),
479                 STATISTICS_FIELD(node.reply_dmaster),
480                 STATISTICS_FIELD(node.reply_error),
481                 STATISTICS_FIELD(node.req_message),
482                 STATISTICS_FIELD(node.req_control),
483                 STATISTICS_FIELD(node.reply_control),
484                 STATISTICS_FIELD(client.req_call),
485                 STATISTICS_FIELD(client.req_message),
486                 STATISTICS_FIELD(client.req_control),
487                 STATISTICS_FIELD(timeouts.call),
488                 STATISTICS_FIELD(timeouts.control),
489                 STATISTICS_FIELD(timeouts.traverse),
490                 STATISTICS_FIELD(locks.num_calls),
491                 STATISTICS_FIELD(locks.num_current),
492                 STATISTICS_FIELD(locks.num_pending),
493                 STATISTICS_FIELD(locks.num_failed),
494                 STATISTICS_FIELD(total_calls),
495                 STATISTICS_FIELD(pending_calls),
496                 STATISTICS_FIELD(childwrite_calls),
497                 STATISTICS_FIELD(pending_childwrite_calls),
498                 STATISTICS_FIELD(memory_used),
499                 STATISTICS_FIELD(max_hop_count),
500                 STATISTICS_FIELD(total_ro_delegations),
501                 STATISTICS_FIELD(total_ro_revokes),
502         };
503         
504         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
505         seconds = tmp%60;
506         tmp    /= 60;
507         minutes = tmp%60;
508         tmp    /= 60;
509         hours   = tmp%24;
510         tmp    /= 24;
511         days    = tmp;
512
513         if (options.machinereadable){
514                 if (show_header) {
515                         printm("CTDB version:");
516                         printm("Current time of statistics:");
517                         printm("Statistics collected since:");
518                         for (i=0;i<ARRAY_SIZE(fields);i++) {
519                                 printm("%s:", fields[i].name);
520                         }
521                         printm("num_reclock_ctdbd_latency:");
522                         printm("min_reclock_ctdbd_latency:");
523                         printm("avg_reclock_ctdbd_latency:");
524                         printm("max_reclock_ctdbd_latency:");
525
526                         printm("num_reclock_recd_latency:");
527                         printm("min_reclock_recd_latency:");
528                         printm("avg_reclock_recd_latency:");
529                         printm("max_reclock_recd_latency:");
530
531                         printm("num_call_latency:");
532                         printm("min_call_latency:");
533                         printm("avg_call_latency:");
534                         printm("max_call_latency:");
535
536                         printm("num_lockwait_latency:");
537                         printm("min_lockwait_latency:");
538                         printm("avg_lockwait_latency:");
539                         printm("max_lockwait_latency:");
540
541                         printm("num_childwrite_latency:");
542                         printm("min_childwrite_latency:");
543                         printm("avg_childwrite_latency:");
544                         printm("max_childwrite_latency:");
545                         printm("\n");
546                 }
547                 printm("%d:", CTDB_PROTOCOL);
548                 printm("%d:", (int)s->statistics_current_time.tv_sec);
549                 printm("%d:", (int)s->statistics_start_time.tv_sec);
550                 for (i=0;i<ARRAY_SIZE(fields);i++) {
551                         printm("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
552                 }
553                 printm("%d:", s->reclock.ctdbd.num);
554                 printm("%.6f:", s->reclock.ctdbd.min);
555                 printm("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
556                 printm("%.6f:", s->reclock.ctdbd.max);
557
558                 printm("%d:", s->reclock.recd.num);
559                 printm("%.6f:", s->reclock.recd.min);
560                 printm("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
561                 printm("%.6f:", s->reclock.recd.max);
562
563                 printm("%d:", s->call_latency.num);
564                 printm("%.6f:", s->call_latency.min);
565                 printm("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
566                 printm("%.6f:", s->call_latency.max);
567
568                 printm("%d:", s->childwrite_latency.num);
569                 printm("%.6f:", s->childwrite_latency.min);
570                 printm("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
571                 printm("%.6f:", s->childwrite_latency.max);
572                 printm("\n");
573         } else {
574                 printf("CTDB version %u\n", CTDB_PROTOCOL);
575                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
576                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
577
578                 for (i=0;i<ARRAY_SIZE(fields);i++) {
579                         if (strchr(fields[i].name, '.')) {
580                                 preflen = strcspn(fields[i].name, ".")+1;
581                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
582                                         prefix = fields[i].name;
583                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
584                                 }
585                         } else {
586                                 preflen = 0;
587                         }
588                         printf(" %*s%-22s%*s%10u\n", 
589                                preflen?4:0, "",
590                                fields[i].name+preflen, 
591                                preflen?0:4, "",
592                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
593                 }
594                 printf(" hop_count_buckets:");
595                 for (i=0;i<MAX_COUNT_BUCKETS;i++) {
596                         printf(" %d", s->hop_count_bucket[i]);
597                 }
598                 printf("\n");
599                 printf(" lock_buckets:");
600                 for (i=0; i<MAX_COUNT_BUCKETS; i++) {
601                         printf(" %d", s->locks.buckets[i]);
602                 }
603                 printf("\n");
604                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "locks_latency      MIN/AVG/MAX", s->locks.latency.min, s->locks.latency.num?s->locks.latency.total/s->locks.latency.num:0.0, s->locks.latency.max, s->locks.latency.num);
605
606                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd      MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
607
608                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
609
610                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
611                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
612         }
613
614         talloc_free(tmp_ctx);
615 }
616
617 /*
618   display remote ctdb statistics combined from all nodes
619  */
620 static int control_statistics_all(struct ctdb_context *ctdb)
621 {
622         int ret, i;
623         struct ctdb_statistics statistics;
624         uint32_t *nodes;
625         uint32_t num_nodes;
626
627         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
628         CTDB_NO_MEMORY(ctdb, nodes);
629         
630         ZERO_STRUCT(statistics);
631
632         for (i=0;i<num_nodes;i++) {
633                 struct ctdb_statistics s1;
634                 int j;
635                 uint32_t *v1 = (uint32_t *)&s1;
636                 uint32_t *v2 = (uint32_t *)&statistics;
637                 uint32_t num_ints = 
638                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
639                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
640                 if (ret != 0) {
641                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
642                         return ret;
643                 }
644                 for (j=0;j<num_ints;j++) {
645                         v2[j] += v1[j];
646                 }
647                 statistics.max_hop_count = 
648                         MAX(statistics.max_hop_count, s1.max_hop_count);
649                 statistics.call_latency.max = 
650                         MAX(statistics.call_latency.max, s1.call_latency.max);
651         }
652         talloc_free(nodes);
653         printf("Gathered statistics for %u nodes\n", num_nodes);
654         show_statistics(&statistics, 1);
655         return 0;
656 }
657
658 /*
659   display remote ctdb statistics
660  */
661 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
662 {
663         int ret;
664         struct ctdb_statistics statistics;
665
666         if (options.pnn == CTDB_BROADCAST_ALL) {
667                 return control_statistics_all(ctdb);
668         }
669
670         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
671         if (ret != 0) {
672                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
673                 return ret;
674         }
675         show_statistics(&statistics, 1);
676         return 0;
677 }
678
679
680 /*
681   reset remote ctdb statistics
682  */
683 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
684 {
685         int ret;
686
687         ret = ctdb_statistics_reset(ctdb, options.pnn);
688         if (ret != 0) {
689                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
690                 return ret;
691         }
692         return 0;
693 }
694
695
696 /*
697   display remote ctdb rolling statistics
698  */
699 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
700 {
701         int ret;
702         struct ctdb_statistics_wire *stats;
703         int i, num_records = -1;
704
705         assert_single_node_only();
706
707         if (argc ==1) {
708                 num_records = atoi(argv[0]) - 1;
709         }
710
711         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
712         if (ret != 0) {
713                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
714                 return ret;
715         }
716         for (i=0;i<stats->num;i++) {
717                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
718                         continue;
719                 }
720                 show_statistics(&stats->stats[i], i==0);
721                 if (i == num_records) {
722                         break;
723                 }
724         }
725         return 0;
726 }
727
728
729 /*
730   display remote ctdb db statistics
731  */
732 static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
733 {
734         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
735         struct ctdb_db_statistics *dbstat;
736         int i;
737         uint32_t db_id;
738         int num_hot_keys;
739         int ret;
740
741         if (argc < 1) {
742                 usage();
743         }
744
745         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
746                 return -1;
747         }
748
749         ret = ctdb_ctrl_dbstatistics(ctdb, options.pnn, db_id, tmp_ctx, &dbstat);
750         if (ret != 0) {
751                 DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
752                 talloc_free(tmp_ctx);
753                 return -1;
754         }
755
756         printf("DB Statistics: %s\n", argv[0]);
757         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_delegations", 4, "",
758                 dbstat->db_ro_delegations);
759         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_revokes", 4, "",
760                 dbstat->db_ro_delegations);
761         printf(" %s\n", "locks");
762         printf(" %*s%-22s%*s%10u\n", 4, "", "total", 0, "",
763                 dbstat->locks.num_calls);
764         printf(" %*s%-22s%*s%10u\n", 4, "", "failed", 0, "",
765                 dbstat->locks.num_failed);
766         printf(" %*s%-22s%*s%10u\n", 4, "", "current", 0, "",
767                 dbstat->locks.num_current);
768         printf(" %*s%-22s%*s%10u\n", 4, "", "pending", 0, "",
769                 dbstat->locks.num_pending);
770         printf(" %s", "hop_count_buckets:");
771         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
772                 printf(" %d", dbstat->hop_count_bucket[i]);
773         }
774         printf("\n");
775         printf(" %s", "lock_buckets:");
776         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
777                 printf(" %d", dbstat->locks.buckets[i]);
778         }
779         printf("\n");
780         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
781                 "locks_latency      MIN/AVG/MAX",
782                 dbstat->locks.latency.min,
783                 (dbstat->locks.latency.num ?
784                  dbstat->locks.latency.total /dbstat->locks.latency.num :
785                  0.0),
786                 dbstat->locks.latency.max,
787                 dbstat->locks.latency.num);
788         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
789                 "vacuum_latency     MIN/AVG/MAX",
790                 dbstat->vacuum.latency.min,
791                 (dbstat->vacuum.latency.num ?
792                  dbstat->vacuum.latency.total /dbstat->vacuum.latency.num :
793                  0.0),
794                 dbstat->vacuum.latency.max,
795                 dbstat->vacuum.latency.num);
796         num_hot_keys = 0;
797         for (i=0; i<dbstat->num_hot_keys; i++) {
798                 if (dbstat->hot_keys[i].count > 0) {
799                         num_hot_keys++;
800                 }
801         }
802         dbstat->num_hot_keys = num_hot_keys;
803
804         printf(" Num Hot Keys:     %d\n", dbstat->num_hot_keys);
805         for (i = 0; i < dbstat->num_hot_keys; i++) {
806                 int j;
807                 printf("     Count:%d Key:", dbstat->hot_keys[i].count);
808                 for (j = 0; j < dbstat->hot_keys[i].key.dsize; j++) {
809                         printf("%02x", dbstat->hot_keys[i].key.dptr[j]&0xff);
810                 }
811                 printf("\n");
812         }
813
814         talloc_free(tmp_ctx);
815         return 0;
816 }
817
818 /*
819   display uptime of remote node
820  */
821 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
822 {
823         int ret;
824         struct ctdb_uptime *uptime = NULL;
825         int tmp, days, hours, minutes, seconds;
826
827         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
828         if (ret != 0) {
829                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
830                 return ret;
831         }
832
833         if (options.machinereadable){
834                 printm(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
835                 printm(":%u:%u:%u:%lf\n",
836                         (unsigned int)uptime->current_time.tv_sec,
837                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
838                         (unsigned int)uptime->last_recovery_finished.tv_sec,
839                         timeval_delta(&uptime->last_recovery_finished,
840                                       &uptime->last_recovery_started)
841                 );
842                 return 0;
843         }
844
845         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
846
847         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
848         seconds = tmp%60;
849         tmp    /= 60;
850         minutes = tmp%60;
851         tmp    /= 60;
852         hours   = tmp%24;
853         tmp    /= 24;
854         days    = tmp;
855         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
856
857         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
858         seconds = tmp%60;
859         tmp    /= 60;
860         minutes = tmp%60;
861         tmp    /= 60;
862         hours   = tmp%24;
863         tmp    /= 24;
864         days    = tmp;
865         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
866         
867         printf("Duration of last recovery/failover: %lf seconds\n",
868                 timeval_delta(&uptime->last_recovery_finished,
869                               &uptime->last_recovery_started));
870
871         return 0;
872 }
873
874 /*
875   show the PNN of the current node
876  */
877 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
878 {
879         uint32_t mypnn;
880
881         mypnn = getpnn(ctdb);
882
883         printf("PNN:%d\n", mypnn);
884         return 0;
885 }
886
887
888 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx)
889 {
890         const char *nodes_list;
891
892         /* read the nodes file */
893         nodes_list = getenv("CTDB_NODES");
894         if (nodes_list == NULL) {
895                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes",
896                                              getenv("CTDB_BASE"));
897                 if (nodes_list == NULL) {
898                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
899                         exit(1);
900                 }
901         }
902
903         return ctdb_read_nodes_file(mem_ctx, nodes_list);
904 }
905
906 /*
907   show the PNN of the current node
908   discover the pnn by loading the nodes file and try to bind to all
909   addresses one at a time until the ip address is found.
910  */
911 static int find_node_xpnn(void)
912 {
913         TALLOC_CTX *mem_ctx = talloc_new(NULL);
914         struct ctdb_node_map *node_map;
915         int i, pnn;
916
917         node_map = read_nodes_file(mem_ctx);
918         if (node_map == NULL) {
919                 talloc_free(mem_ctx);
920                 return -1;
921         }
922
923         for (i = 0; i < node_map->num; i++) {
924                 if (node_map->nodes[i].flags & NODE_FLAGS_DELETED) {
925                         continue;
926                 }
927                 if (ctdb_sys_have_ip(&node_map->nodes[i].addr)) {
928                         pnn = node_map->nodes[i].pnn;
929                         talloc_free(mem_ctx);
930                         return pnn;
931                 }
932         }
933
934         printf("Failed to detect which PNN this node is\n");
935         talloc_free(mem_ctx);
936         return -1;
937 }
938
939 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
940 {
941         uint32_t pnn;
942
943         assert_single_node_only();
944
945         pnn = find_node_xpnn();
946         if (pnn == -1) {
947                 return -1;
948         }
949
950         printf("PNN:%d\n", pnn);
951         return 0;
952 }
953
954 /* Helpers for ctdb status
955  */
956 static bool is_partially_online(struct ctdb_context *ctdb, struct ctdb_node_and_flags *node)
957 {
958         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
959         int j;
960         bool ret = false;
961
962         if (node->flags == 0) {
963                 struct ctdb_control_get_ifaces *ifaces;
964
965                 if (ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), node->pnn,
966                                          tmp_ctx, &ifaces) == 0) {
967                         for (j=0; j < ifaces->num; j++) {
968                                 if (ifaces->ifaces[j].link_state != 0) {
969                                         continue;
970                                 }
971                                 ret = true;
972                                 break;
973                         }
974                 }
975         }
976         talloc_free(tmp_ctx);
977
978         return ret;
979 }
980
981 static void control_status_header_machine(void)
982 {
983         printm(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
984                ":Inactive:PartiallyOnline:ThisNode:\n");
985 }
986
987 static int control_status_1_machine(struct ctdb_context *ctdb, int mypnn,
988                                     struct ctdb_node_and_flags *node)
989 {
990         printm(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
991                ctdb_addr_to_str(&node->addr),
992                !!(node->flags&NODE_FLAGS_DISCONNECTED),
993                !!(node->flags&NODE_FLAGS_BANNED),
994                !!(node->flags&NODE_FLAGS_PERMANENTLY_DISABLED),
995                !!(node->flags&NODE_FLAGS_UNHEALTHY),
996                !!(node->flags&NODE_FLAGS_STOPPED),
997                !!(node->flags&NODE_FLAGS_INACTIVE),
998                is_partially_online(ctdb, node) ? 1 : 0,
999                (node->pnn == mypnn)?'Y':'N');
1000
1001         return node->flags;
1002 }
1003
1004 static int control_status_1_human(struct ctdb_context *ctdb, int mypnn,
1005                                   struct ctdb_node_and_flags *node)
1006 {
1007        printf("pnn:%d %-16s %s%s\n", node->pnn,
1008               ctdb_addr_to_str(&node->addr),
1009               is_partially_online(ctdb, node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
1010               node->pnn == mypnn?" (THIS NODE)":"");
1011
1012        return node->flags;
1013 }
1014
1015 /*
1016   display remote ctdb status
1017  */
1018 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
1019 {
1020         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1021         int i;
1022         struct ctdb_vnn_map *vnnmap=NULL;
1023         struct ctdb_node_map *nodemap=NULL;
1024         uint32_t recmode, recmaster, mypnn;
1025         int num_deleted_nodes = 0;
1026         int ret;
1027
1028         mypnn = getpnn(ctdb);
1029
1030         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1031         if (ret != 0) {
1032                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1033                 talloc_free(tmp_ctx);
1034                 return -1;
1035         }
1036
1037         if (options.machinereadable) {
1038                 control_status_header_machine();
1039                 for (i=0;i<nodemap->num;i++) {
1040                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1041                                 continue;
1042                         }
1043                         (void) control_status_1_machine(ctdb, mypnn,
1044                                                         &nodemap->nodes[i]);
1045                 }
1046                 talloc_free(tmp_ctx);
1047                 return 0;
1048         }
1049
1050         for (i=0; i<nodemap->num; i++) {
1051                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1052                         num_deleted_nodes++;
1053                 }
1054         }
1055         if (num_deleted_nodes == 0) {
1056                 printf("Number of nodes:%d\n", nodemap->num);
1057         } else {
1058                 printf("Number of nodes:%d (including %d deleted nodes)\n",
1059                        nodemap->num, num_deleted_nodes);
1060         }
1061         for(i=0;i<nodemap->num;i++){
1062                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1063                         continue;
1064                 }
1065                 (void) control_status_1_human(ctdb, mypnn, &nodemap->nodes[i]);
1066         }
1067
1068         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
1069         if (ret != 0) {
1070                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
1071                 talloc_free(tmp_ctx);
1072                 return -1;
1073         }
1074         if (vnnmap->generation == INVALID_GENERATION) {
1075                 printf("Generation:INVALID\n");
1076         } else {
1077                 printf("Generation:%d\n",vnnmap->generation);
1078         }
1079         printf("Size:%d\n",vnnmap->size);
1080         for(i=0;i<vnnmap->size;i++){
1081                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
1082         }
1083
1084         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmode);
1085         if (ret != 0) {
1086                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1087                 talloc_free(tmp_ctx);
1088                 return -1;
1089         }
1090         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
1091
1092         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmaster);
1093         if (ret != 0) {
1094                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1095                 talloc_free(tmp_ctx);
1096                 return -1;
1097         }
1098         printf("Recovery master:%d\n",recmaster);
1099
1100         talloc_free(tmp_ctx);
1101         return 0;
1102 }
1103
1104 static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
1105 {
1106         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1107         int i, ret;
1108         struct ctdb_node_map *nodemap=NULL;
1109         uint32_t * nodes;
1110         uint32_t pnn_mode, mypnn;
1111
1112         if (argc > 1) {
1113                 usage();
1114         }
1115
1116         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1117                               options.pnn, true, &nodes, &pnn_mode)) {
1118                 return -1;
1119         }
1120
1121         if (options.machinereadable) {
1122                 control_status_header_machine();
1123         } else if (pnn_mode == CTDB_BROADCAST_ALL) {
1124                 printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
1125         }
1126
1127         mypnn = getpnn(ctdb);
1128
1129         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1130         if (ret != 0) {
1131                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1132                 talloc_free(tmp_ctx);
1133                 return -1;
1134         }
1135
1136         ret = 0;
1137
1138         for (i = 0; i < talloc_array_length(nodes); i++) {
1139                 if (options.machinereadable) {
1140                         ret |= control_status_1_machine(ctdb, mypnn,
1141                                                         &nodemap->nodes[nodes[i]]);
1142                 } else {
1143                         ret |= control_status_1_human(ctdb, mypnn,
1144                                                       &nodemap->nodes[nodes[i]]);
1145                 }
1146         }
1147
1148         talloc_free(tmp_ctx);
1149         return ret;
1150 }
1151
1152 static struct ctdb_node_map *read_natgw_nodes_file(struct ctdb_context *ctdb,
1153                                                    TALLOC_CTX *mem_ctx)
1154 {
1155         const char *natgw_list;
1156         struct ctdb_node_map *natgw_nodes = NULL;
1157
1158         natgw_list = getenv("CTDB_NATGW_NODES");
1159         if (natgw_list == NULL) {
1160                 natgw_list = talloc_asprintf(mem_ctx, "%s/natgw_nodes",
1161                                              getenv("CTDB_BASE"));
1162                 if (natgw_list == NULL) {
1163                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
1164                         exit(1);
1165                 }
1166         }
1167         /* The PNNs/flags will be junk but they're not used */
1168         natgw_nodes = ctdb_read_nodes_file(mem_ctx, natgw_list);
1169         if (natgw_nodes == NULL) {
1170                 DEBUG(DEBUG_ERR,
1171                       ("Failed to load natgw node list '%s'\n", natgw_list));
1172         }
1173         return natgw_nodes;
1174 }
1175
1176
1177 /* talloc off the existing nodemap... */
1178 static struct ctdb_node_map *talloc_nodemap(struct ctdb_node_map *nodemap)
1179 {
1180         return talloc_zero_size(nodemap,
1181                                 offsetof(struct ctdb_node_map, nodes) +
1182                                 nodemap->num * sizeof(struct ctdb_node_and_flags));
1183 }
1184
1185 static struct ctdb_node_map *
1186 filter_nodemap_by_addrs(struct ctdb_context *ctdb,
1187                         struct ctdb_node_map *nodemap,
1188                         struct ctdb_node_map *natgw_nodes)
1189 {
1190         int i, j;
1191         struct ctdb_node_map *ret;
1192
1193         ret = talloc_nodemap(nodemap);
1194         CTDB_NO_MEMORY_NULL(ctdb, ret);
1195
1196         ret->num = 0;
1197
1198         for (i = 0; i < nodemap->num; i++) {
1199                 for(j = 0; j < natgw_nodes->num ; j++) {
1200                         if (nodemap->nodes[j].flags & NODE_FLAGS_DELETED) {
1201                                 continue;
1202                         }
1203                         if (ctdb_same_ip(&natgw_nodes->nodes[j].addr,
1204                                          &nodemap->nodes[i].addr)) {
1205
1206                                 ret->nodes[ret->num] = nodemap->nodes[i];
1207                                 ret->num++;
1208                                 break;
1209                         }
1210                 }
1211         }
1212
1213         return ret;
1214 }
1215
1216 static struct ctdb_node_map *
1217 filter_nodemap_by_capabilities(struct ctdb_context *ctdb,
1218                                struct ctdb_node_map *nodemap,
1219                                uint32_t required_capabilities,
1220                                bool first_only)
1221 {
1222         int i;
1223         uint32_t capabilities;
1224         struct ctdb_node_map *ret;
1225
1226         ret = talloc_nodemap(nodemap);
1227         CTDB_NO_MEMORY_NULL(ctdb, ret);
1228
1229         ret->num = 0;
1230
1231         for (i = 0; i < nodemap->num; i++) {
1232                 int res;
1233
1234                 /* Disconnected nodes have no capabilities! */
1235                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1236                         continue;
1237                 }
1238
1239                 res = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(),
1240                                                 nodemap->nodes[i].pnn,
1241                                                 &capabilities);
1242                 if (res != 0) {
1243                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n",
1244                                           nodemap->nodes[i].pnn));
1245                         talloc_free(ret);
1246                         return NULL;
1247                 }
1248                 if (!(capabilities & required_capabilities)) {
1249                         continue;
1250                 }
1251
1252                 ret->nodes[ret->num] = nodemap->nodes[i];
1253                 ret->num++;
1254                 if (first_only) {
1255                         break;
1256                 }
1257         }
1258
1259         return ret;
1260 }
1261
1262 static struct ctdb_node_map *
1263 filter_nodemap_by_flags(struct ctdb_context *ctdb,
1264                         struct ctdb_node_map *nodemap,
1265                         uint32_t flags_mask)
1266 {
1267         int i;
1268         struct ctdb_node_map *ret;
1269
1270         ret = talloc_nodemap(nodemap);
1271         CTDB_NO_MEMORY_NULL(ctdb, ret);
1272
1273         ret->num = 0;
1274
1275         for (i = 0; i < nodemap->num; i++) {
1276                 if (nodemap->nodes[i].flags & flags_mask) {
1277                         continue;
1278                 }
1279
1280                 ret->nodes[ret->num] = nodemap->nodes[i];
1281                 ret->num++;
1282         }
1283
1284         return ret;
1285 }
1286
1287 /*
1288   display the list of nodes belonging to this natgw configuration
1289  */
1290 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
1291 {
1292         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1293         int i, ret;
1294         struct ctdb_node_map *natgw_nodes = NULL;
1295         struct ctdb_node_map *orig_nodemap=NULL;
1296         struct ctdb_node_map *nodemap;
1297         uint32_t mypnn, pnn;
1298         const char *ip;
1299
1300         /* When we have some nodes that could be the NATGW, make a
1301          * series of attempts to find the first node that doesn't have
1302          * certain status flags set.
1303          */
1304         uint32_t exclude_flags[] = {
1305                 /* Look for a nice healthy node */
1306                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY,
1307                 /* If not found, an UNHEALTHY/BANNED node will do */
1308                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED,
1309                 /* If not found, a STOPPED node will do */
1310                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED,
1311                 0,
1312         };
1313
1314         /* read the natgw nodes file into a linked list */
1315         natgw_nodes = read_natgw_nodes_file(ctdb, tmp_ctx);
1316         if (natgw_nodes == NULL) {
1317                 ret = -1;
1318                 goto done;
1319         }
1320
1321         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
1322                                    tmp_ctx, &orig_nodemap);
1323         if (ret != 0) {
1324                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
1325                 talloc_free(tmp_ctx);
1326                 return -1;
1327         }
1328
1329         /* Get a nodemap that includes only the nodes in the NATGW
1330          * group */
1331         nodemap = filter_nodemap_by_addrs(ctdb, orig_nodemap, natgw_nodes);
1332         if (nodemap == NULL) {
1333                 ret = -1;
1334                 goto done;
1335         }
1336
1337         ret = 2; /* matches ENOENT */
1338         pnn = -1;
1339         ip = "0.0.0.0";
1340         /* For each flag mask... */
1341         for (i = 0; exclude_flags[i] != 0; i++) {
1342                 /* ... get a nodemap that excludes nodes with with
1343                  * masked flags... */
1344                 struct ctdb_node_map *t =
1345                         filter_nodemap_by_flags(ctdb, nodemap,
1346                                                 exclude_flags[i]);
1347                 if (t == NULL) {
1348                         /* No memory */
1349                         ret = -1;
1350                         goto done;
1351                 }
1352                 if (t->num > 0) {
1353                         /* ... and find the first node with the NATGW
1354                          * capability */
1355                         struct ctdb_node_map *n;
1356                         n = filter_nodemap_by_capabilities(ctdb, t,
1357                                                            CTDB_CAP_NATGW,
1358                                                            true);
1359                         if (n == NULL) {
1360                                 /* No memory */
1361                                 ret = -1;
1362                                 goto done;
1363                         }
1364                         if (n->num > 0) {
1365                                 ret = 0;
1366                                 pnn = n->nodes[0].pnn;
1367                                 ip = ctdb_addr_to_str(&n->nodes[0].addr);
1368                                 break;
1369                         }
1370                 }
1371                 talloc_free(t);
1372         }
1373
1374         if (options.machinereadable) {
1375                 printm(":Node:IP:\n");
1376                 printm(":%d:%s:\n", pnn, ip);
1377         } else {
1378                 printf("%d %s\n", pnn, ip);
1379         }
1380
1381         /* print the pruned list of nodes belonging to this natgw list */
1382         mypnn = getpnn(ctdb);
1383         if (options.machinereadable) {
1384                 control_status_header_machine();
1385         } else {
1386                 printf("Number of nodes:%d\n", nodemap->num);
1387         }
1388         for(i=0;i<nodemap->num;i++){
1389                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1390                         continue;
1391                 }
1392                 if (options.machinereadable) {
1393                         control_status_1_machine(ctdb, mypnn, &(nodemap->nodes[i]));
1394                 } else {
1395                         control_status_1_human(ctdb, mypnn, &(nodemap->nodes[i]));
1396                 }
1397         }
1398
1399 done:
1400         talloc_free(tmp_ctx);
1401         return ret;
1402 }
1403
1404 /*
1405   display the status of the scripts for monitoring (or other events)
1406  */
1407 static int control_one_scriptstatus(struct ctdb_context *ctdb,
1408                                     enum ctdb_eventscript_call type)
1409 {
1410         struct ctdb_scripts_wire *script_status;
1411         int ret, i;
1412
1413         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
1414         if (ret != 0) {
1415                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
1416                 return ret;
1417         }
1418
1419         if (script_status == NULL) {
1420                 if (!options.machinereadable) {
1421                         printf("%s cycle never run\n",
1422                                ctdb_eventscript_call_names[type]);
1423                 }
1424                 return 0;
1425         }
1426
1427         if (!options.machinereadable) {
1428                 int num_run = 0;
1429                 for (i=0; i<script_status->num_scripts; i++) {
1430                         if (script_status->scripts[i].status != -ENOEXEC) {
1431                                 num_run++;
1432                         }
1433                 }
1434                 printf("%d scripts were executed last %s cycle\n",
1435                        num_run,
1436                        ctdb_eventscript_call_names[type]);
1437         }
1438         for (i=0; i<script_status->num_scripts; i++) {
1439                 const char *status = NULL;
1440
1441                 /* The ETIME status is ignored for certain events.
1442                  * In that case the status is 0, but endtime is not set.
1443                  */
1444                 if (script_status->scripts[i].status == 0 &&
1445                     timeval_is_zero(&script_status->scripts[i].finished)) {
1446                         script_status->scripts[i].status = -ETIME;
1447                 }
1448
1449                 switch (script_status->scripts[i].status) {
1450                 case -ETIME:
1451                         status = "TIMEDOUT";
1452                         break;
1453                 case -ENOEXEC:
1454                         status = "DISABLED";
1455                         break;
1456                 case 0:
1457                         status = "OK";
1458                         break;
1459                 default:
1460                         if (script_status->scripts[i].status > 0)
1461                                 status = "ERROR";
1462                         break;
1463                 }
1464                 if (options.machinereadable) {
1465                         printm(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
1466                                ctdb_eventscript_call_names[type],
1467                                script_status->scripts[i].name,
1468                                script_status->scripts[i].status,
1469                                status,
1470                                (long)script_status->scripts[i].start.tv_sec,
1471                                (long)script_status->scripts[i].start.tv_usec,
1472                                (long)script_status->scripts[i].finished.tv_sec,
1473                                (long)script_status->scripts[i].finished.tv_usec,
1474                                script_status->scripts[i].output);
1475                         continue;
1476                 }
1477                 if (status)
1478                         printf("%-20s Status:%s    ",
1479                                script_status->scripts[i].name, status);
1480                 else
1481                         /* Some other error, eg from stat. */
1482                         printf("%-20s Status:CANNOT RUN (%s)",
1483                                script_status->scripts[i].name,
1484                                strerror(-script_status->scripts[i].status));
1485
1486                 if (script_status->scripts[i].status >= 0) {
1487                         printf("Duration:%.3lf ",
1488                         timeval_delta(&script_status->scripts[i].finished,
1489                               &script_status->scripts[i].start));
1490                 }
1491                 if (script_status->scripts[i].status != -ENOEXEC) {
1492                         printf("%s",
1493                                ctime(&script_status->scripts[i].start.tv_sec));
1494                         if (script_status->scripts[i].status != 0) {
1495                                 printf("   OUTPUT:%s\n",
1496                                        script_status->scripts[i].output);
1497                         }
1498                 } else {
1499                         printf("\n");
1500                 }
1501         }
1502         return 0;
1503 }
1504
1505
1506 static int control_scriptstatus(struct ctdb_context *ctdb,
1507                                 int argc, const char **argv)
1508 {
1509         int ret;
1510         enum ctdb_eventscript_call type, min, max;
1511         const char *arg;
1512
1513         if (argc > 1) {
1514                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
1515                 return -1;
1516         }
1517
1518         if (argc == 0)
1519                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
1520         else
1521                 arg = argv[0];
1522
1523         for (type = 0; type < CTDB_EVENT_MAX; type++) {
1524                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
1525                         min = type;
1526                         max = type+1;
1527                         break;
1528                 }
1529         }
1530         if (type == CTDB_EVENT_MAX) {
1531                 if (strcmp(arg, "all") == 0) {
1532                         min = 0;
1533                         max = CTDB_EVENT_MAX;
1534                 } else {
1535                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
1536                         return -1;
1537                 }
1538         }
1539
1540         if (options.machinereadable) {
1541                 printm(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1542         }
1543
1544         for (type = min; type < max; type++) {
1545                 ret = control_one_scriptstatus(ctdb, type);
1546                 if (ret != 0) {
1547                         return ret;
1548                 }
1549         }
1550
1551         return 0;
1552 }
1553
1554 /*
1555   enable an eventscript
1556  */
1557 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1558 {
1559         int ret;
1560
1561         if (argc < 1) {
1562                 usage();
1563         }
1564
1565         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1566         if (ret != 0) {
1567           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1568                 return ret;
1569         }
1570
1571         return 0;
1572 }
1573
1574 /*
1575   disable an eventscript
1576  */
1577 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1578 {
1579         int ret;
1580
1581         if (argc < 1) {
1582                 usage();
1583         }
1584
1585         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1586         if (ret != 0) {
1587           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1588                 return ret;
1589         }
1590
1591         return 0;
1592 }
1593
1594 /*
1595   display the pnn of the recovery master
1596  */
1597 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1598 {
1599         uint32_t recmaster;
1600         int ret;
1601
1602         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1603         if (ret != 0) {
1604                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1605                 return -1;
1606         }
1607         printf("%d\n",recmaster);
1608
1609         return 0;
1610 }
1611
1612 /*
1613   add a tickle to a public address
1614  */
1615 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1616 {
1617         struct ctdb_tcp_connection t;
1618         TDB_DATA data;
1619         int ret;
1620
1621         assert_single_node_only();
1622
1623         if (argc < 2) {
1624                 usage();
1625         }
1626
1627         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1628                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1629                 return -1;
1630         }
1631         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1632                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1633                 return -1;
1634         }
1635
1636         data.dptr = (uint8_t *)&t;
1637         data.dsize = sizeof(t);
1638
1639         /* tell all nodes about this tcp connection */
1640         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1641                            0, data, ctdb, NULL, NULL, NULL, NULL);
1642         if (ret != 0) {
1643                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1644                 return -1;
1645         }
1646         
1647         return 0;
1648 }
1649
1650
1651 /*
1652   delete a tickle from a node
1653  */
1654 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1655 {
1656         struct ctdb_tcp_connection t;
1657         TDB_DATA data;
1658         int ret;
1659
1660         assert_single_node_only();
1661
1662         if (argc < 2) {
1663                 usage();
1664         }
1665
1666         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1667                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1668                 return -1;
1669         }
1670         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1671                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1672                 return -1;
1673         }
1674
1675         data.dptr = (uint8_t *)&t;
1676         data.dsize = sizeof(t);
1677
1678         /* tell all nodes about this tcp connection */
1679         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1680                            0, data, ctdb, NULL, NULL, NULL, NULL);
1681         if (ret != 0) {
1682                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1683                 return -1;
1684         }
1685         
1686         return 0;
1687 }
1688
1689
1690 /*
1691   get a list of all tickles for this pnn
1692  */
1693 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1694 {
1695         struct ctdb_control_tcp_tickle_list *list;
1696         ctdb_sock_addr addr;
1697         int i, ret;
1698         unsigned port = 0;
1699
1700         assert_single_node_only();
1701
1702         if (argc < 1) {
1703                 usage();
1704         }
1705
1706         if (argc == 2) {
1707                 port = atoi(argv[1]);
1708         }
1709
1710         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1711                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1712                 return -1;
1713         }
1714
1715         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1716         if (ret == -1) {
1717                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1718                 return -1;
1719         }
1720
1721         if (options.machinereadable){
1722                 printm(":source ip:port:destination ip:port:\n");
1723                 for (i=0;i<list->tickles.num;i++) {
1724                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1725                                 continue;
1726                         }
1727                         printm(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1728                         printm(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1729                 }
1730         } else {
1731                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1732                 printf("Num tickles:%u\n", list->tickles.num);
1733                 for (i=0;i<list->tickles.num;i++) {
1734                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1735                                 continue;
1736                         }
1737                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1738                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1739                 }
1740         }
1741
1742         talloc_free(list);
1743         
1744         return 0;
1745 }
1746
1747
1748 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1749 {
1750         struct ctdb_all_public_ips *ips;
1751         struct ctdb_public_ip ip;
1752         int i, ret;
1753         uint32_t *nodes;
1754         uint32_t disable_time;
1755         TDB_DATA data;
1756         struct ctdb_node_map *nodemap=NULL;
1757         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1758
1759         disable_time = 30;
1760         data.dptr  = (uint8_t*)&disable_time;
1761         data.dsize = sizeof(disable_time);
1762         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1763         if (ret != 0) {
1764                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1765                 return -1;
1766         }
1767
1768
1769
1770         /* read the public ip list from the node */
1771         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1772         if (ret != 0) {
1773                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1774                 talloc_free(tmp_ctx);
1775                 return -1;
1776         }
1777
1778         for (i=0;i<ips->num;i++) {
1779                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1780                         break;
1781                 }
1782         }
1783         if (i==ips->num) {
1784                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1785                         pnn, ctdb_addr_to_str(addr)));
1786                 talloc_free(tmp_ctx);
1787                 return -1;
1788         }
1789
1790         ip.pnn  = pnn;
1791         ip.addr = *addr;
1792
1793         data.dptr  = (uint8_t *)&ip;
1794         data.dsize = sizeof(ip);
1795
1796         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1797         if (ret != 0) {
1798                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1799                 talloc_free(tmp_ctx);
1800                 return ret;
1801         }
1802
1803         nodes = list_of_nodes(ctdb, nodemap, tmp_ctx, NODE_FLAGS_INACTIVE, pnn);
1804         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1805                                         nodes, 0,
1806                                         LONGTIMELIMIT(),
1807                                         false, data,
1808                                         NULL, NULL,
1809                                         NULL);
1810         if (ret != 0) {
1811                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1812                 talloc_free(tmp_ctx);
1813                 return -1;
1814         }
1815
1816         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1817         if (ret != 0) {
1818                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1819                 talloc_free(tmp_ctx);
1820                 return -1;
1821         }
1822
1823         /* update the recovery daemon so it now knows to expect the new
1824            node assignment for this ip.
1825         */
1826         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1827         if (ret != 0) {
1828                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1829                 return -1;
1830         }
1831
1832         talloc_free(tmp_ctx);
1833         return 0;
1834 }
1835
1836
1837 /* 
1838  * scans all other nodes and returns a pnn for another node that can host this 
1839  * ip address or -1
1840  */
1841 static int
1842 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1843 {
1844         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1845         struct ctdb_all_public_ips *ips;
1846         struct ctdb_node_map *nodemap=NULL;
1847         int i, j, ret;
1848         int pnn;
1849
1850         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1851         if (ret != 0) {
1852                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1853                 talloc_free(tmp_ctx);
1854                 return ret;
1855         }
1856
1857         for(i=0;i<nodemap->num;i++){
1858                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1859                         continue;
1860                 }
1861                 if (nodemap->nodes[i].pnn == options.pnn) {
1862                         continue;
1863                 }
1864
1865                 /* read the public ip list from this node */
1866                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1867                 if (ret != 0) {
1868                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1869                         return -1;
1870                 }
1871
1872                 for (j=0;j<ips->num;j++) {
1873                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1874                                 pnn = nodemap->nodes[i].pnn;
1875                                 talloc_free(tmp_ctx);
1876                                 return pnn;
1877                         }
1878                 }
1879                 talloc_free(ips);
1880         }
1881
1882         talloc_free(tmp_ctx);
1883         return -1;
1884 }
1885
1886 /* If pnn is -1 then try to find a node to move IP to... */
1887 static bool try_moveip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1888 {
1889         bool pnn_specified = (pnn == -1 ? false : true);
1890         int retries = 0;
1891
1892         while (retries < 5) {
1893                 if (!pnn_specified) {
1894                         pnn = find_other_host_for_public_ip(ctdb, addr);
1895                         if (pnn == -1) {
1896                                 return false;
1897                         }
1898                         DEBUG(DEBUG_NOTICE,
1899                               ("Trying to move public IP to node %u\n", pnn));
1900                 }
1901
1902                 if (move_ip(ctdb, addr, pnn) == 0) {
1903                         return true;
1904                 }
1905
1906                 sleep(3);
1907                 retries++;
1908         }
1909
1910         return false;
1911 }
1912
1913
1914 /*
1915   move/failover an ip address to a specific node
1916  */
1917 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1918 {
1919         uint32_t pnn;
1920         ctdb_sock_addr addr;
1921
1922         assert_single_node_only();
1923
1924         if (argc < 2) {
1925                 usage();
1926                 return -1;
1927         }
1928
1929         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1930                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1931                 return -1;
1932         }
1933
1934
1935         if (sscanf(argv[1], "%u", &pnn) != 1) {
1936                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1937                 return -1;
1938         }
1939
1940         if (!try_moveip(ctdb, &addr, pnn)) {
1941                 DEBUG(DEBUG_ERR,("Failed to move IP to node %d.\n", pnn));
1942                 return -1;
1943         }
1944
1945         return 0;
1946 }
1947
1948 static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
1949 {
1950         TDB_DATA data;
1951
1952         data.dptr  = (uint8_t *)&pnn;
1953         data.dsize = sizeof(uint32_t);
1954         if (ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
1955                 DEBUG(DEBUG_ERR,
1956                       ("Failed to send message to force node %u to be a rebalancing target\n",
1957                        pnn));
1958                 return -1;
1959         }
1960
1961         return 0;
1962 }
1963
1964
1965 /*
1966   rebalance a node by setting it to allow failback and triggering a
1967   takeover run
1968  */
1969 static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char **argv)
1970 {
1971         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1972         uint32_t *nodes;
1973         uint32_t pnn_mode;
1974         int i, ret;
1975
1976         assert_single_node_only();
1977
1978         if (argc > 1) {
1979                 usage();
1980         }
1981
1982         /* Determine the nodes where IPs need to be reloaded */
1983         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1984                               options.pnn, true, &nodes, &pnn_mode)) {
1985                 ret = -1;
1986                 goto done;
1987         }
1988
1989         for (i = 0; i < talloc_array_length(nodes); i++) {
1990                 if (!rebalance_node(ctdb, nodes[i])) {
1991                         ret = -1;
1992                 }
1993         }
1994
1995 done:
1996         talloc_free(tmp_ctx);
1997         return ret;
1998 }
1999
2000 static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
2001 {
2002         struct ctdb_public_ip ip;
2003         int ret;
2004         uint32_t *nodes;
2005         uint32_t disable_time;
2006         TDB_DATA data;
2007         struct ctdb_node_map *nodemap=NULL;
2008         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2009
2010         disable_time = 30;
2011         data.dptr  = (uint8_t*)&disable_time;
2012         data.dsize = sizeof(disable_time);
2013         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
2014         if (ret != 0) {
2015                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
2016                 return -1;
2017         }
2018
2019         ip.pnn  = -1;
2020         ip.addr = *addr;
2021
2022         data.dptr  = (uint8_t *)&ip;
2023         data.dsize = sizeof(ip);
2024
2025         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
2026         if (ret != 0) {
2027                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2028                 talloc_free(tmp_ctx);
2029                 return ret;
2030         }
2031
2032         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
2033         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
2034                                         nodes, 0,
2035                                         LONGTIMELIMIT(),
2036                                         false, data,
2037                                         NULL, NULL,
2038                                         NULL);
2039         if (ret != 0) {
2040                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
2041                 talloc_free(tmp_ctx);
2042                 return -1;
2043         }
2044
2045         talloc_free(tmp_ctx);
2046         return 0;
2047 }
2048
2049 /*
2050   release an ip form all nodes and have it re-assigned by recd
2051  */
2052 static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char **argv)
2053 {
2054         ctdb_sock_addr addr;
2055
2056         assert_single_node_only();
2057
2058         if (argc < 1) {
2059                 usage();
2060                 return -1;
2061         }
2062
2063         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2064                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2065                 return -1;
2066         }
2067
2068         if (rebalance_ip(ctdb, &addr) != 0) {
2069                 DEBUG(DEBUG_ERR,("Error when trying to reassign ip\n"));
2070                 return -1;
2071         }
2072
2073         return 0;
2074 }
2075
2076 static int getips_store_callback(void *param, void *data)
2077 {
2078         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
2079         struct ctdb_all_public_ips *ips = param;
2080         int i;
2081
2082         i = ips->num++;
2083         ips->ips[i].pnn  = node_ip->pnn;
2084         ips->ips[i].addr = node_ip->addr;
2085         return 0;
2086 }
2087
2088 static int getips_count_callback(void *param, void *data)
2089 {
2090         uint32_t *count = param;
2091
2092         (*count)++;
2093         return 0;
2094 }
2095
2096 #define IP_KEYLEN       4
2097 static uint32_t *ip_key(ctdb_sock_addr *ip)
2098 {
2099         static uint32_t key[IP_KEYLEN];
2100
2101         bzero(key, sizeof(key));
2102
2103         switch (ip->sa.sa_family) {
2104         case AF_INET:
2105                 key[0]  = ip->ip.sin_addr.s_addr;
2106                 break;
2107         case AF_INET6: {
2108                 uint32_t *s6_a32 = (uint32_t *)&(ip->ip6.sin6_addr.s6_addr);
2109                 key[0]  = s6_a32[3];
2110                 key[1]  = s6_a32[2];
2111                 key[2]  = s6_a32[1];
2112                 key[3]  = s6_a32[0];
2113                 break;
2114         }
2115         default:
2116                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
2117                 return key;
2118         }
2119
2120         return key;
2121 }
2122
2123 static void *add_ip_callback(void *parm, void *data)
2124 {
2125         return parm;
2126 }
2127
2128 static int
2129 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
2130 {
2131         struct ctdb_all_public_ips *tmp_ips;
2132         struct ctdb_node_map *nodemap=NULL;
2133         trbt_tree_t *ip_tree;
2134         int i, j, len, ret;
2135         uint32_t count;
2136
2137         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2138         if (ret != 0) {
2139                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2140                 return ret;
2141         }
2142
2143         ip_tree = trbt_create(tmp_ctx, 0);
2144
2145         for(i=0;i<nodemap->num;i++){
2146                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
2147                         continue;
2148                 }
2149                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2150                         continue;
2151                 }
2152
2153                 /* read the public ip list from this node */
2154                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
2155                 if (ret != 0) {
2156                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
2157                         return -1;
2158                 }
2159         
2160                 for (j=0; j<tmp_ips->num;j++) {
2161                         struct ctdb_public_ip *node_ip;
2162
2163                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
2164                         node_ip->pnn  = tmp_ips->ips[j].pnn;
2165                         node_ip->addr = tmp_ips->ips[j].addr;
2166
2167                         trbt_insertarray32_callback(ip_tree,
2168                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
2169                                 add_ip_callback,
2170                                 node_ip);
2171                 }
2172                 talloc_free(tmp_ips);
2173         }
2174
2175         /* traverse */
2176         count = 0;
2177         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
2178
2179         len = offsetof(struct ctdb_all_public_ips, ips) + 
2180                 count*sizeof(struct ctdb_public_ip);
2181         tmp_ips = talloc_zero_size(tmp_ctx, len);
2182         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
2183
2184         *ips = tmp_ips;
2185
2186         return 0;
2187 }
2188
2189
2190 static void ctdb_every_second(struct tevent_context *ev,
2191                               struct tevent_timer *te,
2192                               struct timeval t, void *p)
2193 {
2194         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2195
2196         tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(1, 0),
2197                          ctdb_every_second, ctdb);
2198 }
2199
2200 struct srvid_reply_handler_data {
2201         bool done;
2202         bool wait_for_all;
2203         uint32_t *nodes;
2204         const char *srvid_str;
2205 };
2206
2207 static void srvid_broadcast_reply_handler(uint64_t srvid, TDB_DATA data,
2208                                          void *private_data)
2209 {
2210         struct srvid_reply_handler_data *d =
2211                 (struct srvid_reply_handler_data *)private_data;
2212         int i;
2213         int32_t ret;
2214
2215         if (data.dsize != sizeof(ret)) {
2216                 DEBUG(DEBUG_ERR, (__location__ " Wrong reply size\n"));
2217                 return;
2218         }
2219
2220         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
2221         ret = *(int32_t *)data.dptr;
2222         if (ret < 0) {
2223                 DEBUG(DEBUG_ERR,
2224                       ("%s failed with result %d\n", d->srvid_str, ret));
2225                 return;
2226         }
2227
2228         if (!d->wait_for_all) {
2229                 d->done = true;
2230                 return;
2231         }
2232
2233         /* Wait for all replies */
2234         d->done = true;
2235         for (i = 0; i < talloc_array_length(d->nodes); i++) {
2236                 if (d->nodes[i] == ret) {
2237                         DEBUG(DEBUG_DEBUG,
2238                               ("%s reply received from node %u\n",
2239                                d->srvid_str, ret));
2240                         d->nodes[i] = -1;
2241                 }
2242                 if (d->nodes[i] != -1) {
2243                         /* Found a node that hasn't yet replied */
2244                         d->done = false;
2245                 }
2246         }
2247 }
2248
2249 /* Broadcast the given SRVID to all connected nodes.  Wait for 1 reply
2250  * or replies from all connected nodes.  arg is the data argument to
2251  * pass in the srvid_request structure - pass 0 if this isn't needed.
2252  */
2253 static int srvid_broadcast(struct ctdb_context *ctdb,
2254                            uint64_t srvid, uint32_t *arg,
2255                            const char *srvid_str, bool wait_for_all)
2256 {
2257         int ret;
2258         TDB_DATA data;
2259         uint32_t pnn;
2260         uint64_t reply_srvid;
2261         struct srvid_request request;
2262         struct srvid_request_data request_data;
2263         struct srvid_reply_handler_data reply_data;
2264         struct timeval tv;
2265
2266         ZERO_STRUCT(request);
2267
2268         /* Time ticks to enable timeouts to be processed */
2269         tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(1, 0),
2270                          ctdb_every_second, ctdb);
2271
2272         pnn = ctdb_get_pnn(ctdb);
2273         reply_srvid = getpid();
2274
2275         if (arg == NULL) {
2276                 request.pnn = pnn;
2277                 request.srvid = reply_srvid;
2278
2279                 data.dptr = (uint8_t *)&request;
2280                 data.dsize = sizeof(request);
2281         } else {
2282                 request_data.pnn = pnn;
2283                 request_data.srvid = reply_srvid;
2284                 request_data.data = *arg;
2285
2286                 data.dptr = (uint8_t *)&request_data;
2287                 data.dsize = sizeof(request_data);
2288         }
2289
2290         /* Register message port for reply from recovery master */
2291         ctdb_client_set_message_handler(ctdb, reply_srvid,
2292                                         srvid_broadcast_reply_handler,
2293                                         &reply_data);
2294
2295         reply_data.wait_for_all = wait_for_all;
2296         reply_data.nodes = NULL;
2297         reply_data.srvid_str = srvid_str;
2298
2299 again:
2300         reply_data.done = false;
2301
2302         if (wait_for_all) {
2303                 struct ctdb_node_map *nodemap;
2304
2305                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
2306                                            CTDB_CURRENT_NODE, ctdb, &nodemap);
2307                 if (ret != 0) {
2308                         DEBUG(DEBUG_ERR,
2309                               ("Unable to get nodemap from current node, try again\n"));
2310                         sleep(1);
2311                         goto again;
2312                 }
2313
2314                 if (reply_data.nodes != NULL) {
2315                         talloc_free(reply_data.nodes);
2316                 }
2317                 reply_data.nodes = list_of_connected_nodes(ctdb, nodemap,
2318                                                            NULL, true);
2319
2320                 talloc_free(nodemap);
2321         }
2322
2323         /* Send to all connected nodes. Only recmaster replies */
2324         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2325                                        srvid, data);
2326         if (ret != 0) {
2327                 /* This can only happen if the socket is closed and
2328                  * there's no way to recover from that, so don't try
2329                  * again.
2330                  */
2331                 DEBUG(DEBUG_ERR,
2332                       ("Failed to send %s request to connected nodes\n",
2333                        srvid_str));
2334                 return -1;
2335         }
2336
2337         tv = timeval_current();
2338         /* This loop terminates the reply is received */
2339         while (timeval_elapsed(&tv) < 5.0 && !reply_data.done) {
2340                 tevent_loop_once(ctdb->ev);
2341         }
2342
2343         if (!reply_data.done) {
2344                 DEBUG(DEBUG_NOTICE,
2345                       ("Still waiting for confirmation of %s\n", srvid_str));
2346                 sleep(1);
2347                 goto again;
2348         }
2349
2350         ctdb_client_remove_message_handler(ctdb, reply_srvid, &reply_data);
2351
2352         talloc_free(reply_data.nodes);
2353
2354         return 0;
2355 }
2356
2357 static int ipreallocate(struct ctdb_context *ctdb)
2358 {
2359         return srvid_broadcast(ctdb, CTDB_SRVID_TAKEOVER_RUN, NULL,
2360                                "IP reallocation", false);
2361 }
2362
2363
2364 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
2365 {
2366         return ipreallocate(ctdb);
2367 }
2368
2369 /*
2370   add a public ip address to a node
2371  */
2372 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
2373 {
2374         int i, ret;
2375         int len, retries = 0;
2376         unsigned mask;
2377         ctdb_sock_addr addr;
2378         struct ctdb_control_ip_iface *pub;
2379         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2380         struct ctdb_all_public_ips *ips;
2381
2382
2383         if (argc != 2) {
2384                 talloc_free(tmp_ctx);
2385                 usage();
2386         }
2387
2388         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
2389                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
2390                 talloc_free(tmp_ctx);
2391                 return -1;
2392         }
2393
2394         /* read the public ip list from the node */
2395         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2396         if (ret != 0) {
2397                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", options.pnn));
2398                 talloc_free(tmp_ctx);
2399                 return -1;
2400         }
2401         for (i=0;i<ips->num;i++) {
2402                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2403                         DEBUG(DEBUG_ERR,("Can not add ip to node. Node already hosts this ip\n"));
2404                         return 0;
2405                 }
2406         }
2407
2408
2409
2410         /* Dont timeout. This command waits for an ip reallocation
2411            which sometimes can take wuite a while if there has
2412            been a recent recovery
2413         */
2414         alarm(0);
2415
2416         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
2417         pub = talloc_size(tmp_ctx, len); 
2418         CTDB_NO_MEMORY(ctdb, pub);
2419
2420         pub->addr  = addr;
2421         pub->mask  = mask;
2422         pub->len   = strlen(argv[1])+1;
2423         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
2424
2425         do {
2426                 ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
2427                 if (ret != 0) {
2428                         DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Wait 3 seconds and try again.\n", options.pnn));
2429                         sleep(3);
2430                         retries++;
2431                 }
2432         } while (retries < 5 && ret != 0);
2433         if (ret != 0) {
2434                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Giving up.\n", options.pnn));
2435                 talloc_free(tmp_ctx);
2436                 return ret;
2437         }
2438
2439         if (rebalance_node(ctdb, options.pnn) != 0) {
2440                 DEBUG(DEBUG_ERR,("Error when trying to rebalance node\n"));
2441                 return ret;
2442         }
2443
2444         talloc_free(tmp_ctx);
2445         return 0;
2446 }
2447
2448 /*
2449   add a public ip address to a node
2450  */
2451 static int control_ipiface(struct ctdb_context *ctdb, int argc, const char **argv)
2452 {
2453         ctdb_sock_addr addr;
2454         char *iface = NULL;
2455
2456         if (argc != 1) {
2457                 usage();
2458         }
2459
2460         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2461                 printf("Badly formed ip : %s\n", argv[0]);
2462                 return -1;
2463         }
2464
2465         iface = ctdb_sys_find_ifname(&addr);
2466         if (iface == NULL) {
2467                 printf("Failed to get interface name for ip: %s", argv[0]);
2468                 return -1;
2469         }
2470
2471         printf("IP on interface %s\n", iface);
2472
2473         free(iface);
2474
2475         return 0;
2476 }
2477
2478 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
2479
2480 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
2481 {
2482         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2483         struct ctdb_node_map *nodemap=NULL;
2484         struct ctdb_all_public_ips *ips;
2485         int ret, i, j;
2486
2487         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2488         if (ret != 0) {
2489                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
2490                 return ret;
2491         }
2492
2493         /* remove it from the nodes that are not hosting the ip currently */
2494         for(i=0;i<nodemap->num;i++){
2495                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2496                         continue;
2497                 }
2498                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2499                 if (ret != 0) {
2500                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2501                         continue;
2502                 }
2503
2504                 for (j=0;j<ips->num;j++) {
2505                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2506                                 break;
2507                         }
2508                 }
2509                 if (j==ips->num) {
2510                         continue;
2511                 }
2512
2513                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
2514                         continue;
2515                 }
2516
2517                 options.pnn = nodemap->nodes[i].pnn;
2518                 control_delip(ctdb, argc, argv);
2519         }
2520
2521
2522         /* remove it from every node (also the one hosting it) */
2523         for(i=0;i<nodemap->num;i++){
2524                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2525                         continue;
2526                 }
2527                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2528                 if (ret != 0) {
2529                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2530                         continue;
2531                 }
2532
2533                 for (j=0;j<ips->num;j++) {
2534                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2535                                 break;
2536                         }
2537                 }
2538                 if (j==ips->num) {
2539                         continue;
2540                 }
2541
2542                 options.pnn = nodemap->nodes[i].pnn;
2543                 control_delip(ctdb, argc, argv);
2544         }
2545
2546         talloc_free(tmp_ctx);
2547         return 0;
2548 }
2549         
2550 /*
2551   delete a public ip address from a node
2552  */
2553 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
2554 {
2555         int i, ret;
2556         ctdb_sock_addr addr;
2557         struct ctdb_control_ip_iface pub;
2558         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2559         struct ctdb_all_public_ips *ips;
2560
2561         if (argc != 1) {
2562                 talloc_free(tmp_ctx);
2563                 usage();
2564         }
2565
2566         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2567                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2568                 return -1;
2569         }
2570
2571         if (options.pnn == CTDB_BROADCAST_ALL) {
2572                 return control_delip_all(ctdb, argc, argv, &addr);
2573         }
2574
2575         pub.addr  = addr;
2576         pub.mask  = 0;
2577         pub.len   = 0;
2578
2579         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2580         if (ret != 0) {
2581                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
2582                 talloc_free(tmp_ctx);
2583                 return ret;
2584         }
2585         
2586         for (i=0;i<ips->num;i++) {
2587                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2588                         break;
2589                 }
2590         }
2591
2592         if (i==ips->num) {
2593                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
2594                         ctdb_addr_to_str(&addr)));
2595                 talloc_free(tmp_ctx);
2596                 return -1;
2597         }
2598
2599         /* This is an optimisation.  If this node is hosting the IP
2600          * then try to move it somewhere else without invoking a full
2601          * takeover run.  We don't care if this doesn't work!
2602          */
2603         if (ips->ips[i].pnn == options.pnn) {
2604                 (void) try_moveip(ctdb, &addr, -1);
2605         }
2606
2607         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
2608         if (ret != 0) {
2609                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
2610                 talloc_free(tmp_ctx);
2611                 return ret;
2612         }
2613
2614         talloc_free(tmp_ctx);
2615         return 0;
2616 }
2617
2618 static int kill_tcp_from_file(struct ctdb_context *ctdb,
2619                               int argc, const char **argv)
2620 {
2621         struct ctdb_tcp_connection *killtcp;
2622         int max_entries, current, i;
2623         struct timeval timeout;
2624         char line[128], src[128], dst[128];
2625         int linenum;
2626         TDB_DATA data;
2627         struct client_async_data *async_data;
2628         struct ctdb_client_control_state *state;
2629
2630         if (argc != 0) {
2631                 usage();
2632         }
2633
2634         linenum = 1;
2635         killtcp = NULL;
2636         max_entries = 0;
2637         current = 0;
2638         while (!feof(stdin)) {
2639                 if (fgets(line, sizeof(line), stdin) == NULL) {
2640                         continue;
2641                 }
2642
2643                 /* Silently skip empty lines */
2644                 if (line[0] == '\n') {
2645                         continue;
2646                 }
2647
2648                 if (sscanf(line, "%s %s\n", src, dst) != 2) {
2649                         DEBUG(DEBUG_ERR, ("Bad line [%d]: '%s'\n",
2650                                           linenum, line));
2651                         talloc_free(killtcp);
2652                         return -1;
2653                 }
2654
2655                 if (current >= max_entries) {
2656                         max_entries += 1024;
2657                         killtcp = talloc_realloc(ctdb, killtcp,
2658                                                  struct ctdb_tcp_connection,
2659                                                  max_entries);
2660                         CTDB_NO_MEMORY(ctdb, killtcp);
2661                 }
2662
2663                 if (!parse_ip_port(src, &killtcp[current].src_addr)) {
2664                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2665                                           linenum, src));
2666                         talloc_free(killtcp);
2667                         return -1;
2668                 }
2669
2670                 if (!parse_ip_port(dst, &killtcp[current].dst_addr)) {
2671                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2672                                           linenum, dst));
2673                         talloc_free(killtcp);
2674                         return -1;
2675                 }
2676
2677                 current++;
2678         }
2679
2680         async_data = talloc_zero(ctdb, struct client_async_data);
2681         if (async_data == NULL) {
2682                 talloc_free(killtcp);
2683                 return -1;
2684         }
2685
2686         for (i = 0; i < current; i++) {
2687
2688                 data.dsize = sizeof(struct ctdb_tcp_connection);
2689                 data.dptr  = (unsigned char *)&killtcp[i];
2690
2691                 timeout = TIMELIMIT();
2692                 state = ctdb_control_send(ctdb, options.pnn, 0,
2693                                           CTDB_CONTROL_KILL_TCP, 0, data,
2694                                           async_data, &timeout, NULL);
2695
2696                 if (state == NULL) {
2697                         DEBUG(DEBUG_ERR,
2698                               ("Failed to call async killtcp control to node %u\n",
2699                                options.pnn));
2700                         talloc_free(killtcp);
2701                         return -1;
2702                 }
2703                 
2704                 ctdb_client_async_add(async_data, state);
2705         }
2706
2707         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2708                 DEBUG(DEBUG_ERR,("killtcp failed\n"));
2709                 talloc_free(killtcp);
2710                 return -1;
2711         }
2712
2713         talloc_free(killtcp);
2714         return 0;
2715 }
2716
2717
2718 /*
2719   kill a tcp connection
2720  */
2721 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2722 {
2723         int ret;
2724         struct ctdb_tcp_connection killtcp;
2725
2726         assert_single_node_only();
2727
2728         if (argc == 0) {
2729                 return kill_tcp_from_file(ctdb, argc, argv);
2730         }
2731
2732         if (argc < 2) {
2733                 usage();
2734         }
2735
2736         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
2737                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2738                 return -1;
2739         }
2740
2741         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
2742                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2743                 return -1;
2744         }
2745
2746         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
2747         if (ret != 0) {
2748                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
2749                 return ret;
2750         }
2751
2752         return 0;
2753 }
2754
2755
2756 /*
2757   send a gratious arp
2758  */
2759 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
2760 {
2761         int ret;
2762         ctdb_sock_addr addr;
2763
2764         assert_single_node_only();
2765
2766         if (argc < 2) {
2767                 usage();
2768         }
2769
2770         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2771                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
2772                 return -1;
2773         }
2774
2775         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
2776         if (ret != 0) {
2777                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
2778                 return ret;
2779         }
2780
2781         return 0;
2782 }
2783
2784 /*
2785   register a server id
2786  */
2787 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2788 {
2789         int ret;
2790         struct ctdb_server_id server_id;
2791
2792         if (argc < 3) {
2793                 usage();
2794         }
2795
2796         server_id.pnn       = strtoul(argv[0], NULL, 0);
2797         server_id.type      = strtoul(argv[1], NULL, 0);
2798         server_id.server_id = strtoul(argv[2], NULL, 0);
2799
2800         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
2801         if (ret != 0) {
2802                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
2803                 return ret;
2804         }
2805         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
2806         sleep(999);
2807         return -1;
2808 }
2809
2810 /*
2811   unregister a server id
2812  */
2813 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2814 {
2815         int ret;
2816         struct ctdb_server_id server_id;
2817
2818         if (argc < 3) {
2819                 usage();
2820         }
2821
2822         server_id.pnn       = strtoul(argv[0], NULL, 0);
2823         server_id.type      = strtoul(argv[1], NULL, 0);
2824         server_id.server_id = strtoul(argv[2], NULL, 0);
2825
2826         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
2827         if (ret != 0) {
2828                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
2829                 return ret;
2830         }
2831         return -1;
2832 }
2833
2834 /*
2835   check if a server id exists
2836  */
2837 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2838 {
2839         uint32_t status = 0;
2840         int ret;
2841         struct ctdb_server_id server_id;
2842
2843         if (argc < 3) {
2844                 usage();
2845         }
2846
2847         server_id.pnn       = strtoul(argv[0], NULL, 0);
2848         server_id.type      = strtoul(argv[1], NULL, 0);
2849         server_id.server_id = strtoul(argv[2], NULL, 0);
2850
2851         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
2852         if (ret != 0) {
2853                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
2854                 return ret;
2855         }
2856
2857         if (status) {
2858                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
2859         } else {
2860                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
2861         }
2862         return 0;
2863 }
2864
2865 /*
2866   get a list of all server ids that are registered on a node
2867  */
2868 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
2869 {
2870         int i, ret;
2871         struct ctdb_server_id_list *server_ids;
2872
2873         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
2874         if (ret != 0) {
2875                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
2876                 return ret;
2877         }
2878
2879         for (i=0; i<server_ids->num; i++) {
2880                 printf("Server id %d:%d:%d\n", 
2881                         server_ids->server_ids[i].pnn, 
2882                         server_ids->server_ids[i].type, 
2883                         server_ids->server_ids[i].server_id); 
2884         }
2885
2886         return -1;
2887 }
2888
2889 /*
2890   check if a server id exists
2891  */
2892 static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
2893 {
2894         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2895         uint64_t *ids;
2896         uint8_t *result;
2897         int i;
2898
2899         if (argc < 1) {
2900                 talloc_free(tmp_ctx);
2901                 usage();
2902         }
2903
2904         ids    = talloc_array(tmp_ctx, uint64_t, argc);
2905         result = talloc_array(tmp_ctx, uint8_t, argc);
2906
2907         for (i = 0; i < argc; i++) {
2908                 ids[i] = strtoull(argv[i], NULL, 0);
2909         }
2910
2911         if (!ctdb_client_check_message_handlers(ctdb, ids, argc, result)) {
2912                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
2913                                   options.pnn));
2914                 talloc_free(tmp_ctx);
2915                 return -1;
2916         }
2917
2918         for (i=0; i < argc; i++) {
2919                 printf("Server id %d:%llu %s\n", options.pnn, (long long)ids[i],
2920                        result[i] ? "exists" : "does not exist");
2921         }
2922
2923         talloc_free(tmp_ctx);
2924         return 0;
2925 }
2926
2927 /*
2928   send a tcp tickle ack
2929  */
2930 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2931 {
2932         int ret;
2933         ctdb_sock_addr  src, dst;
2934
2935         if (argc < 2) {
2936                 usage();
2937         }
2938
2939         if (!parse_ip_port(argv[0], &src)) {
2940                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2941                 return -1;
2942         }
2943
2944         if (!parse_ip_port(argv[1], &dst)) {
2945                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2946                 return -1;
2947         }
2948
2949         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
2950         if (ret==0) {
2951                 return 0;
2952         }
2953         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
2954
2955         return -1;
2956 }
2957
2958
2959 /*
2960   display public ip status
2961  */
2962 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
2963 {
2964         int i, ret;
2965         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2966         struct ctdb_all_public_ips *ips;
2967
2968         if (argc == 1 && strcmp(argv[0], "all") == 0) {
2969                 options.pnn = CTDB_BROADCAST_ALL;
2970         }
2971
2972         if (options.pnn == CTDB_BROADCAST_ALL) {
2973                 /* read the list of public ips from all nodes */
2974                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
2975         } else {
2976                 /* read the public ip list from this node */
2977                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2978         }
2979         if (ret != 0) {
2980                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
2981                 talloc_free(tmp_ctx);
2982                 return ret;
2983         }
2984
2985         if (options.machinereadable){
2986                 printm(":Public IP:Node:");
2987                 if (options.verbose){
2988                         printm("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
2989                 }
2990                 printm("\n");
2991         } else {
2992                 if (options.pnn == CTDB_BROADCAST_ALL) {
2993                         printf("Public IPs on ALL nodes\n");
2994                 } else {
2995                         printf("Public IPs on node %u\n", options.pnn);
2996                 }
2997         }
2998
2999         for (i=1;i<=ips->num;i++) {
3000                 struct ctdb_control_public_ip_info *info = NULL;
3001                 int32_t pnn;
3002                 char *aciface = NULL;
3003                 char *avifaces = NULL;
3004                 char *cifaces = NULL;
3005
3006                 if (options.pnn == CTDB_BROADCAST_ALL) {
3007                         pnn = ips->ips[ips->num-i].pnn;
3008                 } else {
3009                         pnn = options.pnn;
3010                 }
3011
3012                 if (pnn != -1) {
3013                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
3014                                                    &ips->ips[ips->num-i].addr, &info);
3015                 } else {
3016                         ret = -1;
3017                 }
3018
3019                 if (ret == 0) {
3020                         int j;
3021                         for (j=0; j < info->num; j++) {
3022                                 if (cifaces == NULL) {
3023                                         cifaces = talloc_strdup(info,
3024                                                                 info->ifaces[j].name);
3025                                 } else {
3026                                         cifaces = talloc_asprintf_append(cifaces,
3027                                                                          ",%s",
3028                                                                          info->ifaces[j].name);
3029                                 }
3030
3031                                 if (info->active_idx == j) {
3032                                         aciface = info->ifaces[j].name;
3033                                 }
3034
3035                                 if (info->ifaces[j].link_state == 0) {
3036                                         continue;
3037                                 }
3038
3039                                 if (avifaces == NULL) {
3040                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
3041                                 } else {
3042                                         avifaces = talloc_asprintf_append(avifaces,
3043                                                                           ",%s",
3044                                                                           info->ifaces[j].name);
3045                                 }
3046                         }
3047                 }
3048
3049                 if (options.machinereadable){
3050                         printm(":%s:%d:",
3051                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3052                                 ips->ips[ips->num-i].pnn);
3053                         if (options.verbose){
3054                                 printm("%s:%s:%s:",
3055                                         aciface?aciface:"",
3056                                         avifaces?avifaces:"",
3057                                         cifaces?cifaces:"");
3058                         }
3059                         printf("\n");
3060                 } else {
3061                         if (options.verbose) {
3062                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
3063                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3064                                         ips->ips[ips->num-i].pnn,
3065                                         aciface?aciface:"",
3066                                         avifaces?avifaces:"",
3067                                         cifaces?cifaces:"");
3068                         } else {
3069                                 printf("%s %d\n",
3070                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3071                                         ips->ips[ips->num-i].pnn);
3072                         }
3073                 }
3074                 talloc_free(info);
3075         }
3076
3077         talloc_free(tmp_ctx);
3078         return 0;
3079 }
3080
3081 /*
3082   public ip info
3083  */
3084 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
3085 {
3086         int i, ret;
3087         ctdb_sock_addr addr;
3088         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3089         struct ctdb_control_public_ip_info *info;
3090
3091         if (argc != 1) {
3092                 talloc_free(tmp_ctx);
3093                 usage();
3094         }
3095
3096         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
3097                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
3098                 return -1;
3099         }
3100
3101         /* read the public ip info from this node */
3102         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
3103                                            tmp_ctx, &addr, &info);
3104         if (ret != 0) {
3105                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
3106                                   argv[0], options.pnn));
3107                 talloc_free(tmp_ctx);
3108                 return ret;
3109         }
3110
3111         printf("Public IP[%s] info on node %u\n",
3112                ctdb_addr_to_str(&info->ip.addr),
3113                options.pnn);
3114
3115         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
3116                ctdb_addr_to_str(&info->ip.addr),
3117                info->ip.pnn, info->num);
3118
3119         for (i=0; i<info->num; i++) {
3120                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
3121
3122                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
3123                        i+1, info->ifaces[i].name,
3124                        info->ifaces[i].link_state?"up":"down",
3125                        (unsigned int)info->ifaces[i].references,
3126                        (i==info->active_idx)?" (active)":"");
3127         }
3128
3129         talloc_free(tmp_ctx);
3130         return 0;
3131 }
3132
3133 /*
3134   display interfaces status
3135  */
3136 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
3137 {
3138         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3139         int i;
3140         struct ctdb_control_get_ifaces *ifaces;
3141         int ret;
3142
3143         /* read the public ip list from this node */
3144         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ifaces);
3145         if (ret != 0) {
3146                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
3147                                   options.pnn));
3148                 talloc_free(tmp_ctx);
3149                 return -1;
3150         }
3151
3152         if (options.machinereadable){
3153                 printm(":Name:LinkStatus:References:\n");
3154         } else {
3155                 printf("Interfaces on node %u\n", options.pnn);
3156         }
3157
3158         for (i=0; i<ifaces->num; i++) {
3159                 if (options.machinereadable){
3160                         printm(":%s:%s:%u:\n",
3161                                ifaces->ifaces[i].name,
3162                                ifaces->ifaces[i].link_state?"1":"0",
3163                                (unsigned int)ifaces->ifaces[i].references);
3164                 } else {
3165                         printf("name:%s link:%s references:%u\n",
3166                                ifaces->ifaces[i].name,
3167                                ifaces->ifaces[i].link_state?"up":"down",
3168                                (unsigned int)ifaces->ifaces[i].references);
3169                 }
3170         }
3171
3172         talloc_free(tmp_ctx);
3173         return 0;
3174 }
3175
3176
3177 /*
3178   set link status of an interface
3179  */
3180 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
3181 {
3182         int ret;
3183         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3184         struct ctdb_control_iface_info info;
3185
3186         ZERO_STRUCT(info);
3187
3188         if (argc != 2) {
3189                 usage();
3190         }
3191
3192         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
3193                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
3194                                   argv[0]));
3195                 talloc_free(tmp_ctx);
3196                 return -1;
3197         }
3198         strcpy(info.name, argv[0]);
3199
3200         if (strcmp(argv[1], "up") == 0) {
3201                 info.link_state = 1;
3202         } else if (strcmp(argv[1], "down") == 0) {
3203                 info.link_state = 0;
3204         } else {
3205                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
3206                                   argv[1]));
3207                 talloc_free(tmp_ctx);
3208                 return -1;
3209         }
3210
3211         /* read the public ip list from this node */
3212         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
3213                                    tmp_ctx, &info);
3214         if (ret != 0) {
3215                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
3216                                   argv[0], options.pnn));
3217                 talloc_free(tmp_ctx);
3218                 return ret;
3219         }
3220
3221         talloc_free(tmp_ctx);
3222         return 0;
3223 }
3224
3225 /*
3226   display pid of a ctdb daemon
3227  */
3228 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
3229 {
3230         uint32_t pid;
3231         int ret;
3232
3233         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
3234         if (ret != 0) {
3235                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
3236                 return ret;
3237         }
3238         printf("Pid:%d\n", pid);
3239
3240         return 0;
3241 }
3242
3243 typedef bool update_flags_handler_t(struct ctdb_context *ctdb, void *data);
3244
3245 static int update_flags_and_ipreallocate(struct ctdb_context *ctdb,
3246                                               void *data,
3247                                               update_flags_handler_t handler,
3248                                               uint32_t flag,
3249                                               const char *desc,
3250                                               bool set_flag)
3251 {
3252         struct ctdb_node_map *nodemap = NULL;
3253         bool flag_is_set;
3254         int ret;
3255
3256         /* Check if the node is already in the desired state */
3257         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3258         if (ret != 0) {
3259                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3260                 exit(10);
3261         }
3262         flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3263         if (set_flag == flag_is_set) {
3264                 DEBUG(DEBUG_NOTICE, ("Node %d is %s %s\n", options.pnn,
3265                                      (set_flag ? "already" : "not"), desc));
3266                 return 0;
3267         }
3268
3269         do {
3270                 if (!handler(ctdb, data)) {
3271                         DEBUG(DEBUG_WARNING,
3272                               ("Failed to send control to set state %s on node %u, try again\n",
3273                                desc, options.pnn));
3274                 }
3275
3276                 sleep(1);
3277
3278                 /* Read the nodemap and verify the change took effect.
3279                  * Even if the above control/hanlder timed out then it
3280                  * could still have worked!
3281                  */
3282                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
3283                                          ctdb, &nodemap);
3284                 if (ret != 0) {
3285                         DEBUG(DEBUG_WARNING,
3286                               ("Unable to get nodemap from local node, try again\n"));
3287                 }
3288                 flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3289         } while (nodemap == NULL || (set_flag != flag_is_set));
3290
3291         return ipreallocate(ctdb);
3292 }
3293
3294 /* Administratively disable a node */
3295 static bool update_flags_disabled(struct ctdb_context *ctdb, void *data)
3296 {
3297         int ret;
3298
3299         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3300                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
3301         return ret == 0;
3302 }
3303
3304 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
3305 {
3306         return update_flags_and_ipreallocate(ctdb, NULL,
3307                                                   update_flags_disabled,
3308                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3309                                                   "disabled",
3310                                                   true /* set_flag*/);
3311 }
3312
3313 /* Administratively re-enable a node */
3314 static bool update_flags_not_disabled(struct ctdb_context *ctdb, void *data)
3315 {
3316         int ret;
3317
3318         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3319                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
3320         return ret == 0;
3321 }
3322
3323 static int control_enable(struct ctdb_context *ctdb,  int argc, const char **argv)
3324 {
3325         return update_flags_and_ipreallocate(ctdb, NULL,
3326                                                   update_flags_not_disabled,
3327                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3328                                                   "disabled",
3329                                                   false /* set_flag*/);
3330 }
3331
3332 /* Stop a node */
3333 static bool update_flags_stopped(struct ctdb_context *ctdb, void *data)
3334 {
3335         int ret;
3336
3337         ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
3338
3339         return ret == 0;
3340 }
3341
3342 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
3343 {
3344         return update_flags_and_ipreallocate(ctdb, NULL,
3345                                                   update_flags_stopped,
3346                                                   NODE_FLAGS_STOPPED,
3347                                                   "stopped",
3348                                                   true /* set_flag*/);
3349 }
3350
3351 /* Continue a stopped node */
3352 static bool update_flags_not_stopped(struct ctdb_context *ctdb, void *data)
3353 {
3354         int ret;
3355
3356         ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
3357
3358         return ret == 0;
3359 }
3360
3361 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
3362 {
3363         return update_flags_and_ipreallocate(ctdb, NULL,
3364                                                   update_flags_not_stopped,
3365                                                   NODE_FLAGS_STOPPED,
3366                                                   "stopped",
3367                                                   false /* set_flag */);
3368 }
3369
3370 static uint32_t get_generation(struct ctdb_context *ctdb)
3371 {
3372         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3373         struct ctdb_vnn_map *vnnmap=NULL;
3374         int ret;
3375         uint32_t generation;
3376
3377         /* wait until the recmaster is not in recovery mode */
3378         while (1) {
3379                 uint32_t recmode, recmaster;
3380                 
3381                 if (vnnmap != NULL) {
3382                         talloc_free(vnnmap);
3383                         vnnmap = NULL;
3384                 }
3385
3386                 /* get the recmaster */
3387                 ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
3388                 if (ret != 0) {
3389                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
3390                         talloc_free(tmp_ctx);
3391                         exit(10);
3392                 }
3393
3394                 /* get recovery mode */
3395                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), recmaster, &recmode);
3396                 if (ret != 0) {
3397                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
3398                         talloc_free(tmp_ctx);
3399                         exit(10);
3400                 }
3401
3402                 /* get the current generation number */
3403                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, tmp_ctx, &vnnmap);
3404                 if (ret != 0) {
3405                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
3406                         talloc_free(tmp_ctx);
3407                         exit(10);
3408                 }
3409
3410                 if ((recmode == CTDB_RECOVERY_NORMAL) && (vnnmap->generation != 1)) {
3411                         generation = vnnmap->generation;
3412                         talloc_free(tmp_ctx);
3413                         return generation;
3414                 }
3415                 sleep(1);
3416         }
3417 }
3418
3419 /* Ban a node */
3420 static bool update_state_banned(struct ctdb_context *ctdb, void *data)
3421 {
3422         struct ctdb_ban_time *bantime = (struct ctdb_ban_time *)data;
3423         int ret;
3424
3425         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, bantime);
3426
3427         return ret == 0;
3428 }
3429
3430 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
3431 {
3432         struct ctdb_ban_time bantime;
3433
3434         if (argc < 1) {
3435                 usage();
3436         }
3437         
3438         bantime.pnn  = options.pnn;
3439         bantime.time = strtoul(argv[0], NULL, 0);
3440
3441         if (bantime.time == 0) {
3442                 DEBUG(DEBUG_ERR, ("Invalid ban time specified - must be >0\n"));
3443                 return -1;
3444         }
3445
3446         return update_flags_and_ipreallocate(ctdb, &bantime,
3447                                                   update_state_banned,
3448                                                   NODE_FLAGS_BANNED,
3449                                                   "banned",
3450                                                   true /* set_flag*/);
3451 }
3452
3453
3454 /* Unban a node */
3455 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
3456 {
3457         struct ctdb_ban_time bantime;
3458
3459         bantime.pnn  = options.pnn;
3460         bantime.time = 0;
3461
3462         return update_flags_and_ipreallocate(ctdb, &bantime,
3463                                                   update_state_banned,
3464                                                   NODE_FLAGS_BANNED,
3465                                                   "banned",
3466                                                   false /* set_flag*/);
3467 }
3468
3469 /*
3470   show ban information for a node
3471  */
3472 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
3473 {
3474         int ret;
3475         struct ctdb_node_map *nodemap=NULL;
3476         struct ctdb_ban_time *bantime;
3477
3478         /* verify the node exists */
3479         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3480         if (ret != 0) {
3481                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3482                 return ret;
3483         }
3484
3485         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
3486         if (ret != 0) {
3487                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
3488                 return -1;
3489         }       
3490
3491         if (bantime->time == 0) {
3492                 printf("Node %u is not banned\n", bantime->pnn);
3493         } else {
3494                 printf("Node %u is banned, %d seconds remaining\n",
3495                        bantime->pnn, bantime->time);
3496         }
3497
3498         return 0;
3499 }
3500
3501 /*
3502   shutdown a daemon
3503  */
3504 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
3505 {
3506         int ret;
3507
3508         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
3509         if (ret != 0) {
3510                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
3511                 return ret;
3512         }
3513
3514         return 0;
3515 }
3516
3517 /*
3518   trigger a recovery
3519  */
3520 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
3521 {
3522         int ret;
3523         uint32_t generation, next_generation;
3524
3525         /* record the current generation number */
3526         generation = get_generation(ctdb);
3527
3528         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3529         if (ret != 0) {
3530                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
3531                 return ret;
3532         }
3533
3534         /* wait until we are in a new generation */
3535         while (1) {
3536                 next_generation = get_generation(ctdb);
3537                 if (next_generation != generation) {
3538                         return 0;
3539                 }
3540                 sleep(1);
3541         }
3542
3543         return 0;
3544 }
3545
3546
3547 /*
3548   display monitoring mode of a remote node
3549  */
3550 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
3551 {
3552         uint32_t monmode;
3553         int ret;
3554
3555         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
3556         if (ret != 0) {
3557                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
3558                 return ret;
3559         }
3560         if (!options.machinereadable){
3561                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
3562         } else {
3563                 printm(":mode:\n");
3564                 printm(":%d:\n",monmode);
3565         }
3566         return 0;
3567 }
3568
3569
3570 /*
3571   display capabilities of a remote node
3572  */
3573 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
3574 {
3575         uint32_t capabilities;
3576         int ret;
3577
3578         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
3579         if (ret != 0) {
3580                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
3581                 return -1;
3582         }
3583         
3584         if (!options.machinereadable){
3585                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
3586                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
3587                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
3588                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
3589         } else {
3590                 printm(":RECMASTER:LMASTER:LVS:NATGW:\n");
3591                 printm(":%d:%d:%d:%d:\n",
3592                         !!(capabilities&CTDB_CAP_RECMASTER),
3593                         !!(capabilities&CTDB_CAP_LMASTER),
3594                         !!(capabilities&CTDB_CAP_LVS),
3595                         !!(capabilities&CTDB_CAP_NATGW));
3596         }
3597         return 0;
3598 }
3599
3600 /*
3601   display lvs configuration
3602  */
3603
3604 static uint32_t lvs_exclude_flags[] = {
3605         /* Look for a nice healthy node */
3606         NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED,
3607         /* If not found, an UNHEALTHY node will do */
3608         NODE_FLAGS_INACTIVE|NODE_FLAGS_PERMANENTLY_DISABLED,
3609         0,
3610 };
3611
3612 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
3613 {
3614         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3615         struct ctdb_node_map *orig_nodemap=NULL;
3616         struct ctdb_node_map *nodemap;
3617         int i, ret;
3618
3619         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn,
3620                                    tmp_ctx, &orig_nodemap);
3621         if (ret != 0) {
3622                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3623                 talloc_free(tmp_ctx);
3624                 return -1;
3625         }
3626
3627         nodemap = filter_nodemap_by_capabilities(ctdb, orig_nodemap,
3628                                                  CTDB_CAP_LVS, false);
3629         if (nodemap == NULL) {
3630                 /* No memory */
3631                 ret = -1;
3632                 goto done;
3633         }
3634
3635         ret = 0;
3636
3637         for (i = 0; lvs_exclude_flags[i] != 0; i++) {
3638                 struct ctdb_node_map *t =
3639                         filter_nodemap_by_flags(ctdb, nodemap,
3640                                                 lvs_exclude_flags[i]);
3641                 if (t == NULL) {
3642                         /* No memory */
3643                         ret = -1;
3644                         goto done;
3645                 }
3646                 if (t->num > 0) {
3647                         /* At least 1 node without excluded flags */
3648                         int j;
3649                         for (j = 0; j < t->num; j++) {
3650                                 printf("%d:%s\n", t->nodes[j].pnn, 
3651                                        ctdb_addr_to_str(&t->nodes[j].addr));
3652                         }
3653                         goto done;
3654                 }
3655                 talloc_free(t);
3656         }
3657 done:
3658         talloc_free(tmp_ctx);
3659         return ret;
3660 }
3661
3662 /*
3663   display who is the lvs master
3664  */
3665 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3666 {
3667         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3668         struct ctdb_node_map *nodemap=NULL;
3669         int i, ret;
3670
3671         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn,
3672                                    tmp_ctx, &nodemap);
3673         if (ret != 0) {
3674                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3675                 talloc_free(tmp_ctx);
3676                 return -1;
3677         }
3678
3679         for (i = 0; lvs_exclude_flags[i] != 0; i++) {
3680                 struct ctdb_node_map *t =
3681                         filter_nodemap_by_flags(ctdb, nodemap,
3682                                                 lvs_exclude_flags[i]);
3683                 if (t == NULL) {
3684                         /* No memory */
3685                         ret = -1;
3686                         goto done;
3687                 }
3688                 if (t->num > 0) {
3689                         struct ctdb_node_map *n;
3690                         n = filter_nodemap_by_capabilities(ctdb,
3691                                                            t,
3692                                                            CTDB_CAP_LVS,
3693                                                            true);
3694                         if (n == NULL) {
3695                                 /* No memory */
3696                                 ret = -1;
3697                                 goto done;
3698                         }
3699                         if (n->num > 0) {
3700                                 ret = 0;
3701                                 if (options.machinereadable) {
3702                                         printm("%d\n", n->nodes[0].pnn);
3703                                 } else {
3704                                         printf("Node %d is LVS master\n", n->nodes[0].pnn);
3705                                 }
3706                                 goto done;
3707                         }
3708                 }
3709                 talloc_free(t);
3710         }
3711
3712         printf("There is no LVS master\n");
3713         ret = 255;
3714 done:
3715         talloc_free(tmp_ctx);
3716         return ret;
3717 }
3718
3719 /*
3720   disable monitoring on a  node
3721  */
3722 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3723 {
3724         
3725         int ret;
3726
3727         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
3728         if (ret != 0) {
3729                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
3730                 return ret;
3731         }
3732         printf("Monitoring mode:%s\n","DISABLED");
3733
3734         return 0;
3735 }
3736
3737 /*
3738   enable monitoring on a  node
3739  */
3740 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3741 {
3742         
3743         int ret;
3744
3745         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
3746         if (ret != 0) {
3747                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
3748                 return ret;
3749         }
3750         printf("Monitoring mode:%s\n","ACTIVE");
3751
3752         return 0;
3753 }
3754
3755 /*
3756   display remote list of keys/data for a db
3757  */
3758 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
3759 {
3760         const char *db_name;
3761         struct ctdb_db_context *ctdb_db;
3762         int ret;
3763         struct ctdb_dump_db_context c;
3764         uint8_t flags;
3765
3766         if (argc < 1) {
3767                 usage();
3768         }
3769
3770         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3771                 return -1;
3772         }
3773
3774         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3775         if (ctdb_db == NULL) {
3776                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3777                 return -1;
3778         }
3779
3780         if (options.printlmaster) {
3781                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn,
3782                                           ctdb, &ctdb->vnn_map);
3783                 if (ret != 0) {
3784                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3785                                           options.pnn));
3786                         return ret;
3787                 }
3788         }
3789
3790         ZERO_STRUCT(c);
3791         c.ctdb = ctdb;
3792         c.f = stdout;
3793         c.printemptyrecords = (bool)options.printemptyrecords;
3794         c.printdatasize = (bool)options.printdatasize;
3795         c.printlmaster = (bool)options.printlmaster;
3796         c.printhash = (bool)options.printhash;
3797         c.printrecordflags = (bool)options.printrecordflags;
3798
3799         /* traverse and dump the cluster tdb */
3800         ret = ctdb_dump_db(ctdb_db, &c);
3801         if (ret == -1) {
3802                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
3803                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
3804                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
3805                                   db_name));
3806                 return -1;
3807         }
3808         talloc_free(ctdb_db);
3809
3810         printf("Dumped %d records\n", ret);
3811         return 0;
3812 }
3813
3814 struct cattdb_data {
3815         struct ctdb_context *ctdb;
3816         uint32_t count;
3817 };
3818
3819 static int cattdb_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
3820 {
3821         struct cattdb_data *d = private_data;
3822         struct ctdb_dump_db_context c;
3823
3824         d->count++;
3825
3826         ZERO_STRUCT(c);
3827         c.ctdb = d->ctdb;
3828         c.f = stdout;
3829         c.printemptyrecords = (bool)options.printemptyrecords;
3830         c.printdatasize = (bool)options.printdatasize;
3831         c.printlmaster = false;
3832         c.printhash = (bool)options.printhash;
3833         c.printrecordflags = true;
3834
3835         return ctdb_dumpdb_record(key, data, &c);
3836 }
3837
3838 /*
3839   cat the local tdb database using same format as catdb
3840  */
3841 static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv)
3842 {
3843         const char *db_name;
3844         struct ctdb_db_context *ctdb_db;
3845         struct cattdb_data d;
3846         uint8_t flags;
3847
3848         if (argc < 1) {
3849                 usage();
3850         }
3851
3852         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3853                 return -1;
3854         }
3855
3856         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3857         if (ctdb_db == NULL) {
3858                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3859                 return -1;
3860         }
3861
3862         /* traverse the local tdb */
3863         d.count = 0;
3864         d.ctdb  = ctdb;
3865         if (tdb_traverse_read(ctdb_db->ltdb->tdb, cattdb_traverse, &d) == -1) {
3866                 printf("Failed to cattdb data\n");
3867                 exit(10);
3868         }
3869         talloc_free(ctdb_db);
3870
3871         printf("Dumped %d records\n", d.count);
3872         return 0;
3873 }
3874
3875 /*
3876   display the content of a database key
3877  */
3878 static int control_readkey(struct ctdb_context *ctdb, int argc, const char **argv)
3879 {
3880         const char *db_name;
3881         struct ctdb_db_context *ctdb_db;
3882         struct ctdb_record_handle *h;
3883         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3884         TDB_DATA key, data;
3885         uint8_t flags;
3886
3887         if (argc < 2) {
3888                 usage();
3889         }
3890
3891         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3892                 return -1;
3893         }
3894
3895         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3896         if (ctdb_db == NULL) {
3897                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3898                 return -1;
3899         }
3900
3901         key.dptr  = discard_const(argv[1]);
3902         key.dsize = strlen((char *)key.dptr);
3903
3904         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3905         if (h == NULL) {
3906                 printf("Failed to fetch record '%s' on node %d\n", 
3907                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3908                 talloc_free(tmp_ctx);
3909                 exit(10);
3910         }
3911
3912         printf("Data: size:%d ptr:[%.*s]\n", (int)data.dsize, (int)data.dsize, data.dptr);
3913
3914         talloc_free(tmp_ctx);
3915         talloc_free(ctdb_db);
3916         return 0;
3917 }
3918
3919 /*
3920   display the content of a database key
3921  */
3922 static int control_writekey(struct ctdb_context *ctdb, int argc, const char **argv)
3923 {
3924         const char *db_name;
3925         struct ctdb_db_context *ctdb_db;
3926         struct ctdb_record_handle *h;
3927         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3928         TDB_DATA key, data;
3929         uint8_t flags;
3930
3931         if (argc < 3) {
3932                 usage();
3933         }
3934
3935         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3936                 return -1;
3937         }
3938
3939         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3940         if (ctdb_db == NULL) {
3941                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3942                 return -1;
3943         }
3944
3945         key.dptr  = discard_const(argv[1]);
3946         key.dsize = strlen((char *)key.dptr);
3947
3948         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3949         if (h == NULL) {
3950                 printf("Failed to fetch record '%s' on node %d\n", 
3951                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3952                 talloc_free(tmp_ctx);
3953                 exit(10);
3954         }
3955
3956         data.dptr  = discard_const(argv[2]);
3957         data.dsize = strlen((char *)data.dptr);
3958
3959         if (ctdb_record_store(h, data) != 0) {
3960                 printf("Failed to store record\n");
3961         }
3962
3963         talloc_free(h);
3964         talloc_free(tmp_ctx);
3965         talloc_free(ctdb_db);
3966         return 0;
3967 }
3968
3969 /*
3970   fetch a record from a persistent database
3971  */
3972 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3973 {
3974         const char *db_name;
3975         struct ctdb_db_context *ctdb_db;
3976         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3977         struct ctdb_transaction_handle *h;
3978         TDB_DATA key, data;
3979         int fd, ret;
3980         bool persistent;
3981         uint8_t flags;
3982
3983         if (argc < 2) {
3984                 talloc_free(tmp_ctx);
3985                 usage();
3986         }
3987
3988         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3989                 talloc_free(tmp_ctx);
3990                 return -1;
3991         }
3992
3993         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
3994         if (!persistent) {
3995                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
3996                 talloc_free(tmp_ctx);
3997                 return -1;
3998         }
3999
4000         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4001         if (ctdb_db == NULL) {
4002                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4003                 talloc_free(tmp_ctx);
4004                 return -1;
4005         }
4006
4007         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4008         if (h == NULL) {
4009                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4010                 talloc_free(tmp_ctx);
4011                 return -1;
4012         }
4013
4014         key.dptr  = discard_const(argv[1]);
4015         key.dsize = strlen(argv[1]);
4016         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
4017         if (ret != 0) {
4018                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
4019                 talloc_free(tmp_ctx);
4020                 return -1;
4021         }
4022
4023         if (data.dsize == 0 || data.dptr == NULL) {
4024                 DEBUG(DEBUG_ERR,("Record is empty\n"));
4025                 talloc_free(tmp_ctx);
4026                 return -1;
4027         }
4028
4029         if (argc == 3) {
4030           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
4031                 if (fd == -1) {
4032                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
4033                         talloc_free(tmp_ctx);
4034                         return -1;
4035                 }
4036                 sys_write(fd, data.dptr, data.dsize);
4037                 close(fd);
4038         } else {
4039                 sys_write(1, data.dptr, data.dsize);
4040         }
4041
4042         /* abort the transaction */
4043         talloc_free(h);
4044
4045
4046         talloc_free(tmp_ctx);
4047         return 0;
4048 }
4049
4050 /*
4051   fetch a record from a tdb-file
4052  */
4053 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
4054 {
4055         const char *tdb_file;
4056         TDB_CONTEXT *tdb;
4057         TDB_DATA key, data;
4058         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4059         int fd;
4060
4061         if (argc < 2) {
4062                 usage();
4063         }
4064
4065         tdb_file = argv[0];
4066
4067         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
4068         if (tdb == NULL) {
4069                 printf("Failed to open TDB file %s\n", tdb_file);
4070                 return -1;
4071         }
4072
4073         key = strtodata(tmp_ctx, argv[1], strlen(argv[1]));
4074         if (key.dptr == NULL) {
4075                 printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4076                 return -1;
4077         }
4078
4079         data = tdb_fetch(tdb, key);
4080         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
4081                 printf("Failed to read record %s from tdb %s\n", argv[1], tdb_file);
4082                 tdb_close(tdb);
4083                 return -1;
4084         }
4085
4086         tdb_close(tdb);
4087
4088         if (argc == 3) {
4089           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
4090                 if (fd == -1) {
4091                         printf("Failed to open output file %s\n", argv[2]);
4092                         return -1;
4093                 }
4094                 if (options.verbose){
4095                         sys_write(fd, data.dptr, data.dsize);
4096                 } else {
4097                         sys_write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4098                 }
4099                 close(fd);
4100         } else {
4101                 if (options.verbose){
4102                         sys_write(1, data.dptr, data.dsize);
4103                 } else {
4104                         sys_write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4105                 }
4106         }
4107
4108         talloc_free(tmp_ctx);
4109         return 0;
4110 }
4111
4112 /*
4113   store a record and header to a tdb-file
4114  */
4115 static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv)
4116 {
4117         const char *tdb_file;
4118         TDB_CONTEXT *tdb;
4119         TDB_DATA key, value, data;
4120         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4121         struct ctdb_ltdb_header header;
4122
4123         if (argc < 3) {
4124                 usage();
4125         }
4126
4127         tdb_file = argv[0];
4128
4129         tdb = tdb_open(tdb_file, 0, 0, O_RDWR, 0);
4130         if (tdb == NULL) {
4131                 printf("Failed to open TDB file %s\n", tdb_file);
4132                 return -1;
4133         }
4134
4135         key = strtodata(tmp_ctx, argv[1], strlen(argv[1]));
4136         if (key.dptr == NULL) {
4137                 printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4138                 return -1;
4139         }
4140
4141         value = strtodata(tmp_ctx, argv[2], strlen(argv[2]));
4142         if (value.dptr == NULL) {
4143                 printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
4144                 return -1;
4145         }
4146
4147         ZERO_STRUCT(header);
4148         if (argc > 3) {
4149                 header.rsn = atoll(argv[3]);
4150         }
4151         if (argc > 4) {
4152                 header.dmaster = atoi(argv[4]);
4153         }
4154         if (argc > 5) {
4155                 header.flags = atoi(argv[5]);
4156         }
4157
4158         data.dsize = sizeof(struct ctdb_ltdb_header) + value.dsize;
4159         data.dptr = talloc_size(tmp_ctx, data.dsize);
4160         if (data.dptr == NULL) {
4161                 printf("Failed to allocate header+value\n");
4162                 return -1;
4163         }
4164
4165         *(struct ctdb_ltdb_header *)data.dptr = header;
4166         memcpy(data.dptr + sizeof(struct ctdb_ltdb_header), value.dptr, value.dsize);
4167
4168         if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
4169                 printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
4170                 tdb_close(tdb);
4171                 return -1;
4172         }
4173
4174         tdb_close(tdb);
4175
4176         talloc_free(tmp_ctx);
4177         return 0;
4178 }
4179
4180 /*
4181   write a record to a persistent database
4182  */
4183 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
4184 {
4185         const char *db_name;
4186         struct ctdb_db_context *ctdb_db;
4187         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4188         struct ctdb_transaction_handle *h;
4189         struct stat st;
4190         TDB_DATA key, data;
4191         int fd, ret;
4192
4193         if (argc < 3) {
4194                 talloc_free(tmp_ctx);
4195                 usage();
4196         }
4197
4198         fd = open(argv[2], O_RDONLY);
4199         if (fd == -1) {
4200                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
4201                 talloc_free(tmp_ctx);
4202                 return -1;
4203         }
4204         
4205         ret = fstat(fd, &st);
4206         if (ret == -1) {
4207                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
4208                 close(fd);
4209                 talloc_free(tmp_ctx);
4210                 return -1;
4211         }
4212
4213         if (!S_ISREG(st.st_mode)) {
4214                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
4215                 close(fd);
4216                 talloc_free(tmp_ctx);
4217                 return -1;
4218         }
4219
4220         data.dsize = st.st_size;
4221         if (data.dsize == 0) {
4222                 data.dptr  = NULL;
4223         } else {
4224                 data.dptr = talloc_size(tmp_ctx, data.dsize);
4225                 if (data.dptr == NULL) {
4226                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
4227                         close(fd);
4228                         talloc_free(tmp_ctx);
4229                         return -1;
4230                 }
4231                 ret = sys_read(fd, data.dptr, data.dsize);
4232                 if (ret != data.dsize) {
4233                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
4234                         close(fd);
4235                         talloc_free(tmp_ctx);
4236                         return -1;
4237                 }
4238         }
4239         close(fd);
4240
4241
4242         db_name = argv[0];
4243
4244         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
4245         if (ctdb_db == NULL) {
4246                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4247                 talloc_free(tmp_ctx);
4248                 return -1;
4249         }
4250
4251         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4252         if (h == NULL) {
4253                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4254                 talloc_free(tmp_ctx);
4255                 return -1;
4256         }
4257
4258         key = strtodata(tmp_ctx, argv[1], strlen(argv[1]));
4259         if (key.dptr == NULL) {
4260                 printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4261                 return -1;
4262         }
4263
4264         ret = ctdb_transaction_store(h, key, data);
4265         if (ret != 0) {
4266                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4267                 talloc_free(tmp_ctx);
4268                 return -1;
4269         }
4270
4271         ret = ctdb_transaction_commit(h);
4272         if (ret != 0) {
4273                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4274                 talloc_free(tmp_ctx);
4275                 return -1;
4276         }
4277
4278
4279         talloc_free(tmp_ctx);
4280         return 0;
4281 }
4282
4283 /*
4284  * delete a record from a persistent database
4285  */
4286 static int control_pdelete(struct ctdb_context *ctdb, int argc, const char **argv)
4287 {
4288         const char *db_name;
4289         struct ctdb_db_context *ctdb_db;
4290         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4291         struct ctdb_transaction_handle *h;
4292         TDB_DATA key;
4293         int ret;
4294         bool persistent;
4295         uint8_t flags;
4296
4297         if (argc < 2) {
4298                 talloc_free(tmp_ctx);
4299                 usage();
4300         }
4301
4302         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
4303                 talloc_free(tmp_ctx);
4304                 return -1;
4305         }
4306
4307         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
4308         if (!persistent) {
4309                 DEBUG(DEBUG_ERR, ("Database '%s' is not persistent\n", db_name));
4310                 talloc_free(tmp_ctx);
4311                 return -1;
4312         }
4313
4314         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4315         if (ctdb_db == NULL) {
4316                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n", db_name));
4317                 talloc_free(tmp_ctx);
4318                 return -1;
4319         }
4320
4321         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4322         if (h == NULL) {
4323                 DEBUG(DEBUG_ERR, ("Failed to start transaction on database %s\n", db_name));
4324                 talloc_free(tmp_ctx);
4325                 return -1;
4326         }
4327
4328         key = strtodata(tmp_ctx, argv[1], strlen(argv[1]));
4329         if (key.dptr == NULL) {
4330                 printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4331                 return -1;
4332         }
4333
4334         ret = ctdb_transaction_store(h, key, tdb_null);
4335         if (ret != 0) {
4336                 DEBUG(DEBUG_ERR, ("Failed to delete record\n"));
4337                 talloc_free(tmp_ctx);
4338                 return -1;
4339         }
4340
4341         ret = ctdb_transaction_commit(h);
4342         if (ret != 0) {
4343                 DEBUG(DEBUG_ERR, ("Failed to commit transaction\n"));
4344                 talloc_free(tmp_ctx);
4345                 return -1;
4346         }
4347
4348         talloc_free(tmp_ctx);
4349         return 0;
4350 }
4351
4352 static const char *ptrans_parse_string(TALLOC_CTX *mem_ctx, const char *s,
4353                                        TDB_DATA *data)
4354 {
4355         const char *t;
4356         size_t n;
4357         const char *ret; /* Next byte after successfully parsed value */
4358
4359         /* Error, unless someone says otherwise */
4360         ret = NULL;
4361         /* Indicates no value to parse */
4362         *data = tdb_null;
4363
4364         /* Skip whitespace */
4365         n = strspn(s, " \t");
4366         t = s + n;
4367
4368         if (t[0] == '"') {
4369                 /* Quoted ASCII string - no wide characters! */
4370                 t++;
4371                 n = strcspn(t, "\"");
4372                 if (t[n] == '"') {
4373                         if (n > 0) {
4374                                 *data = strtodata(mem_ctx, t, n);
4375                                 CTDB_NOMEM_ABORT(data->dptr);
4376                         }
4377                         ret = t + n + 1;
4378                 } else {
4379                         DEBUG(DEBUG_WARNING,("Unmatched \" in input %s\n", s));
4380                 }
4381         } else {
4382                 DEBUG(DEBUG_WARNING,("Unsupported input format in %s\n", s));
4383         }
4384
4385         return ret;
4386 }
4387
4388 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
4389                                  TDB_DATA *key, TDB_DATA *value)
4390 {
4391         char line [1024]; /* FIXME: make this more flexible? */
4392         const char *t;
4393         char *ptr;
4394
4395         ptr = fgets(line, sizeof(line), file);
4396
4397         if (ptr == NULL) {
4398                 return false;
4399         }
4400
4401         /* Get key */
4402         t = ptrans_parse_string(mem_ctx, line, key);
4403         if (t == NULL || key->dptr == NULL) {
4404                 /* Line Ignored but not EOF */
4405                 return true;
4406         }
4407
4408         /* Get value */
4409         t = ptrans_parse_string(mem_ctx, t, value);
4410         if (t == NULL) {
4411                 /* Line Ignored but not EOF */
4412                 talloc_free(key->dptr);
4413                 *key = tdb_null;
4414                 return true;
4415         }
4416
4417         return true;
4418 }
4419
4420 /*
4421  * Update a persistent database as per file/stdin
4422  */
4423 static int control_ptrans(struct ctdb_context *ctdb,
4424                           int argc, const char **argv)
4425 {
4426         const char *db_name;
4427         struct ctdb_db_context *ctdb_db;
4428         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4429         struct ctdb_transaction_handle *h;
4430         TDB_DATA key, value;
4431         FILE *file;
4432         int ret;
4433
4434         if (argc < 1) {
4435                 talloc_free(tmp_ctx);
4436                 usage();
4437         }
4438
4439         file = stdin;
4440         if (argc == 2) {
4441                 file = fopen(argv[1], "r");
4442                 if (file == NULL) {
4443                         DEBUG(DEBUG_ERR,("Unable to open file for reading '%s'\n", argv[1]));
4444                         talloc_free(tmp_ctx);
4445                         return -1;
4446                 }
4447         }
4448
4449         db_name = argv[0];
4450
4451         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
4452         if (ctdb_db == NULL) {
4453                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4454                 goto error;
4455         }
4456
4457         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4458         if (h == NULL) {
4459                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4460                 goto error;
4461         }
4462
4463         while (ptrans_get_key_value(tmp_ctx, file, &key, &value)) {
4464                 if (key.dsize != 0) {
4465                         ret = ctdb_transaction_store(h, key, value);
4466                         /* Minimise memory use */
4467                         talloc_free(key.dptr);
4468                         if (value.dptr != NULL) {
4469                                 talloc_free(value.dptr);
4470                         }
4471                         if (ret != 0) {
4472                                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4473                                 ctdb_transaction_cancel(h);
4474                                 goto error;
4475                         }
4476                 }
4477         }
4478
4479         ret = ctdb_transaction_commit(h);
4480         if (ret != 0) {
4481                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4482                 goto error;
4483         }
4484
4485         if (file != stdin) {
4486                 fclose(file);
4487         }
4488         talloc_free(tmp_ctx);
4489         return 0;
4490
4491 error:
4492         if (file != stdin) {
4493                 fclose(file);
4494         }
4495
4496         talloc_free(tmp_ctx);
4497         return -1;
4498 }
4499
4500 /*
4501   check if a service is bound to a port or not
4502  */
4503 static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **argv)
4504 {
4505         int s, ret;
4506         int v;
4507         int port;
4508         struct sockaddr_in sin;
4509
4510         if (argc != 1) {
4511                 printf("Use: ctdb chktcport <port>\n");
4512                 return EINVAL;
4513         }
4514
4515         port = atoi(argv[0]);
4516
4517         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
4518         if (s == -1) {
4519                 printf("Failed to open local socket\n");
4520                 return errno;
4521         }
4522
4523         v = fcntl(s, F_GETFL, 0);
4524         if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK) != 0) {
4525                 printf("Unable to set socket non-blocking: %s\n", strerror(errno));
4526         }
4527
4528         bzero(&sin, sizeof(sin));
4529         sin.sin_family = PF_INET;
4530         sin.sin_port   = htons(port);
4531         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
4532         close(s);
4533         if (ret == -1) {
4534                 printf("Failed to bind to local socket: %d %s\n", errno, strerror(errno));
4535                 return errno;
4536         }
4537
4538         return 0;
4539 }
4540
4541
4542 /* Reload public IPs on a specified nodes */
4543 static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
4544 {
4545         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4546         uint32_t *nodes;
4547         uint32_t pnn_mode;
4548         uint32_t timeout;
4549         int ret;
4550
4551         assert_single_node_only();
4552
4553         if (argc > 1) {
4554                 usage();
4555         }
4556
4557         /* Determine the nodes where IPs need to be reloaded */
4558         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
4559                               options.pnn, true, &nodes, &pnn_mode)) {
4560                 ret = -1;
4561                 goto done;
4562         }
4563
4564 again:
4565         /* Disable takeover runs on all connected nodes.  A reply
4566          * indicating success is needed from each node so all nodes
4567          * will need to be active.  This will retry until maxruntime
4568          * is exceeded, hence no error handling.
4569          * 
4570          * A check could be added to not allow reloading of IPs when
4571          * there are disconnected nodes.  However, this should
4572          * probably be left up to the administrator.
4573          */
4574         timeout = LONGTIMEOUT;
4575         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4576                         "Disable takeover runs", true);
4577
4578         /* Now tell all the desired nodes to reload their public IPs.
4579          * Keep trying this until it succeeds.  This assumes all
4580          * failures are transient, which might not be true...
4581          */
4582         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
4583                                       nodes, 0, LONGTIMELIMIT(),
4584                                       false, tdb_null,
4585                                       NULL, NULL, NULL) != 0) {
4586                 DEBUG(DEBUG_ERR,
4587                       ("Unable to reload IPs on some nodes, try again.\n"));
4588                 goto again;
4589         }
4590
4591         /* It isn't strictly necessary to wait until takeover runs are
4592          * re-enabled but doing so can't hurt.
4593          */
4594         timeout = 0;
4595         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4596                         "Enable takeover runs", true);
4597
4598         ipreallocate(ctdb);
4599
4600         ret = 0;
4601 done:
4602         talloc_free(tmp_ctx);
4603         return ret;
4604 }
4605
4606 /*
4607   display a list of the databases on a remote ctdb
4608  */
4609 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
4610 {
4611         int i, ret;
4612         struct ctdb_dbid_map *dbmap=NULL;
4613
4614         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4615         if (ret != 0) {
4616                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4617                 return ret;
4618         }
4619
4620         if(options.machinereadable){
4621                 printm(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
4622                 for(i=0;i<dbmap->num;i++){
4623                         const char *path = NULL;
4624                         const char *name = NULL;
4625                         const char *health = NULL;
4626                         bool persistent;
4627                         bool readonly;
4628                         bool sticky;
4629
4630                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
4631                                             dbmap->dbs[i].dbid, ctdb, &path);
4632                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4633                                             dbmap->dbs[i].dbid, ctdb, &name);
4634                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4635                                               dbmap->dbs[i].dbid, ctdb, &health);
4636                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4637                         readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4638                         sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4639                         printm(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
4640                                dbmap->dbs[i].dbid, name, path,
4641                                !!(persistent), !!(sticky),
4642                                !!(health), !!(readonly));
4643                 }
4644                 return 0;
4645         }
4646
4647         printf("Number of databases:%d\n", dbmap->num);
4648         for(i=0;i<dbmap->num;i++){
4649                 const char *path = NULL;
4650                 const char *name = NULL;
4651                 const char *health = NULL;
4652                 bool persistent;
4653                 bool readonly;
4654                 bool sticky;
4655
4656                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4657                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4658                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4659                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4660                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4661                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4662                 printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
4663                        dbmap->dbs[i].dbid, name, path,
4664                        persistent?" PERSISTENT":"",
4665                        sticky?" STICKY":"",
4666                        readonly?" READONLY":"",
4667                        health?" UNHEALTHY":"");
4668         }
4669
4670         return 0;
4671 }
4672
4673 /*
4674   display the status of a database on a remote ctdb
4675  */
4676 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
4677 {
4678         const char *db_name;
4679         uint32_t db_id;
4680         uint8_t flags;
4681         const char *path = NULL;
4682         const char *health = NULL;
4683
4684         if (argc < 1) {
4685                 usage();
4686         }
4687
4688         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
4689                 return -1;
4690         }
4691
4692         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &path);
4693         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &health);
4694         printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
4695                db_id, db_name, path,
4696                (flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
4697                (flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
4698                (flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
4699                (health ? health : "OK"));
4700
4701         return 0;
4702 }
4703
4704 /*
4705   check if the local node is recmaster or not
4706   it will return 1 if this node is the recmaster and 0 if it is not
4707   or if the local ctdb daemon could not be contacted
4708  */
4709 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
4710 {
4711         uint32_t mypnn, recmaster;
4712         int ret;
4713
4714         assert_single_node_only();
4715
4716         mypnn = getpnn(ctdb);
4717
4718         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
4719         if (ret != 0) {
4720                 printf("Failed to get the recmaster\n");
4721                 return 1;
4722         }
4723
4724         if (recmaster != mypnn) {
4725                 printf("this node is not the recmaster\n");
4726                 return 1;
4727         }
4728
4729         printf("this node is the recmaster\n");
4730         return 0;
4731 }
4732
4733 /*
4734   ping a node
4735  */
4736 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
4737 {
4738         int ret;
4739         struct timeval tv = timeval_current();
4740         ret = ctdb_ctrl_ping(ctdb, options.pnn);
4741         if (ret == -1) {
4742                 printf("Unable to get ping response from node %u\n", options.pnn);
4743                 return -1;
4744         } else {
4745                 printf("response from %u time=%.6f sec  (%d clients)\n", 
4746                        options.pnn, timeval_elapsed(&tv), ret);
4747         }
4748         return 0;
4749 }
4750
4751
4752 /*
4753   get a node's runstate
4754  */
4755 static int control_runstate(struct ctdb_context *ctdb, int argc, const char **argv)
4756 {
4757         int ret;
4758         enum ctdb_runstate runstate;
4759
4760         ret = ctdb_ctrl_get_runstate(ctdb, TIMELIMIT(), options.pnn, &runstate);
4761         if (ret == -1) {
4762                 printf("Unable to get runstate response from node %u\n",
4763                        options.pnn);
4764                 return -1;
4765         } else {
4766                 bool found = true;
4767                 enum ctdb_runstate t;
4768                 int i;
4769                 for (i=0; i<argc; i++) {
4770                         found = false;
4771                         t = runstate_from_string(argv[i]);
4772                         if (t == CTDB_RUNSTATE_UNKNOWN) {
4773                                 printf("Invalid run state (%s)\n", argv[i]);
4774                                 return -1;
4775                         }
4776
4777                         if (t == runstate) {
4778                                 found = true;
4779                                 break;
4780                         }
4781                 }
4782
4783                 if (!found) {
4784                         printf("CTDB not in required run state (got %s)\n", 
4785                                runstate_to_string((enum ctdb_runstate)runstate));
4786                         return -1;
4787                 }
4788         }
4789
4790         printf("%s\n", runstate_to_string(runstate));
4791         return 0;
4792 }
4793
4794
4795 /*
4796   get a tunable
4797  */
4798 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
4799 {
4800         const char *name;
4801         uint32_t value;
4802         int ret;
4803
4804         if (argc < 1) {
4805                 usage();
4806         }
4807
4808         name = argv[0];
4809         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
4810         if (ret != 0) {
4811                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
4812                 return -1;
4813         }
4814
4815         printf("%-23s = %u\n", name, value);
4816         return 0;
4817 }
4818
4819 /*
4820   set a tunable
4821  */
4822 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
4823 {
4824         const char *name;
4825         uint32_t value;
4826         int ret;
4827
4828         if (argc < 2) {
4829                 usage();
4830         }
4831
4832         name = argv[0];
4833         value = strtoul(argv[1], NULL, 0);
4834
4835         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
4836         if (ret == -1) {
4837                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
4838                 return -1;
4839         }
4840         if (ret == 1) {
4841                 DEBUG(DEBUG_WARNING,
4842                       ("Setting obsolete tunable variable '%s'\n",
4843                        name));
4844         }
4845         return 0;
4846 }
4847
4848 /*
4849   list all tunables
4850  */
4851 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
4852 {
4853         uint32_t count;
4854         const char **list;
4855         int ret, i;
4856
4857         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
4858         if (ret == -1) {
4859                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
4860                 return -1;
4861         }
4862
4863         for (i=0;i<count;i++) {
4864                 control_getvar(ctdb, 1, &list[i]);
4865         }
4866
4867         talloc_free(list);
4868         
4869         return 0;
4870 }
4871
4872 /*
4873   display debug level on a node
4874  */
4875 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4876 {
4877         int ret;
4878         int32_t level;
4879
4880         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
4881         if (ret != 0) {
4882                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
4883                 return ret;
4884         } else {
4885                 const char *desc = get_debug_by_level(level);
4886                 if (desc == NULL) {
4887                         /* This should never happen */
4888                         desc = "Unknown";
4889                 }
4890                 if (options.machinereadable){
4891                         printm(":Name:Level:\n");
4892                         printm(":%s:%d:\n", desc, level);
4893                 } else {
4894                         printf("Node %u is at debug level %s (%d)\n",
4895                                options.pnn, desc, level);
4896                 }
4897         }
4898         return 0;
4899 }
4900
4901 /*
4902   display reclock file of a node
4903  */
4904 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4905 {
4906         int ret;
4907         const char *reclock;
4908
4909         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
4910         if (ret != 0) {
4911                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4912                 return ret;
4913         } else {
4914                 if (options.machinereadable){
4915                         if (reclock != NULL) {
4916                                 printm("%s", reclock);
4917                         }
4918                 } else {
4919                         if (reclock == NULL) {
4920                                 printf("No reclock file used.\n");
4921                         } else {
4922                                 printf("Reclock file:%s\n", reclock);
4923                         }
4924                 }
4925         }
4926         return 0;
4927 }
4928
4929 /*
4930   set the reclock file of a node
4931  */
4932 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4933 {
4934         int ret;
4935         const char *reclock = NULL;
4936
4937         if (argc == 0) {
4938                 reclock = NULL;
4939         } else if (argc == 1) {
4940                 reclock = argv[0];
4941         } else {
4942                 usage();
4943         }
4944
4945         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
4946         if (ret != 0) {
4947                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4948                 return ret;
4949         }
4950         return 0;
4951 }
4952
4953 /*
4954   set the natgw state on/off
4955  */
4956 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
4957 {
4958         int ret;
4959         uint32_t natgwstate = 0;
4960
4961         if (argc == 0) {
4962                 usage();
4963         }
4964
4965         if (!strcmp(argv[0], "on")) {
4966                 natgwstate = 1;
4967         } else if (!strcmp(argv[0], "off")) {
4968                 natgwstate = 0;
4969         } else {
4970                 usage();
4971         }
4972
4973         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
4974         if (ret != 0) {
4975                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
4976                 return ret;
4977         }
4978
4979         return 0;
4980 }
4981
4982 /*
4983   set the lmaster role on/off
4984  */
4985 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4986 {
4987         int ret;
4988         uint32_t lmasterrole = 0;
4989
4990         if (argc == 0) {
4991                 usage();
4992         }
4993
4994         if (!strcmp(argv[0], "on")) {
4995                 lmasterrole = 1;
4996         } else if (!strcmp(argv[0], "off")) {
4997                 lmasterrole = 0;
4998         } else {
4999                 usage();
5000         }
5001
5002         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
5003         if (ret != 0) {
5004                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
5005                 return ret;
5006         }
5007
5008         return 0;
5009 }
5010
5011 /*
5012   set the recmaster role on/off
5013  */
5014 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
5015 {
5016         int ret;
5017         uint32_t recmasterrole = 0;
5018
5019         if (argc == 0) {
5020                 usage();
5021         }
5022
5023         if (!strcmp(argv[0], "on")) {
5024                 recmasterrole = 1;
5025         } else if (!strcmp(argv[0], "off")) {
5026                 recmasterrole = 0;
5027         } else {
5028                 usage();
5029         }
5030
5031         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
5032         if (ret != 0) {
5033                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
5034                 return ret;
5035         }
5036
5037         return 0;
5038 }
5039
5040 /*
5041   set debug level on a node or all nodes
5042  */
5043 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
5044 {
5045         int ret;
5046         int32_t level;
5047
5048         if (argc == 0) {
5049                 printf("You must specify the debug level. Valid levels are:\n");
5050                 print_debug_levels(stdout);
5051                 return 0;
5052         }
5053
5054         if (!parse_debug(argv[0], &level)) {
5055                 printf("Invalid debug level, must be one of\n");
5056                 print_debug_levels(stdout);
5057                 return -1;
5058         }
5059
5060         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
5061         if (ret != 0) {
5062                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
5063         }
5064         return 0;
5065 }
5066
5067
5068 /*
5069   thaw a node
5070  */
5071 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
5072 {
5073         int ret;
5074         uint32_t priority;
5075         
5076         if (argc == 1) {
5077                 priority = strtol(argv[0], NULL, 0);
5078         } else {
5079                 priority = 0;
5080         }
5081         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
5082
5083         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
5084         if (ret != 0) {
5085                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
5086         }               
5087         return 0;
5088 }
5089
5090
5091 /*
5092   attach to a database
5093  */
5094 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
5095 {
5096         const char *db_name;
5097         struct ctdb_db_context *ctdb_db;
5098         bool persistent = false;
5099
5100         if (argc < 1) {
5101                 usage();
5102         }
5103         db_name = argv[0];
5104         if (argc > 2) {
5105                 usage();
5106         }
5107         if (argc == 2) {
5108                 if (strcmp(argv[1], "persistent") != 0) {
5109                         usage();
5110                 }
5111                 persistent = true;
5112         }
5113
5114         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
5115         if (ctdb_db == NULL) {
5116                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
5117                 return -1;
5118         }
5119
5120         return 0;
5121 }
5122
5123 /*
5124  * detach from a database
5125  */
5126 static int control_detach(struct ctdb_context *ctdb, int argc,
5127                           const char **argv)
5128 {
5129         uint32_t db_id;
5130         uint8_t flags;
5131         int ret, i, status = 0;
5132         struct ctdb_node_map *nodemap = NULL;
5133         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5134         uint32_t recmode;
5135
5136         if (argc < 1) {
5137                 usage();
5138         }
5139
5140         assert_single_node_only();
5141
5142         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn,
5143                                     &recmode);
5144         if (ret != 0) {
5145                 DEBUG(DEBUG_ERR, ("Database cannot be detached "
5146                                   "when recovery is active\n"));
5147                 talloc_free(tmp_ctx);
5148                 return -1;
5149         }
5150
5151         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5152                                    &nodemap);
5153         if (ret != 0) {
5154                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5155                                   options.pnn));
5156                 talloc_free(tmp_ctx);
5157                 return -1;
5158         }
5159
5160         for (i=0; i<nodemap->num; i++) {
5161                 uint32_t value;
5162
5163                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
5164                         continue;
5165                 }
5166
5167                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
5168                         continue;
5169                 }
5170
5171                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
5172                         DEBUG(DEBUG_ERR, ("Database cannot be detached on "
5173                                           "inactive (stopped or banned) node "
5174                                           "%u\n", nodemap->nodes[i].pnn));
5175                         talloc_free(tmp_ctx);
5176                         return -1;
5177                 }
5178
5179                 ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(),
5180                                             nodemap->nodes[i].pnn,
5181                                             "AllowClientDBAttach",
5182                                             &value);
5183                 if (ret != 0) {
5184                         DEBUG(DEBUG_ERR, ("Unable to get tunable "
5185                                           "AllowClientDBAttach from node %u\n",
5186                                            nodemap->nodes[i].pnn));
5187                         talloc_free(tmp_ctx);
5188                         return -1;
5189                 }
5190
5191                 if (value == 1) {
5192                         DEBUG(DEBUG_ERR, ("Database access is still active on "
5193                                           "node %u. Set AllowClientDBAttach=0 "
5194                                           "on all nodes.\n",
5195                                           nodemap->nodes[i].pnn));
5196                         talloc_free(tmp_ctx);
5197                         return -1;
5198                 }
5199         }
5200
5201         talloc_free(tmp_ctx);
5202
5203         for (i=0; i<argc; i++) {
5204                 if (!db_exists(ctdb, argv[i], &db_id, NULL, &flags)) {
5205                         continue;
5206                 }
5207
5208                 if (flags & CTDB_DB_FLAGS_PERSISTENT) {
5209                         DEBUG(DEBUG_ERR, ("Persistent database '%s' "
5210                                           "cannot be detached\n", argv[i]));
5211                         status = -1;
5212                         continue;
5213                 }
5214
5215                 ret = ctdb_detach(ctdb, db_id);
5216                 if (ret != 0) {
5217                         DEBUG(DEBUG_ERR, ("Database '%s' detach failed\n",
5218                                           argv[i]));
5219                         status = ret;
5220                 }
5221         }
5222
5223         return status;
5224 }
5225
5226 /*
5227   set db priority
5228  */
5229 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5230 {
5231         struct ctdb_db_priority db_prio;
5232         int ret;
5233
5234         if (argc < 2) {
5235                 usage();
5236         }
5237
5238         db_prio.db_id    = strtoul(argv[0], NULL, 0);
5239         db_prio.priority = strtoul(argv[1], NULL, 0);
5240
5241         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
5242         if (ret != 0) {
5243                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
5244                 return -1;
5245         }
5246
5247         return 0;
5248 }
5249
5250 /*
5251   get db priority
5252  */
5253 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5254 {
5255         uint32_t db_id, priority;
5256         int ret;
5257
5258         if (argc < 1) {
5259                 usage();
5260         }
5261
5262         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5263                 return -1;
5264         }
5265
5266         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
5267         if (ret != 0) {
5268                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
5269                 return -1;
5270         }
5271
5272         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
5273
5274         return 0;
5275 }
5276
5277 /*
5278   set the sticky records capability for a database
5279  */
5280 static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
5281 {
5282         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5283         uint32_t db_id;
5284         int ret;
5285
5286         if (argc < 1) {
5287                 usage();
5288         }
5289
5290         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5291                 return -1;
5292         }
5293
5294         ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
5295         if (ret != 0) {
5296                 DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
5297                 talloc_free(tmp_ctx);
5298                 return -1;
5299         }
5300
5301         talloc_free(tmp_ctx);
5302         return 0;
5303 }
5304
5305 /*
5306   set the readonly capability for a database
5307  */
5308 static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char **argv)
5309 {
5310         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5311         uint32_t db_id;
5312         int ret;
5313
5314         if (argc < 1) {
5315                 usage();
5316         }
5317
5318         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5319                 return -1;
5320         }
5321
5322         ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
5323         if (ret != 0) {
5324                 DEBUG(DEBUG_ERR,("Unable to set db to support readonly\n"));
5325                 talloc_free(tmp_ctx);
5326                 return -1;
5327         }
5328
5329         talloc_free(tmp_ctx);
5330         return 0;
5331 }
5332
5333 /*
5334   get db seqnum
5335  */
5336 static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
5337 {
5338         uint32_t db_id;
5339         uint64_t seqnum;
5340         int ret;
5341
5342         if (argc < 1) {
5343                 usage();
5344         }
5345
5346         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5347                 return -1;
5348         }
5349
5350         ret = ctdb_ctrl_getdbseqnum(ctdb, TIMELIMIT(), options.pnn, db_id, &seqnum);
5351         if (ret != 0) {
5352                 DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
5353                 return -1;
5354         }
5355
5356         printf("Sequence number:%lld\n", (long long)seqnum);
5357
5358         return 0;
5359 }
5360
5361 /*
5362   run an eventscript on a node
5363  */
5364 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
5365 {
5366         TDB_DATA data;
5367         int ret;
5368         int32_t res;
5369         char *errmsg;
5370         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5371
5372         if (argc != 1) {
5373                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5374                 return -1;
5375         }
5376
5377         data.dptr = (unsigned char *)discard_const(argv[0]);
5378         data.dsize = strlen((char *)data.dptr) + 1;
5379
5380         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
5381
5382         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
5383                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
5384         if (ret != 0 || res != 0) {
5385                 if (errmsg != NULL) {
5386                         DEBUG(DEBUG_ERR,
5387                               ("Failed to run eventscripts - %s\n", errmsg));
5388                 } else {
5389                         DEBUG(DEBUG_ERR, ("Failed to run eventscripts\n"));
5390                 }
5391                 talloc_free(tmp_ctx);
5392                 return -1;
5393         }
5394         talloc_free(tmp_ctx);
5395         return 0;
5396 }
5397
5398 #define DB_VERSION 1
5399 #define MAX_DB_NAME 64
5400 struct db_file_header {
5401         unsigned long version;
5402         time_t timestamp;
5403         unsigned long persistent;
5404         unsigned long size;
5405         const char name[MAX_DB_NAME];
5406 };
5407
5408 struct backup_data {
5409         struct ctdb_marshall_buffer *records;
5410         uint32_t len;
5411         uint32_t total;
5412         bool traverse_error;
5413 };
5414
5415 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
5416 {
5417         struct backup_data *bd = talloc_get_type(private, struct backup_data);
5418         struct ctdb_rec_data *rec;
5419
5420         /* add the record */
5421         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
5422         if (rec == NULL) {
5423                 bd->traverse_error = true;
5424                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
5425                 return -1;
5426         }
5427         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
5428         if (bd->records == NULL) {
5429                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
5430                 bd->traverse_error = true;
5431                 return -1;
5432         }
5433         bd->records->count++;
5434         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
5435         bd->len += rec->length;
5436         talloc_free(rec);
5437
5438         bd->total++;
5439         return 0;
5440 }
5441
5442 /*
5443  * backup a database to a file 
5444  */
5445 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
5446 {
5447         const char *db_name;
5448         int ret;
5449         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5450         struct db_file_header dbhdr;
5451         struct ctdb_db_context *ctdb_db;
5452         struct backup_data *bd;
5453         int fh = -1;
5454         int status = -1;
5455         const char *reason = NULL;
5456         uint32_t db_id;
5457         uint8_t flags;
5458
5459         assert_single_node_only();
5460
5461         if (argc != 2) {
5462                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5463                 return -1;
5464         }
5465
5466         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
5467                 return -1;
5468         }
5469
5470         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
5471                                     db_id, tmp_ctx, &reason);
5472         if (ret != 0) {
5473                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
5474                                  argv[0]));
5475                 talloc_free(tmp_ctx);
5476                 return -1;
5477         }
5478         if (reason) {
5479                 uint32_t allow_unhealthy = 0;
5480
5481                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
5482                                       "AllowUnhealthyDBRead",
5483                                       &allow_unhealthy);
5484
5485                 if (allow_unhealthy != 1) {
5486                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
5487                                          argv[0], reason));
5488
5489                         DEBUG(DEBUG_ERR,("disallow backup : tunable AllowUnhealthyDBRead = %u\n",
5490                                          allow_unhealthy));
5491                         talloc_free(tmp_ctx);
5492                         return -1;
5493                 }
5494
5495                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
5496                                      argv[0], argv[0]));
5497                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
5498                                      "tunnable AllowUnhealthyDBRead = %u\n",
5499                                      allow_unhealthy));
5500         }
5501
5502         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5503         if (ctdb_db == NULL) {
5504                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
5505                 talloc_free(tmp_ctx);
5506                 return -1;
5507         }
5508
5509
5510         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
5511         if (ret == -1) {
5512                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
5513                 talloc_free(tmp_ctx);
5514                 return -1;
5515         }
5516
5517
5518         bd = talloc_zero(tmp_ctx, struct backup_data);
5519         if (bd == NULL) {
5520                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
5521                 talloc_free(tmp_ctx);
5522                 return -1;
5523         }
5524
5525         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
5526         if (bd->records == NULL) {
5527                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
5528                 talloc_free(tmp_ctx);
5529                 return -1;
5530         }
5531
5532         bd->len = offsetof(struct ctdb_marshall_buffer, data);
5533         bd->records->db_id = ctdb_db->db_id;
5534         /* traverse the database collecting all records */
5535         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
5536             bd->traverse_error) {
5537                 DEBUG(DEBUG_ERR,("Traverse error\n"));
5538                 talloc_free(tmp_ctx);
5539                 return -1;              
5540         }
5541
5542         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
5543
5544
5545         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
5546         if (fh == -1) {
5547                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
5548                 talloc_free(tmp_ctx);
5549                 return -1;
5550         }
5551
5552         ZERO_STRUCT(dbhdr);
5553         dbhdr.version = DB_VERSION;
5554         dbhdr.timestamp = time(NULL);
5555         dbhdr.persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
5556         dbhdr.size = bd->len;
5557         if (strlen(argv[0]) >= MAX_DB_NAME) {
5558                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
5559                 goto done;
5560         }
5561         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME-1);
5562         ret = sys_write(fh, &dbhdr, sizeof(dbhdr));
5563         if (ret == -1) {
5564                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5565                 goto done;
5566         }
5567         ret = sys_write(fh, bd->records, bd->len);
5568         if (ret == -1) {
5569                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5570                 goto done;
5571         }
5572
5573         status = 0;
5574 done:
5575         if (fh != -1) {
5576                 ret = close(fh);
5577                 if (ret == -1) {
5578                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
5579                 }
5580         }
5581
5582         DEBUG(DEBUG_ERR,("Database backed up to %s\n", argv[1]));
5583
5584         talloc_free(tmp_ctx);
5585         return status;
5586 }
5587
5588 /*
5589  * restore a database from a file 
5590  */
5591 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
5592 {
5593         int ret;
5594         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5595         TDB_DATA outdata;
5596         TDB_DATA data;
5597         struct db_file_header dbhdr;
5598         struct ctdb_db_context *ctdb_db;
5599         struct ctdb_node_map *nodemap=NULL;
5600         struct ctdb_vnn_map *vnnmap=NULL;
5601         int i, fh;
5602         struct ctdb_control_transdb w;
5603         uint32_t *nodes;
5604         uint32_t generation;
5605         struct tm *tm;
5606         char tbuf[100];
5607         char *dbname;
5608
5609         assert_single_node_only();
5610
5611         if (argc < 1 || argc > 2) {
5612                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5613                 return -1;
5614         }
5615
5616         fh = open(argv[0], O_RDONLY);
5617         if (fh == -1) {
5618                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5619                 talloc_free(tmp_ctx);
5620                 return -1;
5621         }
5622
5623         sys_read(fh, &dbhdr, sizeof(dbhdr));
5624         if (dbhdr.version != DB_VERSION) {
5625                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5626                 close(fh);
5627                 talloc_free(tmp_ctx);
5628                 return -1;
5629         }
5630
5631         dbname = discard_const(dbhdr.name);
5632         if (argc == 2) {
5633                 dbname = discard_const(argv[1]);
5634         }
5635
5636         outdata.dsize = dbhdr.size;
5637         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5638         if (outdata.dptr == NULL) {
5639                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5640                 close(fh);
5641                 talloc_free(tmp_ctx);
5642                 return -1;
5643         }               
5644         sys_read(fh, outdata.dptr, outdata.dsize);
5645         close(fh);
5646
5647         tm = localtime(&dbhdr.timestamp);
5648         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5649         printf("Restoring database '%s' from backup @ %s\n",
5650                 dbname, tbuf);
5651
5652
5653         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), dbname, dbhdr.persistent, 0);
5654         if (ctdb_db == NULL) {
5655                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
5656                 talloc_free(tmp_ctx);
5657                 return -1;
5658         }
5659
5660         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
5661         if (ret != 0) {
5662                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
5663                 talloc_free(tmp_ctx);
5664                 return ret;
5665         }
5666
5667
5668         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
5669         if (ret != 0) {
5670                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
5671                 talloc_free(tmp_ctx);
5672                 return ret;
5673         }
5674
5675         /* freeze all nodes */
5676         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5677         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5678                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5679                                         nodes, i,
5680                                         TIMELIMIT(),
5681                                         false, tdb_null,
5682                                         NULL, NULL,
5683                                         NULL) != 0) {
5684                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5685                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5686                         talloc_free(tmp_ctx);
5687                         return -1;
5688                 }
5689         }
5690
5691         generation = vnnmap->generation;
5692         data.dptr = (void *)&generation;
5693         data.dsize = sizeof(generation);
5694
5695         /* start a cluster wide transaction */
5696         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5697         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5698                                         nodes, 0,
5699                                         TIMELIMIT(), false, data,
5700                                         NULL, NULL,
5701                                         NULL) != 0) {
5702                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
5703                 return -1;
5704         }
5705
5706
5707         w.db_id = ctdb_db->db_id;
5708         w.transaction_id = generation;
5709
5710         data.dptr = (void *)&w;
5711         data.dsize = sizeof(w);
5712
5713         /* wipe all the remote databases. */
5714         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5715         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5716                                         nodes, 0,
5717                                         TIMELIMIT(), false, data,
5718                                         NULL, NULL,
5719                                         NULL) != 0) {
5720                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5721                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5722                 talloc_free(tmp_ctx);
5723                 return -1;
5724         }
5725         
5726         /* push the database */
5727         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5728         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
5729                                         nodes, 0,
5730                                         TIMELIMIT(), false, outdata,
5731                                         NULL, NULL,
5732                                         NULL) != 0) {
5733                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
5734                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5735                 talloc_free(tmp_ctx);
5736                 return -1;
5737         }
5738
5739         data.dptr = (void *)&ctdb_db->db_id;
5740         data.dsize = sizeof(ctdb_db->db_id);
5741
5742         /* mark the database as healthy */
5743         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5744         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5745                                         nodes, 0,
5746                                         TIMELIMIT(), false, data,
5747                                         NULL, NULL,
5748                                         NULL) != 0) {
5749                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5750                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5751                 talloc_free(tmp_ctx);
5752                 return -1;
5753         }
5754
5755         data.dptr = (void *)&generation;
5756         data.dsize = sizeof(generation);
5757
5758         /* commit all the changes */
5759         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5760                                         nodes, 0,
5761                                         TIMELIMIT(), false, data,
5762                                         NULL, NULL,
5763                                         NULL) != 0) {
5764                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5765                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5766                 talloc_free(tmp_ctx);
5767                 return -1;
5768         }
5769
5770
5771         /* thaw all nodes */
5772         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5773         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5774                                         nodes, 0,
5775                                         TIMELIMIT(),
5776                                         false, tdb_null,
5777                                         NULL, NULL,
5778                                         NULL) != 0) {
5779                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5780                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5781                 talloc_free(tmp_ctx);
5782                 return -1;
5783         }
5784
5785
5786         talloc_free(tmp_ctx);
5787         return 0;
5788 }
5789
5790 /*
5791  * dump a database backup from a file
5792  */
5793 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
5794 {
5795         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5796         TDB_DATA outdata;
5797         struct db_file_header dbhdr;
5798         int i, fh;
5799         struct tm *tm;
5800         char tbuf[100];
5801         struct ctdb_rec_data *rec = NULL;
5802         struct ctdb_marshall_buffer *m;
5803         struct ctdb_dump_db_context c;
5804
5805         assert_single_node_only();
5806
5807         if (argc != 1) {
5808                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5809                 return -1;
5810         }
5811
5812         fh = open(argv[0], O_RDONLY);
5813         if (fh == -1) {
5814                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5815                 talloc_free(tmp_ctx);
5816                 return -1;
5817         }
5818
5819         sys_read(fh, &dbhdr, sizeof(dbhdr));
5820         if (dbhdr.version != DB_VERSION) {
5821                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5822                 close(fh);
5823                 talloc_free(tmp_ctx);
5824                 return -1;
5825         }
5826
5827         outdata.dsize = dbhdr.size;
5828         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5829         if (outdata.dptr == NULL) {
5830                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5831                 close(fh);
5832                 talloc_free(tmp_ctx);
5833                 return -1;
5834         }
5835         sys_read(fh, outdata.dptr, outdata.dsize);
5836         close(fh);
5837         m = (struct ctdb_marshall_buffer *)outdata.dptr;
5838
5839         tm = localtime(&dbhdr.timestamp);
5840         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5841         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
5842                 dbhdr.name, m->db_id, tbuf);
5843
5844         ZERO_STRUCT(c);
5845         c.ctdb = ctdb;
5846         c.f = stdout;
5847         c.printemptyrecords = (bool)options.printemptyrecords;
5848         c.printdatasize = (bool)options.printdatasize;
5849         c.printlmaster = false;
5850         c.printhash = (bool)options.printhash;
5851         c.printrecordflags = (bool)options.printrecordflags;
5852
5853         for (i=0; i < m->count; i++) {
5854                 uint32_t reqid = 0;
5855                 TDB_DATA key, data;
5856
5857                 /* we do not want the header splitted, so we pass NULL*/
5858                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
5859                                               NULL, &key, &data);
5860
5861                 ctdb_dumpdb_record(key, data, &c);
5862         }
5863
5864         printf("Dumped %d records\n", i);
5865         talloc_free(tmp_ctx);
5866         return 0;
5867 }
5868
5869 /*
5870  * wipe a database from a file
5871  */
5872 static int control_wipedb(struct ctdb_context *ctdb, int argc,
5873                           const char **argv)
5874 {
5875         const char *db_name;
5876         int ret;
5877         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5878         TDB_DATA data;
5879         struct ctdb_db_context *ctdb_db;
5880         struct ctdb_node_map *nodemap = NULL;
5881         struct ctdb_vnn_map *vnnmap = NULL;
5882         int i;
5883         struct ctdb_control_transdb w;
5884         uint32_t *nodes;
5885         uint32_t generation;
5886         uint8_t flags;
5887
5888         assert_single_node_only();
5889
5890         if (argc != 1) {
5891                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5892                 return -1;
5893         }
5894
5895         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
5896                 return -1;
5897         }
5898
5899         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5900         if (ctdb_db == NULL) {
5901                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
5902                                   argv[0]));
5903                 talloc_free(tmp_ctx);
5904                 return -1;
5905         }
5906
5907         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
5908                                    &nodemap);
5909         if (ret != 0) {
5910                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5911                                   options.pnn));
5912                 talloc_free(tmp_ctx);
5913                 return ret;
5914         }
5915
5916         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5917                                   &vnnmap);
5918         if (ret != 0) {
5919                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
5920                                   options.pnn));
5921                 talloc_free(tmp_ctx);
5922                 return ret;
5923         }
5924
5925         /* freeze all nodes */
5926         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5927         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5928                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5929                                                 nodes, i,
5930                                                 TIMELIMIT(),
5931                                                 false, tdb_null,
5932                                                 NULL, NULL,
5933                                                 NULL);
5934                 if (ret != 0) {
5935                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5936                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
5937                                              CTDB_RECOVERY_ACTIVE);
5938                         talloc_free(tmp_ctx);
5939                         return -1;
5940                 }
5941         }
5942
5943         generation = vnnmap->generation;
5944         data.dptr = (void *)&generation;
5945         data.dsize = sizeof(generation);
5946
5947         /* start a cluster wide transaction */
5948         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5949         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5950                                         nodes, 0,
5951                                         TIMELIMIT(), false, data,
5952                                         NULL, NULL,
5953                                         NULL);
5954         if (ret!= 0) {
5955                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
5956                                   "transactions.\n"));
5957                 return -1;
5958         }
5959
5960         w.db_id = ctdb_db->db_id;
5961         w.transaction_id = generation;
5962
5963         data.dptr = (void *)&w;
5964         data.dsize = sizeof(w);
5965
5966         /* wipe all the remote databases. */
5967         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5968         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5969                                         nodes, 0,
5970                                         TIMELIMIT(), false, data,
5971                                         NULL, NULL,
5972                                         NULL) != 0) {
5973                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5974                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5975                 talloc_free(tmp_ctx);
5976                 return -1;
5977         }
5978
5979         data.dptr = (void *)&ctdb_db->db_id;
5980         data.dsize = sizeof(ctdb_db->db_id);
5981
5982         /* mark the database as healthy */
5983         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5984         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5985                                         nodes, 0,
5986                                         TIMELIMIT(), false, data,
5987                                         NULL, NULL,
5988                                         NULL) != 0) {
5989                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5990                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5991                 talloc_free(tmp_ctx);
5992                 return -1;
5993         }
5994
5995         data.dptr = (void *)&generation;
5996         data.dsize = sizeof(generation);
5997
5998         /* commit all the changes */
5999         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
6000                                         nodes, 0,
6001                                         TIMELIMIT(), false, data,
6002                                         NULL, NULL,
6003                                         NULL) != 0) {
6004                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
6005                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
6006                 talloc_free(tmp_ctx);
6007                 return -1;
6008         }
6009
6010         /* thaw all nodes */
6011         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
6012         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
6013                                         nodes, 0,
6014                                         TIMELIMIT(),
6015                                         false, tdb_null,
6016                                         NULL, NULL,
6017                                         NULL) != 0) {
6018                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
6019                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
6020                 talloc_free(tmp_ctx);
6021                 return -1;
6022         }
6023
6024         DEBUG(DEBUG_ERR, ("Database wiped.\n"));
6025
6026         talloc_free(tmp_ctx);
6027         return 0;
6028 }
6029
6030 /*
6031   dump memory usage
6032  */
6033 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
6034 {
6035         TDB_DATA data;
6036         int ret;
6037         int32_t res;
6038         char *errmsg;
6039         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
6040         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
6041                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
6042         if (ret != 0 || res != 0) {
6043                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
6044                 talloc_free(tmp_ctx);
6045                 return -1;
6046         }
6047         sys_write(1, data.dptr, data.dsize);
6048         talloc_free(tmp_ctx);
6049         return 0;
6050 }
6051
6052 /*
6053   handler for memory dumps
6054 */
6055 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
6056 {
6057         sys_write(1, data.dptr, data.dsize);
6058         exit(0);
6059 }
6060
6061 /*
6062   dump memory usage on the recovery daemon
6063  */
6064 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
6065 {
6066         int ret;
6067         TDB_DATA data;
6068         struct srvid_request rd;
6069
6070         rd.pnn = ctdb_get_pnn(ctdb);
6071         rd.srvid = getpid();
6072
6073         /* register a message port for receiveing the reply so that we
6074            can receive the reply
6075         */
6076         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
6077
6078
6079         data.dptr = (uint8_t *)&rd;
6080         data.dsize = sizeof(rd);
6081
6082         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
6083         if (ret != 0) {
6084                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
6085                 return -1;
6086         }
6087
6088         /* this loop will terminate when we have received the reply */
6089         while (1) {
6090                 tevent_loop_once(ctdb->ev);
6091         }
6092
6093         return 0;
6094 }
6095
6096 /*
6097   send a message to a srvid
6098  */
6099 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
6100 {
6101         unsigned long srvid;
6102         int ret;
6103         TDB_DATA data;
6104
6105         if (argc < 2) {
6106                 usage();
6107         }
6108
6109         srvid      = strtoul(argv[0], NULL, 0);
6110
6111         data.dptr = (uint8_t *)discard_const(argv[1]);
6112         data.dsize= strlen(argv[1]);
6113
6114         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
6115         if (ret != 0) {
6116                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
6117                 return -1;
6118         }
6119
6120         return 0;
6121 }
6122
6123 /*
6124   handler for msglisten
6125 */
6126 static void msglisten_handler(uint64_t srvid, TDB_DATA data,
6127                               void *private_data)
6128 {
6129         int i;
6130
6131         printf("Message received: ");
6132         for (i=0;i<data.dsize;i++) {
6133                 printf("%c", data.dptr[i]);
6134         }
6135         printf("\n");
6136 }
6137
6138 /*
6139   listen for messages on a messageport
6140  */
6141 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
6142 {
6143         uint64_t srvid;
6144
6145         srvid = getpid();
6146
6147         /* register a message port and listen for messages
6148         */
6149         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
6150         printf("Listening for messages on srvid:%d\n", (int)srvid);
6151
6152         while (1) {
6153                 tevent_loop_once(ctdb->ev);
6154         }
6155
6156         return 0;
6157 }
6158
6159 /*
6160   list all nodes in the cluster
6161   we parse the nodes file directly
6162  */
6163 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
6164 {
6165         TALLOC_CTX *mem_ctx = talloc_new(NULL);
6166         struct ctdb_node_map *node_map;
6167         int i;
6168
6169         assert_single_node_only();
6170
6171         node_map = read_nodes_file(mem_ctx);
6172         if (node_map == NULL) {
6173                 talloc_free(mem_ctx);
6174                 return -1;
6175         }
6176
6177         for (i = 0; i < node_map->num; i++) {
6178                 const char *addr;
6179
6180                 if (node_map->nodes[i].flags & NODE_FLAGS_DELETED) {
6181                         continue;
6182                 }
6183                 addr = ctdb_addr_to_str(&node_map->nodes[i].addr);
6184                 if (options.machinereadable){
6185                         printm(":%d:%s:\n", node_map->nodes[i].pnn, addr);
6186                 } else {
6187                         printf("%s\n", addr);
6188                 }
6189         }
6190         talloc_free(mem_ctx);
6191
6192         return 0;
6193 }
6194
6195 /**********************************************************************/
6196 /* reload the nodes file on all nodes */
6197
6198 static void get_nodes_files_callback(struct ctdb_context *ctdb,
6199                                      uint32_t node_pnn, int32_t res,
6200                                      TDB_DATA outdata, void *callback_data)
6201 {
6202         struct ctdb_node_map **maps =
6203                 talloc_get_type(callback_data, struct ctdb_node_map *);
6204
6205         if (outdata.dsize < offsetof(struct ctdb_node_map, nodes) ||
6206             outdata.dptr == NULL) {
6207                 DEBUG(DEBUG_ERR,
6208                       (__location__ " Invalid return data: %u %p\n",
6209                        (unsigned)outdata.dsize, outdata.dptr));
6210                 return;
6211         }
6212
6213         if (node_pnn >= talloc_array_length(maps)) {
6214                 DEBUG(DEBUG_ERR,
6215                       (__location__ " unexpected PNN %u\n", node_pnn));
6216                 return;
6217         }
6218
6219         maps[node_pnn] = talloc_memdup(maps, outdata.dptr, outdata.dsize);
6220 }
6221
6222 static void get_nodes_files_fail_callback(struct ctdb_context *ctdb,
6223                                           uint32_t node_pnn, int32_t res,
6224                                           TDB_DATA outdata, void *callback_data)
6225 {
6226         DEBUG(DEBUG_ERR,
6227               ("ERROR: Failed to get nodes file from node %u\n", node_pnn));
6228 }
6229
6230 static struct ctdb_node_map **
6231 ctdb_get_nodes_files(struct ctdb_context *ctdb,
6232                      TALLOC_CTX *mem_ctx,
6233                      struct timeval timeout,
6234                      struct ctdb_node_map *nodemap)
6235 {
6236         uint32_t *nodes;
6237         int ret;
6238         struct ctdb_node_map **maps;
6239
6240         maps = talloc_zero_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
6241         CTDB_NO_MEMORY_NULL(ctdb, maps);
6242
6243         nodes = list_of_connected_nodes(ctdb, nodemap, mem_ctx, true);
6244
6245         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODES_FILE,
6246                                         nodes, 0, TIMELIMIT(),
6247                                         true, tdb_null,
6248                                         get_nodes_files_callback,
6249                                         get_nodes_files_fail_callback,
6250                                         maps);
6251         if (ret != 0) {
6252                 talloc_free(maps);
6253                 return NULL;
6254         }
6255
6256         return maps;
6257 }
6258
6259 static bool node_files_are_identical(struct ctdb_node_map *nm1,
6260                                      struct ctdb_node_map *nm2)
6261 {
6262         int i;
6263
6264         if (nm1->num != nm2->num) {
6265                 return false;
6266         }
6267         for (i = 0; i < nm1->num; i++) {
6268                 if (memcmp(&nm1->nodes[i], &nm2->nodes[i],
6269                            sizeof(struct ctdb_node_and_flags)) != 0) {
6270                         return false;
6271                 }
6272         }
6273
6274         return true;
6275 }
6276
6277 static bool check_all_node_files_are_identical(struct ctdb_context *ctdb,
6278                                                TALLOC_CTX *mem_ctx,
6279                                                struct timeval timeout,
6280                                                struct ctdb_node_map *nodemap,
6281                                                struct ctdb_node_map *file_nodemap)
6282 {
6283         static struct ctdb_node_map **maps;
6284         int i;
6285         bool ret = true;
6286
6287         maps = ctdb_get_nodes_files(ctdb, mem_ctx, timeout, nodemap);
6288         if (maps == NULL) {
6289                 return false;
6290         }
6291
6292         for (i = 0; i < talloc_array_length(maps); i++) {
6293                 if (maps[i] == NULL) {
6294                         continue;
6295                 }
6296                 if (!node_files_are_identical(file_nodemap, maps[i])) {
6297                         DEBUG(DEBUG_ERR,
6298                               ("ERROR: Node file on node %u differs from current node (%u)\n",
6299                                i, ctdb_get_pnn(ctdb)));
6300                         ret = false;
6301                 }
6302         }
6303
6304         return ret;
6305 }
6306
6307 /*
6308   reload the nodes file on the local node
6309  */
6310 static bool sanity_check_nodes_file_changes(TALLOC_CTX *mem_ctx,
6311                                             struct ctdb_node_map *nodemap,
6312                                             struct ctdb_node_map *file_nodemap)
6313 {
6314         int i;
6315         bool should_abort = false;
6316         bool have_changes = false;
6317
6318         for (i=0; i<nodemap->num; i++) {
6319                 if (i >= file_nodemap->num) {
6320                         DEBUG(DEBUG_ERR,
6321                               ("ERROR: Node %u (%s) missing from nodes file\n",
6322                                nodemap->nodes[i].pnn,
6323                                ctdb_addr_to_str(&nodemap->nodes[i].addr)));
6324                         should_abort = true;
6325                         continue;
6326                 }
6327                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED &&
6328                     file_nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
6329                         /* Node remains deleted */
6330                         DEBUG(DEBUG_INFO,
6331                               ("Node %u is unchanged (DELETED)\n",
6332                                nodemap->nodes[i].pnn));
6333                 } else if (!(nodemap->nodes[i].flags & NODE_FLAGS_DELETED) &&
6334                            !(file_nodemap->nodes[i].flags & NODE_FLAGS_DELETED)) {
6335                         /* Node not newly nor previously deleted */
6336                         if (!ctdb_same_ip(&nodemap->nodes[i].addr,
6337                                           &file_nodemap->nodes[i].addr)) {
6338                                 DEBUG(DEBUG_ERR,
6339                                       ("ERROR: Node %u has changed IP address (was %s, now %s)\n",
6340                                        nodemap->nodes[i].pnn,
6341                                        /* ctdb_addr_to_str() returns a static */
6342                                        talloc_strdup(mem_ctx,
6343                                                      ctdb_addr_to_str(&nodemap->nodes[i].addr)),
6344                                        ctdb_addr_to_str(&file_nodemap->nodes[i].addr)));
6345                                 should_abort = true;
6346                         } else {
6347                                 DEBUG(DEBUG_INFO,
6348                                       ("Node %u is unchanged\n",
6349                                        nodemap->nodes[i].pnn));
6350                                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
6351                                         DEBUG(DEBUG_WARNING,
6352                                               ("WARNING: Node %u is disconnected. You MUST fix this node manually!\n",
6353                                                nodemap->nodes[i].pnn));
6354                                 }
6355                         }
6356                 } else if (file_nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
6357                         /* Node is being deleted */
6358                         DEBUG(DEBUG_NOTICE,
6359                               ("Node %u is DELETED\n",
6360                                nodemap->nodes[i].pnn));
6361                         have_changes = true;
6362                         if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
6363                                 DEBUG(DEBUG_ERR,
6364                                       ("ERROR: Node %u is still connected\n",
6365                                        nodemap->nodes[i].pnn));
6366                                 should_abort = true;
6367                         }
6368                 } else if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
6369                         /* Node was previously deleted */
6370                         DEBUG(DEBUG_NOTICE,
6371                               ("Node %u is UNDELETED\n", nodemap->nodes[i].pnn));
6372                         have_changes = true;
6373                 }
6374         }
6375
6376         if (should_abort) {
6377                 DEBUG(DEBUG_ERR,
6378                       ("ERROR: Nodes will not be reloaded due to previous error\n"));
6379                 talloc_free(mem_ctx);
6380                 exit(1);
6381         }
6382
6383         /* Leftover nodes in file are NEW */
6384         for (; i < file_nodemap->num; i++) {
6385                 DEBUG(DEBUG_NOTICE, ("Node %u is NEW\n",
6386                                      file_nodemap->nodes[i].pnn));
6387                 have_changes = true;
6388         }
6389
6390         return have_changes;
6391 }
6392
6393 static void reload_nodes_fail_callback(struct ctdb_context *ctdb,
6394                                        uint32_t node_pnn, int32_t res,
6395                                        TDB_DATA outdata, void *callback_data)
6396 {
6397         DEBUG(DEBUG_WARNING,
6398               ("WARNING: Node %u failed to reload nodes. You MUST fix this node manually!\n",
6399                node_pnn));
6400 }
6401
6402 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
6403 {
6404         int i, ret;
6405         struct ctdb_node_map *nodemap=NULL;
6406         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
6407         struct ctdb_node_map *file_nodemap;
6408         uint32_t *conn;
6409         uint32_t timeout;
6410
6411         assert_current_node_only(ctdb);
6412
6413         /* Load both the current nodemap and the contents of the local
6414          * nodes file.  Compare and sanity check them before doing
6415          * anything. */
6416
6417         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
6418         if (ret != 0) {
6419                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
6420                 return ret;
6421         }
6422
6423         file_nodemap = read_nodes_file(tmp_ctx);
6424         if (file_nodemap == NULL) {
6425                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
6426                 talloc_free(tmp_ctx);
6427                 return -1;
6428         }
6429
6430         if (!check_all_node_files_are_identical(ctdb, tmp_ctx, TIMELIMIT(),
6431                                                 nodemap, file_nodemap)) {
6432                 return -1;
6433         }
6434
6435         if (!sanity_check_nodes_file_changes(tmp_ctx, nodemap, file_nodemap)) {
6436                 DEBUG(DEBUG_NOTICE,
6437                       ("No change in nodes file, skipping unnecessary reload\n"));
6438                 talloc_free(tmp_ctx);
6439                 return 0;
6440         }
6441
6442         /* Now make the changes */
6443         conn = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
6444         for (i = 0; i < talloc_array_length(conn); i++) {
6445                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n",
6446                                      conn[i]));
6447         }
6448
6449         /* Another timeout could be used, such as ReRecoveryTimeout or
6450          * a new one for this purpose.  However, this is the simplest
6451          * option. */
6452         timeout = options.timelimit;
6453         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_RECOVERIES, &timeout,
6454                         "Disable recoveries", true);
6455
6456
6457         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_NODES_FILE,
6458                                         conn, 0, TIMELIMIT(),
6459                                         true, tdb_null,
6460                                         NULL, reload_nodes_fail_callback,
6461                                         NULL);
6462
6463         timeout = 0;
6464         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_RECOVERIES, &timeout,
6465                         "Enable recoveries", true);
6466
6467         talloc_free(tmp_ctx);
6468
6469         return 0;
6470 }
6471
6472
6473 static const struct {
6474         const char *name;
6475         int (*fn)(struct ctdb_context *, int, const char **);
6476         bool auto_all;
6477         bool without_daemon; /* can be run without daemon running ? */
6478         const char *msg;
6479         const char *args;
6480 } ctdb_commands[] = {
6481         { "version",         control_version,           true,   true,   "show version of ctdb" },
6482         { "status",          control_status,            true,   false,  "show node status" },
6483         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
6484         { "ping",            control_ping,              true,   false,  "ping all nodes" },
6485         { "runstate",        control_runstate,          true,   false,  "get/check runstate of a node", "[setup|first_recovery|startup|running]" },
6486         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
6487         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
6488         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
6489         { "statistics",      control_statistics,        false,  false, "show statistics" },
6490         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
6491         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
6492         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
6493         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
6494         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
6495         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
6496         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
6497         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
6498         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname|dbid>" },
6499         { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname|dbid>"},
6500         { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname|dbid>"},
6501         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
6502         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
6503         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
6504         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
6505         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
6506         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
6507         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
6508         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
6509         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
6510         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
6511         { "detach",          control_detach,            false,  false,  "detach from a database",                 "<dbname|dbid> [<dbname|dbid> ...]" },
6512         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
6513         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
6514         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
6515         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
6516         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
6517         { "stop",            control_stop,              true,   false,  "stop a node" },
6518         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
6519         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime>"},
6520         { "unban",           control_unban,             true,   false,  "unban a node" },
6521         { "showban",         control_showban,           true,   false,  "show ban information"},
6522         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
6523         { "recover",         control_recover,           true,   false,  "force recovery" },
6524         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
6525         { "ipreallocate",    control_ipreallocate,      false,  false,  "force the recovery daemon to perform a ip reallocation procedure" },
6526         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
6527         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
6528         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "[<srcip:port> <dstip:port>]" },
6529         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
6530         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
6531         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
6532         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
6533
6534         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
6535
6536         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
6537         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
6538         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
6539         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
6540         { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
6541         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
6542         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
6543         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
6544         { "rebalanceip",     control_rebalanceip,       false,  false, "release an ip from the node and let recd rebalance it", "<ip>"},
6545         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
6546         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
6547         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
6548         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<dbname|dbid> <file>"},
6549         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
6550         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
6551         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname|dbid>"},
6552         { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
6553         { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
6554         { "enablescript",     control_enablescript,  true,      false, "enable an eventscript", "<script>"},
6555         { "disablescript",    control_disablescript,  true,     false, "disable an eventscript", "<script>"},
6556         { "natgwlist",        control_natgwlist,        true,   false, "show the nodes belonging to this natgw configuration"},
6557         { "xpnn",             control_xpnn,             false,  true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
6558         { "getreclock",       control_getreclock,       true,   false, "Show the reclock file of a node"},
6559         { "setreclock",       control_setreclock,       true,   false, "Set/clear the reclock file of a node", "[filename]"},
6560         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
6561         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
6562         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
6563         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbname|dbid> <prio:1-3>"},
6564         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbname|dbid>"},
6565         { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbname|dbid>"},
6566         { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbname|dbid>"},
6567         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
6568         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
6569         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<dbname|dbid> <key> [<file>]" },
6570         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<dbname|dbid> <key> <file containing record>" },
6571         { "pdelete",         control_pdelete,           false,  false,  "delete a record from a persistent database", "<dbname|dbid> <key>" },
6572         { "ptrans",          control_ptrans,            false,  false,  "update a persistent database (from stdin)", "<dbname|dbid>" },
6573         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
6574         { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6575         { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<dbname|dbid> <key>" },
6576         { "writekey",        control_writekey,          true,   false,  "write to a database key", "<dbname|dbid> <key> <value>" },
6577         { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
6578         { "rebalancenode",     control_rebalancenode,   false,  false, "mark nodes as forced IP rebalancing targets", "[<pnn-list>]"},
6579         { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbname|dbid>" },
6580         { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status", "[<pnn-list>]" },
6581         { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<dbname|dbid>" },
6582         { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on specified nodes" , "[<pnn-list>]" },
6583         { "ipiface",         control_ipiface,           false,  true,  "Find which interface an ip address is hosted on", "<ip>" },
6584 };
6585
6586 /*
6587   show usage message
6588  */
6589 static void usage(void)
6590 {
6591         int i;
6592         printf(
6593 "Usage: ctdb [options] <control>\n" \
6594 "Options:\n" \
6595 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
6596 "   -Y                 generate machine readable output\n"
6597 "   -x <char>          specify delimiter for machine readable output\n"
6598 "   -v                 generate verbose output\n"
6599 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
6600         printf("Controls:\n");
6601         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6602                 printf("  %-15s %-27s  %s\n", 
6603                        ctdb_commands[i].name, 
6604                        ctdb_commands[i].args?ctdb_commands[i].args:"",
6605                        ctdb_commands[i].msg);
6606         }
6607         exit(1);
6608 }
6609
6610
6611 static void ctdb_alarm(int sig)
6612 {
6613         printf("Maximum runtime exceeded - exiting\n");
6614         _exit(ERR_TIMEOUT);
6615 }
6616
6617 /*
6618   main program
6619 */
6620 int main(int argc, const char *argv[])
6621 {
6622         struct ctdb_context *ctdb;
6623         char *nodestring = NULL;
6624         int machineparsable = 0;
6625         struct poptOption popt_options[] = {
6626                 POPT_AUTOHELP
6627                 POPT_CTDB_CMDLINE
6628                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
6629                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
6630                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machine readable output", NULL },
6631                 { NULL, 'x', POPT_ARG_STRING, &options.machineseparator, 0, "specify separator for machine readable output", "char" },
6632                 { NULL, 'X', POPT_ARG_NONE, &machineparsable, 0, "enable machine parsable output with separator |", NULL },
6633                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
6634                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
6635                 { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
6636                 { "print-datasize", 0, POPT_ARG_NONE, &options.printdatasize, 0, "do not print record data when dumping databases, only the data size", NULL },
6637                 { "print-lmaster", 0, POPT_ARG_NONE, &options.printlmaster, 0, "print the record's lmaster in catdb", NULL },
6638                 { "print-hash", 0, POPT_ARG_NONE, &options.printhash, 0, "print the record's hash when dumping databases", NULL },
6639                 { "print-recordflags", 0, POPT_ARG_NONE, &options.printrecordflags, 0, "print the record flags in catdb and dumpdbbackup", NULL },
6640                 POPT_TABLEEND
6641         };
6642         int opt;
6643         const char **extra_argv;
6644         int extra_argc = 0;
6645         int ret=-1, i;
6646         poptContext pc;
6647         struct tevent_context *ev;
6648         const char *control;
6649
6650         setlinebuf(stdout);
6651         
6652         /* set some defaults */
6653         options.maxruntime = 0;
6654         options.timelimit = 10;
6655         options.pnn = CTDB_CURRENT_NODE;
6656
6657         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
6658
6659         while ((opt = poptGetNextOpt(pc)) != -1) {
6660                 switch (opt) {
6661                 default:
6662                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
6663                                 poptBadOption(pc, 0), poptStrerror(opt)));
6664                         exit(1);
6665                 }
6666         }
6667
6668         /* setup the remaining options for the main program to use */
6669         extra_argv = poptGetArgs(pc);
6670         if (extra_argv) {
6671                 extra_argv++;
6672                 while (extra_argv[extra_argc]) extra_argc++;
6673         }
6674
6675         if (extra_argc < 1) {
6676                 usage();
6677         }
6678
6679         if (options.maxruntime == 0) {
6680                 const char *ctdb_timeout;
6681                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6682                 if (ctdb_timeout != NULL) {
6683                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6684                 } else {
6685                         /* default timeout is 120 seconds */
6686                         options.maxruntime = 120;
6687                 }
6688         }
6689
6690         if (machineparsable) {
6691                 options.machineseparator = "|";
6692         }
6693         if (options.machineseparator != NULL) {
6694                 if (strlen(options.machineseparator) != 1) {
6695                         printf("Invalid separator \"%s\" - "
6696                                "must be single character\n",
6697                                options.machineseparator);
6698                         exit(1);
6699                 }
6700
6701                 /* -x implies -Y */
6702                 options.machinereadable = true;
6703         } else if (options.machinereadable) {
6704                 options.machineseparator = ":";
6705         }
6706
6707         signal(SIGALRM, ctdb_alarm);
6708         alarm(options.maxruntime);
6709
6710         control = extra_argv[0];
6711
6712         /* Default value for CTDB_BASE - don't override */
6713         setenv("CTDB_BASE", CTDB_ETCDIR, 0);
6714
6715         ev = tevent_context_init(NULL);
6716         if (!ev) {
6717                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
6718                 exit(1);
6719         }
6720
6721         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6722                 if (strcmp(control, ctdb_commands[i].name) == 0) {
6723                         break;
6724                 }
6725         }
6726
6727         if (i == ARRAY_SIZE(ctdb_commands)) {
6728                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
6729                 exit(1);
6730         }
6731
6732         if (ctdb_commands[i].without_daemon == true) {
6733                 if (nodestring != NULL) {
6734                         DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
6735                         exit(1);
6736                 }
6737                 return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
6738         }
6739
6740         /* initialise ctdb */
6741         ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
6742
6743         if (ctdb == NULL) {
6744                 uint32_t pnn;
6745                 DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
6746
6747                 pnn = find_node_xpnn();
6748                 if (pnn == -1) {
6749                         DEBUG(DEBUG_ERR,
6750                               ("Is this node part of a CTDB cluster?\n"));
6751                 }
6752                 exit(1);
6753         }
6754
6755         /* setup the node number(s) to contact */
6756         if (!parse_nodestring(ctdb, ctdb, nodestring, CTDB_CURRENT_NODE, false,
6757                               &options.nodes, &options.pnn)) {
6758                 usage();
6759         }
6760
6761         if (options.pnn == CTDB_CURRENT_NODE) {
6762                 options.pnn = options.nodes[0];
6763         }
6764
6765         if (ctdb_commands[i].auto_all && 
6766             ((options.pnn == CTDB_BROADCAST_ALL) ||
6767              (options.pnn == CTDB_MULTICAST))) {
6768                 int j;
6769
6770                 ret = 0;
6771                 for (j = 0; j < talloc_array_length(options.nodes); j++) {
6772                         options.pnn = options.nodes[j];
6773                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6774                 }
6775         } else {
6776                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6777         }
6778
6779         talloc_free(ctdb);
6780         talloc_free(ev);
6781         (void)poptFreeContext(pc);
6782
6783         return ret;
6784
6785 }