ctdb-tools: Skip GET_PUBLIC_IP_INFO for unassigned addresses
[amitay/samba.git] / ctdb / tools / ctdb.c
1 /*
2    CTDB control tool
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/wait.h"
25
26 #include <ctype.h>
27 #include <popt.h>
28 #include <talloc.h>
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "ctdb_version.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35 #include "lib/util/sys_rw.h"
36
37 #include "common/db_hash.h"
38 #include "common/logging.h"
39 #include "protocol/protocol.h"
40 #include "protocol/protocol_api.h"
41 #include "common/system.h"
42 #include "client/client.h"
43
44 #define TIMEOUT()       timeval_current_ofs(options.timelimit, 0)
45
46 #define SRVID_CTDB_TOOL    (CTDB_SRVID_TOOL_RANGE | 0x0001000000000000LL)
47 #define SRVID_CTDB_PUSHDB  (CTDB_SRVID_TOOL_RANGE | 0x0002000000000000LL)
48
49 static struct {
50         const char *socket;
51         const char *debuglevelstr;
52         int timelimit;
53         int pnn;
54         int machinereadable;
55         const char *sep;
56         int machineparsable;
57         int verbose;
58         int maxruntime;
59         int printemptyrecords;
60         int printdatasize;
61         int printlmaster;
62         int printhash;
63         int printrecordflags;
64 } options;
65
66 static poptContext pc;
67
68 struct ctdb_context {
69         struct tevent_context *ev;
70         struct ctdb_client_context *client;
71         struct ctdb_node_map *nodemap;
72         uint32_t pnn, cmd_pnn;
73         uint64_t srvid;
74 };
75
76 static void usage(const char *command);
77
78 /*
79  * Utility Functions
80  */
81
82 static double timeval_delta(struct timeval *tv2, struct timeval *tv)
83 {
84         return (tv2->tv_sec - tv->tv_sec) +
85                (tv2->tv_usec - tv->tv_usec) * 1.0e-6;
86 }
87
88 static struct ctdb_node_and_flags *get_node_by_pnn(
89                                         struct ctdb_node_map *nodemap,
90                                         uint32_t pnn)
91 {
92         int i;
93
94         for (i=0; i<nodemap->num; i++) {
95                 if (nodemap->node[i].pnn == pnn) {
96                         return &nodemap->node[i];
97                 }
98         }
99         return NULL;
100 }
101
102 static const char *pretty_print_flags(TALLOC_CTX *mem_ctx, uint32_t flags)
103 {
104         static const struct {
105                 uint32_t flag;
106                 const char *name;
107         } flag_names[] = {
108                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
109                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
110                 { NODE_FLAGS_BANNED,                "BANNED" },
111                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
112                 { NODE_FLAGS_DELETED,               "DELETED" },
113                 { NODE_FLAGS_STOPPED,               "STOPPED" },
114                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
115         };
116         char *flags_str = NULL;
117         int i;
118
119         for (i=0; i<ARRAY_SIZE(flag_names); i++) {
120                 if (flags & flag_names[i].flag) {
121                         if (flags_str == NULL) {
122                                 flags_str = talloc_asprintf(mem_ctx,
123                                                 "%s", flag_names[i].name);
124                         } else {
125                                 flags_str = talloc_asprintf_append(flags_str,
126                                                 "|%s", flag_names[i].name);
127                         }
128                         if (flags_str == NULL) {
129                                 return "OUT-OF-MEMORY";
130                         }
131                 }
132         }
133         if (flags_str == NULL) {
134                 return "OK";
135         }
136
137         return flags_str;
138 }
139
140 static uint64_t next_srvid(struct ctdb_context *ctdb)
141 {
142         ctdb->srvid += 1;
143         return ctdb->srvid;
144 }
145
146 /*
147  * Get consistent nodemap information.
148  *
149  * If nodemap is already cached, use that. If not get it.
150  * If the current node is BANNED, then get nodemap from "better" node.
151  */
152 static struct ctdb_node_map *get_nodemap(struct ctdb_context *ctdb, bool force)
153 {
154         TALLOC_CTX *tmp_ctx;
155         struct ctdb_node_map *nodemap;
156         struct ctdb_node_and_flags *node;
157         uint32_t current_node;
158         int ret;
159
160         if (force) {
161                 TALLOC_FREE(ctdb->nodemap);
162         }
163
164         if (ctdb->nodemap != NULL) {
165                 return ctdb->nodemap;
166         }
167
168         tmp_ctx = talloc_new(ctdb);
169         if (tmp_ctx == NULL) {
170                 return false;
171         }
172
173         current_node = ctdb->pnn;
174 again:
175         ret = ctdb_ctrl_get_nodemap(tmp_ctx, ctdb->ev, ctdb->client,
176                                     current_node, TIMEOUT(), &nodemap);
177         if (ret != 0) {
178                 fprintf(stderr, "Failed to get nodemap from node %u\n",
179                         current_node);
180                 goto failed;
181         }
182
183         node = get_node_by_pnn(nodemap, current_node);
184         if (node->flags & NODE_FLAGS_BANNED) {
185                 /* Pick next node */
186                 do {
187                         current_node = (current_node + 1) % nodemap->num;
188                         node = get_node_by_pnn(nodemap, current_node);
189                         if (! (node->flags &
190                               (NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED))) {
191                                 break;
192                         }
193                 } while (current_node != ctdb->pnn);
194
195                 if (current_node == ctdb->pnn) {
196                         /* Tried all nodes in the cluster */
197                         fprintf(stderr, "Warning: All nodes are banned.\n");
198                         goto failed;
199                 }
200
201                 goto again;
202         }
203
204         ctdb->nodemap = talloc_steal(ctdb, nodemap);
205         return nodemap;
206
207 failed:
208         talloc_free(tmp_ctx);
209         return NULL;
210 }
211
212 static bool verify_pnn(struct ctdb_context *ctdb, int pnn)
213 {
214         struct ctdb_node_map *nodemap;
215         bool found;
216         int i;
217
218         if (pnn == -1) {
219                 return false;
220         }
221
222         nodemap = get_nodemap(ctdb, false);
223         if (nodemap == NULL) {
224                 return false;
225         }
226
227         found = false;
228         for (i=0; i<nodemap->num; i++) {
229                 if (nodemap->node[i].pnn == pnn) {
230                         found = true;
231                         break;
232                 }
233         }
234         if (! found) {
235                 fprintf(stderr, "Node %u does not exist\n", pnn);
236                 return false;
237         }
238
239         if (nodemap->node[i].flags &
240             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
241                 fprintf(stderr, "Node %u has status %s\n", pnn,
242                         pretty_print_flags(ctdb, nodemap->node[i].flags));
243                 return false;
244         }
245
246         return true;
247 }
248
249 static struct ctdb_node_map *talloc_nodemap(TALLOC_CTX *mem_ctx,
250                                             struct ctdb_node_map *nodemap)
251 {
252         struct ctdb_node_map *nodemap2;
253
254         nodemap2 = talloc_zero(mem_ctx, struct ctdb_node_map);
255         if (nodemap2 == NULL) {
256                 return NULL;
257         }
258
259         nodemap2->node = talloc_array(nodemap2, struct ctdb_node_and_flags,
260                                       nodemap->num);
261         if (nodemap2->node == NULL) {
262                 talloc_free(nodemap2);
263                 return NULL;
264         }
265
266         return nodemap2;
267 }
268
269 /*
270  * Get the number and the list of matching nodes
271  *
272  *   nodestring :=  NULL | all | pnn,[pnn,...]
273  *
274  * If nodestring is NULL, use the current node.
275  */
276 static bool parse_nodestring(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
277                              const char *nodestring,
278                              struct ctdb_node_map **out)
279 {
280         struct ctdb_node_map *nodemap, *nodemap2;
281         struct ctdb_node_and_flags *node;
282         int i;
283
284         nodemap = get_nodemap(ctdb, false);
285         if (nodemap == NULL) {
286                 return false;
287         }
288
289         nodemap2 = talloc_nodemap(mem_ctx, nodemap);
290         if (nodemap2 == NULL) {
291                 return false;
292         }
293
294         if (nodestring == NULL) {
295                 for (i=0; i<nodemap->num; i++) {
296                         if (nodemap->node[i].pnn == ctdb->cmd_pnn) {
297                                 nodemap2->node[0] = nodemap->node[i];
298                                 break;
299                         }
300                 }
301                 nodemap2->num = 1;
302
303                 goto done;
304         }
305
306         if (strcmp(nodestring, "all") == 0) {
307                 for (i=0; i<nodemap->num; i++) {
308                         nodemap2->node[i] = nodemap->node[i];
309                 }
310                 nodemap2->num = nodemap->num;
311
312                 goto done;
313         } else {
314                 char *ns, *tok;
315
316                 ns = talloc_strdup(mem_ctx, nodestring);
317                 if (ns == NULL) {
318                         return false;
319                 }
320
321                 tok = strtok(ns, ",");
322                 while (tok != NULL) {
323                         uint32_t pnn;
324                         char *endptr;
325
326                         pnn = (uint32_t)strtoul(tok, &endptr, 0);
327                         if (pnn == 0 && tok == endptr) {
328                                 fprintf(stderr, "Invalid node %s\n", tok);
329                                         return false;
330                         }
331
332                         node = get_node_by_pnn(nodemap, pnn);
333                         if (node == NULL) {
334                                 fprintf(stderr, "Node %u does not exist\n",
335                                         pnn);
336                                 return false;
337                         }
338
339                         nodemap2->node[nodemap2->num] = *node;
340                         nodemap2->num += 1;
341
342                         tok = strtok(NULL, ",");
343                 }
344         }
345
346 done:
347         *out = nodemap2;
348         return true;
349 }
350
351 /* Compare IP address */
352 static bool ctdb_same_ip(ctdb_sock_addr *ip1, ctdb_sock_addr *ip2)
353 {
354         bool ret = false;
355
356         if (ip1->sa.sa_family != ip2->sa.sa_family) {
357                 return false;
358         }
359
360         switch (ip1->sa.sa_family) {
361         case AF_INET:
362                 ret = (memcmp(&ip1->ip.sin_addr, &ip2->ip.sin_addr,
363                               sizeof(struct in_addr)) == 0);
364                 break;
365
366         case AF_INET6:
367                 ret = (memcmp(&ip1->ip6.sin6_addr, &ip2->ip6.sin6_addr,
368                               sizeof(struct in6_addr)) == 0);
369                 break;
370         }
371
372         return ret;
373 }
374
375 /* Append a node to a node map with given address and flags */
376 static bool node_map_add(struct ctdb_node_map *nodemap,
377                          const char *nstr, uint32_t flags)
378 {
379         ctdb_sock_addr addr;
380         uint32_t num;
381         struct ctdb_node_and_flags *n;
382
383         if (! parse_ip(nstr, NULL, 0, &addr)) {
384                 fprintf(stderr, "Invalid IP address %s\n", nstr);
385                 return false;
386         }
387
388         num = nodemap->num;
389         nodemap->node = talloc_realloc(nodemap, nodemap->node,
390                                        struct ctdb_node_and_flags, num+1);
391         if (nodemap->node == NULL) {
392                 return false;
393         }
394
395         n = &nodemap->node[num];
396         n->addr = addr;
397         n->pnn = num;
398         n->flags = flags;
399
400         nodemap->num = num+1;
401         return true;
402 }
403
404 /* Read a nodes file into a node map */
405 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
406                                                   const char *nlist)
407 {
408         char **lines;
409         int nlines;
410         int i;
411         struct ctdb_node_map *nodemap;
412
413         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
414         if (nodemap == NULL) {
415                 return NULL;
416         }
417
418         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
419         if (lines == NULL) {
420                 return NULL;
421         }
422
423         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
424                 nlines--;
425         }
426
427         for (i=0; i<nlines; i++) {
428                 char *node;
429                 uint32_t flags;
430                 size_t len;
431
432                 node = lines[i];
433                 /* strip leading spaces */
434                 while((*node == ' ') || (*node == '\t')) {
435                         node++;
436                 }
437
438                 len = strlen(node);
439
440                 /* strip trailing spaces */
441                 while ((len > 1) &&
442                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
443                 {
444                         node[len-1] = '\0';
445                         len--;
446                 }
447
448                 if (len == 0) {
449                         continue;
450                 }
451                 if (*node == '#') {
452                         /* A "deleted" node is a node that is
453                            commented out in the nodes file.  This is
454                            used instead of removing a line, which
455                            would cause subsequent nodes to change
456                            their PNN. */
457                         flags = NODE_FLAGS_DELETED;
458                         node = discard_const("0.0.0.0");
459                 } else {
460                         flags = 0;
461                 }
462                 if (! node_map_add(nodemap, node, flags)) {
463                         talloc_free(lines);
464                         TALLOC_FREE(nodemap);
465                         return NULL;
466                 }
467         }
468
469         talloc_free(lines);
470         return nodemap;
471 }
472
473 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx, uint32_t pnn)
474 {
475         struct ctdb_node_map *nodemap;
476         char *nodepath;
477         const char *nodes_list = NULL;
478
479         if (pnn != CTDB_UNKNOWN_PNN) {
480                 nodepath = talloc_asprintf(mem_ctx, "CTDB_NODES_%u", pnn);
481                 if (nodepath != NULL) {
482                         nodes_list = getenv(nodepath);
483                 }
484         }
485         if (nodes_list == NULL) {
486                 nodes_list = getenv("CTDB_NODES");
487         }
488         if (nodes_list == NULL) {
489                 const char *basedir = getenv("CTDB_BASE");
490                 if (basedir == NULL) {
491                         basedir = CTDB_ETCDIR;
492                 }
493                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes", basedir);
494                 if (nodes_list == NULL) {
495                         fprintf(stderr, "Memory allocation error\n");
496                         return NULL;
497                 }
498         }
499
500         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
501         if (nodemap == NULL) {
502                 fprintf(stderr, "Failed to read nodes file \"%s\"\n",
503                         nodes_list);
504                 return NULL;
505         }
506
507         return nodemap;
508 }
509
510 static struct ctdb_dbid *db_find(TALLOC_CTX *mem_ctx,
511                                  struct ctdb_context *ctdb,
512                                  struct ctdb_dbid_map *dbmap,
513                                  const char *db_name)
514 {
515         struct ctdb_dbid *db = NULL;
516         const char *name;
517         int ret, i;
518
519         for (i=0; i<dbmap->num; i++) {
520                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
521                                            ctdb->pnn, TIMEOUT(),
522                                            dbmap->dbs[i].db_id, &name);
523                 if (ret != 0) {
524                         return false;
525                 }
526
527                 if (strcmp(db_name, name) == 0) {
528                         talloc_free(discard_const(name));
529                         db = &dbmap->dbs[i];
530                         break;
531                 }
532         }
533
534         return db;
535 }
536
537 static bool db_exists(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
538                       const char *db_arg, uint32_t *db_id,
539                       const char **db_name, uint8_t *db_flags)
540 {
541         struct ctdb_dbid_map *dbmap;
542         struct ctdb_dbid *db = NULL;
543         uint32_t id = 0;
544         const char *name = NULL;
545         int ret, i;
546
547         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
548                                   ctdb->pnn, TIMEOUT(), &dbmap);
549         if (ret != 0) {
550                 return false;
551         }
552
553         if (strncmp(db_arg, "0x", 2) == 0) {
554                 id = strtoul(db_arg, NULL, 0);
555                 for (i=0; i<dbmap->num; i++) {
556                         if (id == dbmap->dbs[i].db_id) {
557                                 db = &dbmap->dbs[i];
558                                 break;
559                         }
560                 }
561         } else {
562                 name = db_arg;
563                 db = db_find(mem_ctx, ctdb, dbmap, name);
564         }
565
566         if (db == NULL) {
567                 fprintf(stderr, "No database matching '%s' found\n", db_arg);
568                 return false;
569         }
570
571         if (name == NULL) {
572                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
573                                            ctdb->pnn, TIMEOUT(), id, &name);
574                 if (ret != 0) {
575                         return false;
576                 }
577         }
578
579         if (db_id != NULL) {
580                 *db_id = db->db_id;
581         }
582         if (db_name != NULL) {
583                 *db_name = talloc_strdup(mem_ctx, name);
584         }
585         if (db_flags != NULL) {
586                 *db_flags = db->flags;
587         }
588         return true;
589 }
590
591 static int h2i(char h)
592 {
593         if (h >= 'a' && h <= 'f') {
594                 return h - 'a' + 10;
595         }
596         if (h >= 'A' && h <= 'F') {
597                 return h - 'f' + 10;
598         }
599         return h - '0';
600 }
601
602 static int hex_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
603                        TDB_DATA *out)
604 {
605         int i;
606         TDB_DATA data;
607
608         if (len & 0x01) {
609                 fprintf(stderr, "Key (%s) contains odd number of hex digits\n",
610                         str);
611                 return EINVAL;
612         }
613
614         data.dsize = len / 2;
615         data.dptr = talloc_size(mem_ctx, data.dsize);
616         if (data.dptr == NULL) {
617                 return ENOMEM;
618         }
619
620         for (i=0; i<data.dsize; i++) {
621                 data.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
622         }
623
624         *out = data;
625         return 0;
626 }
627
628 static int str_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
629                        TDB_DATA *out)
630 {
631         TDB_DATA data;
632         int ret = 0;
633
634         if (strncmp(str, "0x", 2) == 0) {
635                 ret = hex_to_data(str+2, len-2, mem_ctx, &data);
636         } else {
637                 data.dptr = talloc_memdup(mem_ctx, str, len);
638                 if (data.dptr == NULL) {
639                         return ENOMEM;
640                 }
641                 data.dsize = len;
642         }
643
644         *out = data;
645         return ret;
646 }
647
648 static int run_helper(const char *command, const char *path, const char *arg1)
649 {
650         pid_t pid;
651         int save_errno, status, ret;
652
653         pid = fork();
654         if (pid < 0) {
655                 save_errno = errno;
656                 fprintf(stderr, "Failed to fork %s (%s) - %s\n",
657                         command, path, strerror(save_errno));
658                 return save_errno;
659         }
660
661         if (pid == 0) {
662                 ret = execl(path, path, arg1, NULL);
663                 if (ret == -1) {
664                         _exit(errno);
665                 }
666                 /* Should not happen */
667                 _exit(ENOEXEC);
668         }
669
670         ret = waitpid(pid, &status, 0);
671         if (ret == -1) {
672                 save_errno = errno;
673                 fprintf(stderr, "waitpid() failed for %s - %s\n",
674                         command, strerror(save_errno));
675                 return save_errno;
676         }
677
678         if (WIFEXITED(status)) {
679                 ret = WEXITSTATUS(status);
680                 return ret;
681         }
682
683         if (WIFSIGNALED(status)) {
684                 fprintf(stderr, "%s terminated with signal %d\n",
685                         command, WTERMSIG(status));
686                 return EINTR;
687         }
688
689         return 0;
690 }
691
692 /*
693  * Command Functions
694  */
695
696 static int control_version(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
697                            int argc, const char **argv)
698 {
699         printf("%s\n", CTDB_VERSION_STRING);
700         return 0;
701 }
702
703 static bool partially_online(TALLOC_CTX *mem_ctx,
704                              struct ctdb_context *ctdb,
705                              struct ctdb_node_and_flags *node)
706 {
707         struct ctdb_iface_list *iface_list;
708         int ret, i;
709         bool status = false;
710
711         if (node->flags != 0) {
712                 return false;
713         }
714
715         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
716                                    node->pnn, TIMEOUT(), &iface_list);
717         if (ret != 0) {
718                 return false;
719         }
720
721         status = false;
722         for (i=0; i < iface_list->num; i++) {
723                 if (iface_list->iface[i].link_state == 0) {
724                         status = true;
725                         break;
726                 }
727         }
728
729         return status;
730 }
731
732 static void print_nodemap_machine(TALLOC_CTX *mem_ctx,
733                                   struct ctdb_context *ctdb,
734                                   struct ctdb_node_map *nodemap,
735                                   uint32_t mypnn)
736 {
737         struct ctdb_node_and_flags *node;
738         int i;
739
740         printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
741                options.sep,
742                "Node", options.sep,
743                "IP", options.sep,
744                "Disconnected", options.sep,
745                "Banned", options.sep,
746                "Disabled", options.sep,
747                "Unhealthy", options.sep,
748                "Stopped", options.sep,
749                "Inactive", options.sep,
750                "PartiallyOnline", options.sep,
751                "ThisNode", options.sep);
752
753         for (i=0; i<nodemap->num; i++) {
754                 node = &nodemap->node[i];
755                 if (node->flags & NODE_FLAGS_DELETED) {
756                         continue;
757                 }
758
759                 printf("%s%u%s%s%s%d%s%d%s%d%s%d%s%d%s%d%s%d%s%c%s\n",
760                        options.sep,
761                        node->pnn, options.sep,
762                        ctdb_sock_addr_to_string(mem_ctx, &node->addr),
763                        options.sep,
764                        !! (node->flags & NODE_FLAGS_DISCONNECTED), options.sep,
765                        !! (node->flags & NODE_FLAGS_BANNED), options.sep,
766                        !! (node->flags & NODE_FLAGS_PERMANENTLY_DISABLED),
767                        options.sep,
768                        !! (node->flags & NODE_FLAGS_UNHEALTHY), options.sep,
769                        !! (node->flags & NODE_FLAGS_STOPPED), options.sep,
770                        !! (node->flags & NODE_FLAGS_INACTIVE), options.sep,
771                        partially_online(mem_ctx, ctdb, node), options.sep,
772                        (node->pnn == mypnn)?'Y':'N', options.sep);
773         }
774
775 }
776
777 static void print_nodemap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
778                           struct ctdb_node_map *nodemap, uint32_t mypnn)
779 {
780         struct ctdb_node_and_flags *node;
781         int num_deleted_nodes = 0;
782         int i;
783
784         for (i=0; i<nodemap->num; i++) {
785                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
786                         num_deleted_nodes++;
787                 }
788         }
789
790         if (num_deleted_nodes == 0) {
791                 printf("Number of nodes:%d\n", nodemap->num);
792         } else {
793                 printf("Number of nodes:%d (including %d deleted nodes)\n",
794                        nodemap->num, num_deleted_nodes);
795         }
796
797         for (i=0; i<nodemap->num; i++) {
798                 node = &nodemap->node[i];
799                 if (node->flags & NODE_FLAGS_DELETED) {
800                         continue;
801                 }
802
803                 printf("pnn:%u %-16s %s%s\n",
804                        node->pnn,
805                        ctdb_sock_addr_to_string(mem_ctx, &node->addr),
806                        partially_online(mem_ctx, ctdb, node) ?
807                                 "PARTIALLYONLINE" :
808                                 pretty_print_flags(mem_ctx, node->flags),
809                        node->pnn == mypnn ? " (THIS NODE)" : "");
810         }
811 }
812
813 static void print_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
814                          struct ctdb_node_map *nodemap, uint32_t mypnn,
815                          struct ctdb_vnn_map *vnnmap, int recmode,
816                          uint32_t recmaster)
817 {
818         int i;
819
820         print_nodemap(mem_ctx, ctdb, nodemap, mypnn);
821
822         if (vnnmap->generation == INVALID_GENERATION) {
823                 printf("Generation:INVALID\n");
824         } else {
825                 printf("Generation:%u\n", vnnmap->generation);
826         }
827         printf("Size:%d\n", vnnmap->size);
828         for (i=0; i<vnnmap->size; i++) {
829                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
830         }
831
832         printf("Recovery mode:%s (%d)\n",
833                recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "RECOVERY",
834                recmode);
835         printf("Recovery master:%d\n", recmaster);
836 }
837
838 static int control_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
839                           int argc, const char **argv)
840 {
841         struct ctdb_node_map *nodemap;
842         struct ctdb_vnn_map *vnnmap;
843         int recmode;
844         uint32_t recmaster;
845         int ret;
846
847         if (argc != 0) {
848                 usage("status");
849         }
850
851         nodemap = get_nodemap(ctdb, false);
852         if (nodemap == NULL) {
853                 return 1;
854         }
855
856         if (options.machinereadable == 1) {
857                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
858                 return 0;
859         }
860
861         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
862                                   ctdb->cmd_pnn, TIMEOUT(), &vnnmap);
863         if (ret != 0) {
864                 return ret;
865         }
866
867         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
868                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
869         if (ret != 0) {
870                 return ret;
871         }
872
873         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
874                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
875         if (ret != 0) {
876                 return ret;
877         }
878
879         print_status(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, vnnmap,
880                      recmode, recmaster);
881         return 0;
882 }
883
884 static int control_uptime(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
885                           int argc, const char **argv)
886 {
887         struct ctdb_uptime *uptime;
888         int ret, tmp, days, hours, minutes, seconds;
889
890         ret = ctdb_ctrl_uptime(mem_ctx, ctdb->ev, ctdb->client,
891                                ctdb->cmd_pnn, TIMEOUT(), &uptime);
892         if (ret != 0) {
893                 return ret;
894         }
895
896         printf("Current time of node %-4u     :                %s",
897                ctdb->cmd_pnn, ctime(&uptime->current_time.tv_sec));
898
899         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
900         seconds = tmp % 60; tmp /= 60;
901         minutes = tmp % 60; tmp /= 60;
902         hours = tmp % 24; tmp /= 24;
903         days = tmp;
904
905         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s",
906                days, hours, minutes, seconds,
907                ctime(&uptime->ctdbd_start_time.tv_sec));
908
909         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
910         seconds = tmp % 60; tmp /= 60;
911         minutes = tmp % 60; tmp /= 60;
912         hours = tmp % 24; tmp /= 24;
913         days = tmp;
914
915         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s",
916                days, hours, minutes, seconds,
917                ctime(&uptime->last_recovery_finished.tv_sec));
918
919         printf("Duration of last recovery/failover: %lf seconds\n",
920                timeval_delta(&uptime->last_recovery_finished,
921                              &uptime->last_recovery_started));
922
923         return 0;
924 }
925
926 static int control_ping(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
927                         int argc, const char **argv)
928 {
929         struct timeval tv;
930         int ret, num_clients;
931
932         tv = timeval_current();
933         ret = ctdb_ctrl_ping(mem_ctx, ctdb->ev, ctdb->client,
934                              ctdb->cmd_pnn, TIMEOUT(), &num_clients);
935         if (ret != 0) {
936                 return ret;
937         }
938
939         printf("response from %u time=%.6f sec  (%d clients)\n",
940                ctdb->cmd_pnn, timeval_elapsed(&tv), num_clients);
941         return 0;
942 }
943
944 const char *runstate_to_string(enum ctdb_runstate runstate);
945 enum ctdb_runstate runstate_from_string(const char *runstate_str);
946
947 static int control_runstate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
948                             int argc, const char **argv)
949 {
950         enum ctdb_runstate runstate;
951         bool found;
952         int ret, i;
953
954         ret = ctdb_ctrl_get_runstate(mem_ctx, ctdb->ev, ctdb->client,
955                                      ctdb->cmd_pnn, TIMEOUT(), &runstate);
956         if (ret != 0) {
957                 return ret;
958         }
959
960         found = true;
961         for (i=0; i<argc; i++) {
962                 enum ctdb_runstate t;
963
964                 found = false;
965                 t = ctdb_runstate_from_string(argv[i]);
966                 if (t == CTDB_RUNSTATE_UNKNOWN) {
967                         printf("Invalid run state (%s)\n", argv[i]);
968                         return 1;
969                 }
970
971                 if (t == runstate) {
972                         found = true;
973                         break;
974                 }
975         }
976
977         if (! found) {
978                 printf("CTDB not in required run state (got %s)\n",
979                        ctdb_runstate_to_string(runstate));
980                 return 1;
981         }
982
983         printf("%s\n", ctdb_runstate_to_string(runstate));
984         return 0;
985 }
986
987 static int control_getvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
988                           int argc, const char **argv)
989 {
990         struct ctdb_var_list *tun_var_list;
991         uint32_t value;
992         int ret, i;
993         bool found;
994
995         if (argc != 1) {
996                 usage("getvar");
997         }
998
999         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1000                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1001         if (ret != 0) {
1002                 fprintf(stderr,
1003                         "Failed to get list of variables from node %u\n",
1004                         ctdb->cmd_pnn);
1005                 return ret;
1006         }
1007
1008         found = false;
1009         for (i=0; i<tun_var_list->count; i++) {
1010                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1011                         found = true;
1012                         break;
1013                 }
1014         }
1015
1016         if (! found) {
1017                 printf("No such tunable %s\n", argv[0]);
1018                 return 1;
1019         }
1020
1021         ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
1022                                     ctdb->cmd_pnn, TIMEOUT(), argv[0], &value);
1023         if (ret != 0) {
1024                 return ret;
1025         }
1026
1027         printf("%-26s = %u\n", argv[0], value);
1028         return 0;
1029 }
1030
1031 static int control_setvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1032                           int argc, const char **argv)
1033 {
1034         struct ctdb_var_list *tun_var_list;
1035         struct ctdb_tunable tunable;
1036         int ret, i;
1037         bool found;
1038
1039         if (argc != 2) {
1040                 usage("setvar");
1041         }
1042
1043         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1044                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1045         if (ret != 0) {
1046                 fprintf(stderr,
1047                         "Failed to get list of variables from node %u\n",
1048                         ctdb->cmd_pnn);
1049                 return ret;
1050         }
1051
1052         found = false;
1053         for (i=0; i<tun_var_list->count; i++) {
1054                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1055                         found = true;
1056                         break;
1057                 }
1058         }
1059
1060         if (! found) {
1061                 printf("No such tunable %s\n", argv[0]);
1062                 return 1;
1063         }
1064
1065         tunable.name = argv[0];
1066         tunable.value = strtoul(argv[1], NULL, 0);
1067
1068         ret = ctdb_ctrl_set_tunable(mem_ctx, ctdb->ev, ctdb->client,
1069                                     ctdb->cmd_pnn, TIMEOUT(), &tunable);
1070         if (ret != 0) {
1071                 if (ret == 1) {
1072                         fprintf(stderr,
1073                                 "Setting obsolete tunable variable '%s'\n",
1074                                tunable.name);
1075                         return 0;
1076                 }
1077         }
1078
1079         return ret;
1080 }
1081
1082 static int control_listvars(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1083                             int argc, const char **argv)
1084 {
1085         struct ctdb_var_list *tun_var_list;
1086         int ret, i;
1087
1088         if (argc != 0) {
1089                 usage("listvars");
1090         }
1091
1092         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1093                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1094         if (ret != 0) {
1095                 return ret;
1096         }
1097
1098         for (i=0; i<tun_var_list->count; i++) {
1099                 control_getvar(mem_ctx, ctdb, 1, &tun_var_list->var[i]);
1100         }
1101
1102         return 0;
1103 }
1104
1105 const struct {
1106         const char *name;
1107         uint32_t offset;
1108 } stats_fields[] = {
1109 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
1110         STATISTICS_FIELD(num_clients),
1111         STATISTICS_FIELD(frozen),
1112         STATISTICS_FIELD(recovering),
1113         STATISTICS_FIELD(num_recoveries),
1114         STATISTICS_FIELD(client_packets_sent),
1115         STATISTICS_FIELD(client_packets_recv),
1116         STATISTICS_FIELD(node_packets_sent),
1117         STATISTICS_FIELD(node_packets_recv),
1118         STATISTICS_FIELD(keepalive_packets_sent),
1119         STATISTICS_FIELD(keepalive_packets_recv),
1120         STATISTICS_FIELD(node.req_call),
1121         STATISTICS_FIELD(node.reply_call),
1122         STATISTICS_FIELD(node.req_dmaster),
1123         STATISTICS_FIELD(node.reply_dmaster),
1124         STATISTICS_FIELD(node.reply_error),
1125         STATISTICS_FIELD(node.req_message),
1126         STATISTICS_FIELD(node.req_control),
1127         STATISTICS_FIELD(node.reply_control),
1128         STATISTICS_FIELD(client.req_call),
1129         STATISTICS_FIELD(client.req_message),
1130         STATISTICS_FIELD(client.req_control),
1131         STATISTICS_FIELD(timeouts.call),
1132         STATISTICS_FIELD(timeouts.control),
1133         STATISTICS_FIELD(timeouts.traverse),
1134         STATISTICS_FIELD(locks.num_calls),
1135         STATISTICS_FIELD(locks.num_current),
1136         STATISTICS_FIELD(locks.num_pending),
1137         STATISTICS_FIELD(locks.num_failed),
1138         STATISTICS_FIELD(total_calls),
1139         STATISTICS_FIELD(pending_calls),
1140         STATISTICS_FIELD(childwrite_calls),
1141         STATISTICS_FIELD(pending_childwrite_calls),
1142         STATISTICS_FIELD(memory_used),
1143         STATISTICS_FIELD(max_hop_count),
1144         STATISTICS_FIELD(total_ro_delegations),
1145         STATISTICS_FIELD(total_ro_revokes),
1146 };
1147
1148 #define LATENCY_AVG(v)  ((v).num ? (v).total / (v).num : 0.0 )
1149
1150 static void print_statistics_machine(struct ctdb_statistics *s,
1151                                      bool show_header)
1152 {
1153         int i;
1154
1155         if (show_header) {
1156                 printf("CTDB version%s", options.sep);
1157                 printf("Current time of statistics%s", options.sep);
1158                 printf("Statistics collected since%s", options.sep);
1159                 for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1160                         printf("%s%s", stats_fields[i].name, options.sep);
1161                 }
1162                 printf("num_reclock_ctdbd_latency%s", options.sep);
1163                 printf("min_reclock_ctdbd_latency%s", options.sep);
1164                 printf("avg_reclock_ctdbd_latency%s", options.sep);
1165                 printf("max_reclock_ctdbd_latency%s", options.sep);
1166
1167                 printf("num_reclock_recd_latency%s", options.sep);
1168                 printf("min_reclock_recd_latency%s", options.sep);
1169                 printf("avg_reclock_recd_latency%s", options.sep);
1170                 printf("max_reclock_recd_latency%s", options.sep);
1171
1172                 printf("num_call_latency%s", options.sep);
1173                 printf("min_call_latency%s", options.sep);
1174                 printf("avg_call_latency%s", options.sep);
1175                 printf("max_call_latency%s", options.sep);
1176
1177                 printf("num_lockwait_latency%s", options.sep);
1178                 printf("min_lockwait_latency%s", options.sep);
1179                 printf("avg_lockwait_latency%s", options.sep);
1180                 printf("max_lockwait_latency%s", options.sep);
1181
1182                 printf("num_childwrite_latency%s", options.sep);
1183                 printf("min_childwrite_latency%s", options.sep);
1184                 printf("avg_childwrite_latency%s", options.sep);
1185                 printf("max_childwrite_latency%s", options.sep);
1186                 printf("\n");
1187         }
1188
1189         printf("%u%s", CTDB_PROTOCOL, options.sep);
1190         printf("%u%s", (uint32_t)s->statistics_current_time.tv_sec, options.sep);
1191         printf("%u%s", (uint32_t)s->statistics_start_time.tv_sec, options.sep);
1192         for (i=0;i<ARRAY_SIZE(stats_fields);i++) {
1193                 printf("%u%s",
1194                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s),
1195                        options.sep);
1196         }
1197         printf("%u%s", s->reclock.ctdbd.num, options.sep);
1198         printf("%.6f%s", s->reclock.ctdbd.min, options.sep);
1199         printf("%.6f%s", LATENCY_AVG(s->reclock.ctdbd), options.sep);
1200         printf("%.6f%s", s->reclock.ctdbd.max, options.sep);
1201
1202         printf("%u%s", s->reclock.recd.num, options.sep);
1203         printf("%.6f%s", s->reclock.recd.min, options.sep);
1204         printf("%.6f%s", LATENCY_AVG(s->reclock.recd), options.sep);
1205         printf("%.6f%s", s->reclock.recd.max, options.sep);
1206
1207         printf("%d%s", s->call_latency.num, options.sep);
1208         printf("%.6f%s", s->call_latency.min, options.sep);
1209         printf("%.6f%s", LATENCY_AVG(s->call_latency), options.sep);
1210         printf("%.6f%s", s->call_latency.max, options.sep);
1211
1212         printf("%d%s", s->childwrite_latency.num, options.sep);
1213         printf("%.6f%s", s->childwrite_latency.min, options.sep);
1214         printf("%.6f%s", LATENCY_AVG(s->childwrite_latency), options.sep);
1215         printf("%.6f%s", s->childwrite_latency.max, options.sep);
1216         printf("\n");
1217 }
1218
1219 static void print_statistics(struct ctdb_statistics *s)
1220 {
1221         int tmp, days, hours, minutes, seconds;
1222         int i;
1223         const char *prefix = NULL;
1224         int preflen = 0;
1225
1226         tmp = s->statistics_current_time.tv_sec -
1227               s->statistics_start_time.tv_sec;
1228         seconds = tmp % 60; tmp /= 60;
1229         minutes = tmp % 60; tmp /= 60;
1230         hours   = tmp % 24; tmp /= 24;
1231         days    = tmp;
1232
1233         printf("CTDB version %u\n", CTDB_PROTOCOL);
1234         printf("Current time of statistics  :                %s",
1235                ctime(&s->statistics_current_time.tv_sec));
1236         printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s",
1237                days, hours, minutes, seconds,
1238                ctime(&s->statistics_start_time.tv_sec));
1239
1240         for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1241                 if (strchr(stats_fields[i].name, '.') != NULL) {
1242                         preflen = strcspn(stats_fields[i].name, ".") + 1;
1243                         if (! prefix ||
1244                             strncmp(prefix, stats_fields[i].name, preflen) != 0) {
1245                                 prefix = stats_fields[i].name;
1246                                 printf(" %*.*s\n", preflen-1, preflen-1,
1247                                        stats_fields[i].name);
1248                         }
1249                 } else {
1250                         preflen = 0;
1251                 }
1252                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
1253                        stats_fields[i].name+preflen, preflen ? 0 : 4, "",
1254                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s));
1255         }
1256
1257         printf(" hop_count_buckets:");
1258         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1259                 printf(" %d", s->hop_count_bucket[i]);
1260         }
1261         printf("\n");
1262         printf(" lock_buckets:");
1263         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1264                 printf(" %d", s->locks.buckets[i]);
1265         }
1266         printf("\n");
1267         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1268                "locks_latency      MIN/AVG/MAX",
1269                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
1270                s->locks.latency.max, s->locks.latency.num);
1271
1272         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1273                "reclock_ctdbd      MIN/AVG/MAX",
1274                s->reclock.ctdbd.min, LATENCY_AVG(s->reclock.ctdbd),
1275                s->reclock.ctdbd.max, s->reclock.ctdbd.num);
1276
1277         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1278                "reclock_recd       MIN/AVG/MAX",
1279                s->reclock.recd.min, LATENCY_AVG(s->reclock.recd),
1280                s->reclock.recd.max, s->reclock.recd.num);
1281
1282         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1283                "call_latency       MIN/AVG/MAX",
1284                s->call_latency.min, LATENCY_AVG(s->call_latency),
1285                s->call_latency.max, s->call_latency.num);
1286
1287         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1288                "childwrite_latency MIN/AVG/MAX",
1289                s->childwrite_latency.min,
1290                LATENCY_AVG(s->childwrite_latency),
1291                s->childwrite_latency.max, s->childwrite_latency.num);
1292 }
1293
1294 static int control_statistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1295                               int argc, const char **argv)
1296 {
1297         struct ctdb_statistics *stats;
1298         int ret;
1299
1300         if (argc != 0) {
1301                 usage("statistics");
1302         }
1303
1304         ret = ctdb_ctrl_statistics(mem_ctx, ctdb->ev, ctdb->client,
1305                                    ctdb->cmd_pnn, TIMEOUT(), &stats);
1306         if (ret != 0) {
1307                 return ret;
1308         }
1309
1310         if (options.machinereadable) {
1311                 print_statistics_machine(stats, true);
1312         } else {
1313                 print_statistics(stats);
1314         }
1315
1316         return 0;
1317 }
1318
1319 static int control_statistics_reset(TALLOC_CTX *mem_ctx,
1320                                     struct ctdb_context *ctdb,
1321                                     int argc, const char **argv)
1322 {
1323         int ret;
1324
1325         if (argc != 0) {
1326                 usage("statisticsreset");
1327         }
1328
1329         ret = ctdb_ctrl_statistics_reset(mem_ctx, ctdb->ev, ctdb->client,
1330                                          ctdb->cmd_pnn, TIMEOUT());
1331         if (ret != 0) {
1332                 return ret;
1333         }
1334
1335         return 0;
1336 }
1337
1338 static int control_stats(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1339                          int argc, const char **argv)
1340 {
1341         struct ctdb_statistics_list *slist;
1342         int ret, count = 0, i;
1343         bool show_header = true;
1344
1345         if (argc > 1) {
1346                 usage("stats");
1347         }
1348
1349         if (argc == 1) {
1350                 count = atoi(argv[0]);
1351         }
1352
1353         ret = ctdb_ctrl_get_stat_history(mem_ctx, ctdb->ev, ctdb->client,
1354                                          ctdb->cmd_pnn, TIMEOUT(), &slist);
1355         if (ret != 0) {
1356                 return ret;
1357         }
1358
1359         for (i=0; i<slist->num; i++) {
1360                 if (slist->stats[i].statistics_start_time.tv_sec == 0) {
1361                         continue;
1362                 }
1363                 if (options.machinereadable == 1) {
1364                         print_statistics_machine(&slist->stats[i],
1365                                                  show_header);
1366                         show_header = false;
1367                 } else {
1368                         print_statistics(&slist->stats[i]);
1369                 }
1370                 if (count > 0 && i == count) {
1371                         break;
1372                 }
1373         }
1374
1375         return 0;
1376 }
1377
1378 static int ctdb_public_ip_cmp(const void *a, const void *b)
1379 {
1380         const struct ctdb_public_ip *ip_a = a;
1381         const struct ctdb_public_ip *ip_b = b;
1382
1383         return ctdb_sock_addr_cmp(&ip_a->addr, &ip_b->addr);
1384 }
1385
1386 static void print_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1387                      struct ctdb_public_ip_list *ips,
1388                      struct ctdb_public_ip_info **ipinfo,
1389                      bool all_nodes)
1390 {
1391         int i, j;
1392         char *conf, *avail, *active;
1393
1394         if (options.machinereadable == 1) {
1395                 printf("%s%s%s%s%s", options.sep,
1396                        "Public IP", options.sep,
1397                        "Node", options.sep);
1398                 if (options.verbose == 1) {
1399                         printf("%s%s%s%s%s%s\n",
1400                                "ActiveInterfaces", options.sep,
1401                                "AvailableInterfaces", options.sep,
1402                                "ConfiguredInterfaces", options.sep);
1403                 } else {
1404                         printf("\n");
1405                 }
1406         } else {
1407                 if (all_nodes) {
1408                         printf("Public IPs on ALL nodes\n");
1409                 } else {
1410                         printf("Public IPs on node %u\n", ctdb->cmd_pnn);
1411                 }
1412         }
1413
1414         for (i = 0; i < ips->num; i++) {
1415
1416                 if (options.machinereadable == 1) {
1417                         printf("%s%s%s%d%s", options.sep,
1418                                ctdb_sock_addr_to_string(
1419                                         mem_ctx, &ips->ip[i].addr),
1420                                options.sep,
1421                                (int)ips->ip[i].pnn, options.sep);
1422                 } else {
1423                         printf("%s", ctdb_sock_addr_to_string(
1424                                                 mem_ctx, &ips->ip[i].addr));
1425                 }
1426
1427                 if (options.verbose == 0) {
1428                         if (options.machinereadable == 1) {
1429                                 printf("\n");
1430                         } else {
1431                                 printf(" %d\n", (int)ips->ip[i].pnn);
1432                         }
1433                         continue;
1434                 }
1435
1436                 conf = NULL;
1437                 avail = NULL;
1438                 active = NULL;
1439
1440                 if (ipinfo[i] == NULL) {
1441                         goto skip_ipinfo;
1442                 }
1443
1444                 for (j=0; j<ipinfo[i]->ifaces->num; j++) {
1445                         struct ctdb_iface *iface;
1446
1447                         iface = &ipinfo[i]->ifaces->iface[j];
1448                         if (conf == NULL) {
1449                                 conf = talloc_strdup(mem_ctx, iface->name);
1450                         } else {
1451                                 conf = talloc_asprintf_append(
1452                                                 conf, ",%s", iface->name);
1453                         }
1454
1455                         if (ipinfo[i]->active_idx == j) {
1456                                 active = iface->name;
1457                         }
1458
1459                         if (iface->link_state == 0) {
1460                                 continue;
1461                         }
1462
1463                         if (avail == NULL) {
1464                                 avail = talloc_strdup(mem_ctx, iface->name);
1465                         } else {
1466                                 avail = talloc_asprintf_append(
1467                                                 avail, ",%s", iface->name);
1468                         }
1469                 }
1470
1471         skip_ipinfo:
1472
1473                 if (options.machinereadable == 1) {
1474                         printf("%s%s%s%s%s%s\n",
1475                                active ? active : "", options.sep,
1476                                avail ? avail : "", options.sep,
1477                                conf ? conf : "", options.sep);
1478                 } else {
1479                         printf(" node[%u] active[%s] available[%s] configured[%s]\n",
1480                                ips->ip[i].pnn, active ? active : "",
1481                                avail ? avail : "", conf ? conf : "");
1482                 }
1483         }
1484 }
1485
1486 static int collect_ips(uint8_t *keybuf, size_t keylen, uint8_t *databuf,
1487                        size_t datalen, void *private_data)
1488 {
1489         struct ctdb_public_ip_list *ips = talloc_get_type_abort(
1490                 private_data, struct ctdb_public_ip_list);
1491         struct ctdb_public_ip *ip;
1492
1493         ip = (struct ctdb_public_ip *)databuf;
1494         ips->ip[ips->num] = *ip;
1495         ips->num += 1;
1496
1497         return 0;
1498 }
1499
1500 static int get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1501                               struct ctdb_public_ip_list **out)
1502 {
1503         struct ctdb_node_map *nodemap;
1504         struct ctdb_public_ip_list *ips;
1505         struct db_hash_context *ipdb;
1506         uint32_t *pnn_list;
1507         int ret, count, i, j;
1508
1509         nodemap = get_nodemap(ctdb, false);
1510         if (nodemap == NULL) {
1511                 return 1;
1512         }
1513
1514         ret = db_hash_init(mem_ctx, "ips", 101, DB_HASH_COMPLEX, &ipdb);
1515         if (ret != 0) {
1516                 goto failed;
1517         }
1518
1519         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
1520                                      &pnn_list);
1521         if (count <= 0) {
1522                 goto failed;
1523         }
1524
1525         for (i=0; i<count; i++) {
1526                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1527                                                pnn_list[i], TIMEOUT(), &ips);
1528                 if (ret != 0) {
1529                         goto failed;
1530                 }
1531
1532                 for (j=0; j<ips->num; j++) {
1533                         struct ctdb_public_ip ip;
1534
1535                         ip.pnn = ips->ip[j].pnn;
1536                         ip.addr = ips->ip[j].addr;
1537
1538                         ret = db_hash_add(ipdb, (uint8_t *)&ip.addr,
1539                                           sizeof(ip.addr),
1540                                           (uint8_t *)&ip, sizeof(ip));
1541                         if (ret != 0) {
1542                                 goto failed;
1543                         }
1544                 }
1545
1546                 TALLOC_FREE(ips);
1547         }
1548
1549         talloc_free(pnn_list);
1550
1551         ret = db_hash_traverse(ipdb, NULL, NULL, &count);
1552         if (ret != 0) {
1553                 goto failed;
1554         }
1555
1556         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);
1557         if (ips == NULL) {
1558                 goto failed;
1559         }
1560
1561         ips->ip = talloc_array(ips, struct ctdb_public_ip, count);
1562         if (ips->ip == NULL) {
1563                 goto failed;
1564         }
1565
1566         ret = db_hash_traverse(ipdb, collect_ips, ips, &count);
1567         if (ret != 0) {
1568                 goto failed;
1569         }
1570
1571         if (count != ips->num) {
1572                 goto failed;
1573         }
1574
1575         talloc_free(ipdb);
1576
1577         *out = ips;
1578         return 0;
1579
1580 failed:
1581         talloc_free(ipdb);
1582         return 1;
1583 }
1584
1585 static int control_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1586                       int argc, const char **argv)
1587 {
1588         struct ctdb_public_ip_list *ips;
1589         struct ctdb_public_ip_info **ipinfo;
1590         int ret, i;
1591         bool do_all = false;
1592
1593         if (argc > 1) {
1594                 usage("ip");
1595         }
1596
1597         if (argc == 1) {
1598                 if (strcmp(argv[0], "all") == 0) {
1599                         do_all = true;
1600                 } else {
1601                         usage("ip");
1602                 }
1603         }
1604
1605         if (do_all) {
1606                 ret = get_all_public_ips(ctdb, mem_ctx, &ips);
1607         } else {
1608                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1609                                                ctdb->cmd_pnn, TIMEOUT(), &ips);
1610         }
1611         if (ret != 0) {
1612                 return ret;
1613         }
1614
1615         qsort(ips->ip, ips->num, sizeof(struct ctdb_public_ip),
1616               ctdb_public_ip_cmp);
1617
1618         ipinfo = talloc_array(mem_ctx, struct ctdb_public_ip_info *, ips->num);
1619         if (ipinfo == NULL) {
1620                 return 1;
1621         }
1622
1623         for (i=0; i<ips->num; i++) {
1624                 uint32_t pnn;
1625                 if (do_all) {
1626                         pnn = ips->ip[i].pnn;
1627                 } else {
1628                         pnn = ctdb->cmd_pnn;
1629                 }
1630                 if (pnn == CTDB_UNKNOWN_PNN) {
1631                         ipinfo[i] = NULL;
1632                         continue;
1633                 }
1634                 ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev,
1635                                                    ctdb->client, pnn,
1636                                                    TIMEOUT(), &ips->ip[i].addr,
1637                                                    &ipinfo[i]);
1638                 if (ret != 0) {
1639                         return ret;
1640                 }
1641         }
1642
1643         print_ip(mem_ctx, ctdb, ips, ipinfo, do_all);
1644         return 0;
1645 }
1646
1647 static int control_ipinfo(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1648                           int argc, const char **argv)
1649 {
1650         struct ctdb_public_ip_info *ipinfo;
1651         ctdb_sock_addr addr;
1652         int ret, i;
1653
1654         if (argc != 1) {
1655                 usage("ipinfo");
1656         }
1657
1658         if (! parse_ip(argv[0], NULL, 0, &addr)) {
1659                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
1660                 return 1;
1661         }
1662
1663         ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev, ctdb->client,
1664                                            ctdb->cmd_pnn, TIMEOUT(), &addr,
1665                                            &ipinfo);
1666         if (ret != 0) {
1667                 if (ret == -1) {
1668                         printf("Node %u does not know about IP %s\n",
1669                                ctdb->cmd_pnn, argv[0]);
1670                 }
1671                 return ret;
1672         }
1673
1674         printf("Public IP[%s] info on node %u\n",
1675                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr),
1676                                         ctdb->cmd_pnn);
1677
1678         printf("IP:%s\nCurrentNode:%u\nNumInterfaces:%u\n",
1679                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr),
1680                ipinfo->ip.pnn, ipinfo->ifaces->num);
1681
1682         for (i=0; i<ipinfo->ifaces->num; i++) {
1683                 struct ctdb_iface *iface;
1684
1685                 iface = &ipinfo->ifaces->iface[i];
1686                 iface->name[CTDB_IFACE_SIZE] = '\0';
1687                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1688                        i+1, iface->name,
1689                        iface->link_state == 0 ? "down" : "up",
1690                        iface->references,
1691                        (i == ipinfo->active_idx) ? " (active)" : "");
1692         }
1693
1694         return 0;
1695 }
1696
1697 static int control_ifaces(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1698                           int argc, const char **argv)
1699 {
1700         struct ctdb_iface_list *ifaces;
1701         int ret, i;
1702
1703         if (argc != 0) {
1704                 usage("ifaces");
1705         }
1706
1707         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1708                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1709         if (ret != 0) {
1710                 return ret;
1711         }
1712
1713         if (ifaces->num == 0) {
1714                 printf("No interfaces configured on node %u\n",
1715                        ctdb->cmd_pnn);
1716                 return 0;
1717         }
1718
1719         if (options.machinereadable) {
1720                 printf("%s%s%s%s%s%s%s\n", options.sep,
1721                        "Name", options.sep,
1722                        "LinkStatus", options.sep,
1723                        "References", options.sep);
1724         } else {
1725                 printf("Interfaces on node %u\n", ctdb->cmd_pnn);
1726         }
1727
1728         for (i=0; i<ifaces->num; i++) {
1729                 if (options.machinereadable) {
1730                         printf("%s%s%s%u%s%u%s\n", options.sep,
1731                                ifaces->iface[i].name, options.sep,
1732                                ifaces->iface[i].link_state, options.sep,
1733                                ifaces->iface[i].references, options.sep);
1734                 } else {
1735                         printf("name:%s link:%s references:%u\n",
1736                                ifaces->iface[i].name,
1737                                ifaces->iface[i].link_state ? "up" : "down",
1738                                ifaces->iface[i].references);
1739                 }
1740         }
1741
1742         return 0;
1743 }
1744
1745 static int control_setifacelink(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1746                                 int argc, const char **argv)
1747 {
1748         struct ctdb_iface_list *ifaces;
1749         struct ctdb_iface *iface;
1750         int ret, i;
1751
1752         if (argc != 2) {
1753                 usage("setifacelink");
1754         }
1755
1756         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1757                 fprintf(stderr, "Interface name '%s' too long\n", argv[0]);
1758                 return 1;
1759         }
1760
1761         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1762                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1763         if (ret != 0) {
1764                 fprintf(stderr,
1765                         "Failed to get interface information from node %u\n",
1766                         ctdb->cmd_pnn);
1767                 return ret;
1768         }
1769
1770         iface = NULL;
1771         for (i=0; i<ifaces->num; i++) {
1772                 if (strcmp(ifaces->iface[i].name, argv[0]) == 0) {
1773                         iface = &ifaces->iface[i];
1774                         break;
1775                 }
1776         }
1777
1778         if (iface == NULL) {
1779                 printf("Interface %s not configured on node %u\n",
1780                        argv[0], ctdb->cmd_pnn);
1781                 return 1;
1782         }
1783
1784         if (strcmp(argv[1], "up") == 0) {
1785                 iface->link_state = 1;
1786         } else if (strcmp(argv[1], "down") == 0) {
1787                 iface->link_state = 0;
1788         } else {
1789                 usage("setifacelink");
1790                 return 1;
1791         }
1792
1793         iface->references = 0;
1794
1795         ret = ctdb_ctrl_set_iface_link_state(mem_ctx, ctdb->ev, ctdb->client,
1796                                              ctdb->cmd_pnn, TIMEOUT(), iface);
1797         if (ret != 0) {
1798                 return ret;
1799         }
1800
1801         return 0;
1802 }
1803
1804 static int control_process_exists(TALLOC_CTX *mem_ctx,
1805                                   struct ctdb_context *ctdb,
1806                                   int argc, const char **argv)
1807 {
1808         pid_t pid;
1809         int ret, status;
1810
1811         if (argc != 1) {
1812                 usage("process-exists");
1813         }
1814
1815         pid = atoi(argv[0]);
1816         ret = ctdb_ctrl_process_exists(mem_ctx, ctdb->ev, ctdb->client,
1817                                        ctdb->cmd_pnn, TIMEOUT(), pid, &status);
1818         if (ret != 0) {
1819                 return ret;
1820         }
1821
1822         if (status == 0) {
1823                 printf("PID %u exists\n", pid);
1824         } else {
1825                 printf("PID %u does not exist\n", pid);
1826         }
1827         return status;
1828 }
1829
1830 static int control_getdbmap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1831                             int argc, const char **argv)
1832 {
1833         struct ctdb_dbid_map *dbmap;
1834         int ret, i;
1835
1836         if (argc != 0) {
1837                 usage("getdbmap");
1838         }
1839
1840         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
1841                                   ctdb->cmd_pnn, TIMEOUT(), &dbmap);
1842         if (ret != 0) {
1843                 return ret;
1844         }
1845
1846         if (options.machinereadable == 1) {
1847                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
1848                        options.sep,
1849                        "ID", options.sep,
1850                        "Name", options.sep,
1851                        "Path", options.sep,
1852                        "Persistent", options.sep,
1853                        "Sticky", options.sep,
1854                        "Unhealthy", options.sep,
1855                        "Readonly", options.sep);
1856         } else {
1857                 printf("Number of databases:%d\n", dbmap->num);
1858         }
1859
1860         for (i=0; i<dbmap->num; i++) {
1861                 const char *name;
1862                 const char *path;
1863                 const char *health;
1864                 bool persistent;
1865                 bool readonly;
1866                 bool sticky;
1867                 uint32_t db_id;
1868
1869                 db_id = dbmap->dbs[i].db_id;
1870
1871                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
1872                                            ctdb->cmd_pnn, TIMEOUT(), db_id,
1873                                            &name);
1874                 if (ret != 0) {
1875                         return ret;
1876                 }
1877
1878                 ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1879                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
1880                                           &path);
1881                 if (ret != 0) {
1882                         return ret;
1883                 }
1884
1885                 ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1886                                               ctdb->cmd_pnn, TIMEOUT(), db_id,
1887                                               &health);
1888                 if (ret != 0) {
1889                         return ret;
1890                 }
1891
1892                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1893                 readonly = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
1894                 sticky = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
1895
1896                 if (options.machinereadable == 1) {
1897                         printf("%s0x%08X%s%s%s%s%s%d%s%d%s%d%s%d%s\n",
1898                                options.sep,
1899                                db_id, options.sep,
1900                                name, options.sep,
1901                                path, options.sep,
1902                                !! (persistent), options.sep,
1903                                !! (sticky), options.sep,
1904                                !! (health), options.sep,
1905                                !! (readonly), options.sep);
1906                 } else {
1907                         printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
1908                                db_id, name, path,
1909                                persistent ? " PERSISTENT" : "",
1910                                sticky ? " STICKY" : "",
1911                                readonly ? " READONLY" : "",
1912                                health ? " UNHEALTHY" : "");
1913                 }
1914
1915                 talloc_free(discard_const(name));
1916                 talloc_free(discard_const(path));
1917                 talloc_free(discard_const(health));
1918         }
1919
1920         return 0;
1921 }
1922
1923 static int control_getdbstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1924                                int argc, const char **argv)
1925 {
1926         uint32_t db_id;
1927         const char *db_name, *db_path, *db_health;
1928         uint8_t db_flags;
1929         int ret;
1930
1931         if (argc != 1) {
1932                 usage("getdbstatus");
1933         }
1934
1935         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
1936                 return 1;
1937         }
1938
1939         ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1940                                   ctdb->cmd_pnn, TIMEOUT(), db_id,
1941                                   &db_path);
1942         if (ret != 0) {
1943                 return ret;
1944         }
1945
1946         ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1947                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
1948                                       &db_health);
1949         if (ret != 0) {
1950                 return ret;
1951         }
1952
1953         printf("dbid: 0x%08x\nname: %s\npath: %s\n", db_id, db_name, db_path);
1954         printf("PERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
1955                (db_flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
1956                (db_flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
1957                (db_flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
1958                (db_health ? db_health : "OK"));
1959         return 0;
1960 }
1961
1962 struct dump_record_state {
1963         uint32_t count;
1964 };
1965
1966 #define ISASCII(x) (isprint(x) && ! strchr("\"\\", (x)))
1967
1968 static void dump_tdb_data(const char *name, TDB_DATA val)
1969 {
1970         int i;
1971
1972         fprintf(stdout, "%s(%zu) = \"", name, val.dsize);
1973         for (i=0; i<val.dsize; i++) {
1974                 if (ISASCII(val.dptr[i])) {
1975                         fprintf(stdout, "%c", val.dptr[i]);
1976                 } else {
1977                         fprintf(stdout, "\\%02X", val.dptr[i]);
1978                 }
1979         }
1980         fprintf(stdout, "\"\n");
1981 }
1982
1983 static void dump_ltdb_header(struct ctdb_ltdb_header *header)
1984 {
1985         fprintf(stdout, "dmaster: %u\n", header->dmaster);
1986         fprintf(stdout, "rsn: %" PRIu64 "\n", header->rsn);
1987         fprintf(stdout, "flags: 0x%08x", header->flags);
1988         if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
1989                 fprintf(stdout, " MIGRATED_WITH_DATA");
1990         }
1991         if (header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) {
1992                 fprintf(stdout, " VACUUM_MIGRATED");
1993         }
1994         if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
1995                 fprintf(stdout, " AUTOMATIC");
1996         }
1997         if (header->flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1998                 fprintf(stdout, " RO_HAVE_DELEGATIONS");
1999         }
2000         if (header->flags & CTDB_REC_RO_HAVE_READONLY) {
2001                 fprintf(stdout, " RO_HAVE_READONLY");
2002         }
2003         if (header->flags & CTDB_REC_RO_REVOKING_READONLY) {
2004                 fprintf(stdout, " RO_REVOKING_READONLY");
2005         }
2006         if (header->flags & CTDB_REC_RO_REVOKE_COMPLETE) {
2007                 fprintf(stdout, " RO_REVOKE_COMPLETE");
2008         }
2009         fprintf(stdout, "\n");
2010
2011 }
2012
2013 static int dump_record(uint32_t reqid, struct ctdb_ltdb_header *header,
2014                        TDB_DATA key, TDB_DATA data, void *private_data)
2015 {
2016         struct dump_record_state *state =
2017                 (struct dump_record_state *)private_data;
2018
2019         state->count += 1;
2020
2021         dump_tdb_data("key", key);
2022         dump_ltdb_header(header);
2023         dump_tdb_data("data", data);
2024         fprintf(stdout, "\n");
2025
2026         return 0;
2027 }
2028
2029 struct traverse_state {
2030         TALLOC_CTX *mem_ctx;
2031         bool done;
2032         ctdb_rec_parser_func_t func;
2033         struct dump_record_state sub_state;
2034 };
2035
2036 static void traverse_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2037 {
2038         struct traverse_state *state = (struct traverse_state *)private_data;
2039         struct ctdb_rec_data *rec;
2040         struct ctdb_ltdb_header header;
2041         int ret;
2042
2043         ret = ctdb_rec_data_pull(data.dptr, data.dsize, state->mem_ctx, &rec);
2044         if (ret != 0) {
2045                 return;
2046         }
2047
2048         if (rec->key.dsize == 0 && rec->data.dsize == 0) {
2049                 talloc_free(rec);
2050                 /* end of traverse */
2051                 state->done = true;
2052                 return;
2053         }
2054
2055         ret = ctdb_ltdb_header_extract(&rec->data, &header);
2056         if (ret != 0) {
2057                 talloc_free(rec);
2058                 return;
2059         }
2060
2061         if (rec->data.dsize == 0) {
2062                 talloc_free(rec);
2063                 return;
2064         }
2065
2066         ret = state->func(rec->reqid, &header, rec->key, rec->data,
2067                           &state->sub_state);
2068         talloc_free(rec);
2069         if (ret != 0) {
2070                 state->done = true;
2071         }
2072 }
2073
2074 static int control_catdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2075                          int argc, const char **argv)
2076 {
2077         struct ctdb_db_context *db;
2078         const char *db_name;
2079         uint32_t db_id;
2080         uint8_t db_flags;
2081         struct ctdb_traverse_start_ext traverse;
2082         struct traverse_state state;
2083         int ret;
2084
2085         if (argc != 1) {
2086                 usage("catdb");
2087         }
2088
2089         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2090                 return 1;
2091         }
2092
2093         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2094                           db_flags, &db);
2095         if (ret != 0) {
2096                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2097                 return ret;
2098         }
2099
2100         /* Valgrind fix */
2101         ZERO_STRUCT(traverse);
2102
2103         traverse.db_id = db_id;
2104         traverse.reqid = 0;
2105         traverse.srvid = next_srvid(ctdb);
2106         traverse.withemptyrecords = false;
2107
2108         state.mem_ctx = mem_ctx;
2109         state.done = false;
2110         state.func = dump_record;
2111         state.sub_state.count = 0;
2112
2113         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2114                                               traverse.srvid,
2115                                               traverse_handler, &state);
2116         if (ret != 0) {
2117                 return ret;
2118         }
2119
2120         ret = ctdb_ctrl_traverse_start_ext(mem_ctx, ctdb->ev, ctdb->client,
2121                                            ctdb->cmd_pnn, TIMEOUT(),
2122                                            &traverse);
2123         if (ret != 0) {
2124                 return ret;
2125         }
2126
2127         ctdb_client_wait(ctdb->ev, &state.done);
2128
2129         printf("Dumped %u records\n", state.sub_state.count);
2130
2131         ret = ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2132                                                  traverse.srvid, &state);
2133         if (ret != 0) {
2134                 return ret;
2135         }
2136
2137         return 0;
2138 }
2139
2140 static int control_cattdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2141                           int argc, const char **argv)
2142 {
2143         struct ctdb_db_context *db;
2144         const char *db_name;
2145         uint32_t db_id;
2146         uint8_t db_flags;
2147         struct dump_record_state state;
2148         int ret;
2149
2150         if (argc != 1) {
2151                 usage("catdb");
2152         }
2153
2154         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2155                 return 1;
2156         }
2157
2158         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2159                           db_flags, &db);
2160         if (ret != 0) {
2161                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2162                 return ret;
2163         }
2164
2165         state.count = 0;
2166         ret = ctdb_db_traverse(db, true, true, dump_record, &state);
2167
2168         printf("Dumped %u record(s)\n", state.count);
2169
2170         return ret;
2171 }
2172
2173 static int control_getmonmode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2174                               int argc, const char **argv)
2175 {
2176         int mode, ret;
2177
2178         if (argc != 0) {
2179                 usage("getmonmode");
2180         }
2181
2182         ret = ctdb_ctrl_get_monmode(mem_ctx, ctdb->ev, ctdb->client,
2183                                     ctdb->cmd_pnn, TIMEOUT(), &mode);
2184         if (ret != 0) {
2185                 return ret;
2186         }
2187
2188         printf("%s\n",
2189                (mode == CTDB_MONITORING_ENABLED) ? "ENABLED" : "DISABLED");
2190         return 0;
2191 }
2192
2193 static int control_getcapabilities(TALLOC_CTX *mem_ctx,
2194                                    struct ctdb_context *ctdb,
2195                                    int argc, const char **argv)
2196 {
2197         uint32_t caps;
2198         int ret;
2199
2200         if (argc != 0) {
2201                 usage("getcapabilities");
2202         }
2203
2204         ret = ctdb_ctrl_get_capabilities(mem_ctx, ctdb->ev, ctdb->client,
2205                                          ctdb->cmd_pnn, TIMEOUT(), &caps);
2206         if (ret != 0) {
2207                 return ret;
2208         }
2209
2210         if (options.machinereadable == 1) {
2211                 printf("%s%s%s%s%s\n",
2212                        options.sep,
2213                        "RECMASTER", options.sep,
2214                        "LMASTER", options.sep);
2215                 printf("%s%d%s%d%s\n", options.sep,
2216                        !! (caps & CTDB_CAP_RECMASTER), options.sep,
2217                        !! (caps & CTDB_CAP_LMASTER), options.sep);
2218         } else {
2219                 printf("RECMASTER: %s\n",
2220                        (caps & CTDB_CAP_RECMASTER) ? "YES" : "NO");
2221                 printf("LMASTER: %s\n",
2222                        (caps & CTDB_CAP_LMASTER) ? "YES" : "NO");
2223         }
2224
2225         return 0;
2226 }
2227
2228 static int control_pnn(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2229                        int argc, const char **argv)
2230 {
2231         printf("%u\n", ctdb_client_pnn(ctdb->client));
2232         return 0;
2233 }
2234
2235 static int control_lvs(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2236                        int argc, const char **argv)
2237 {
2238         char *t, *lvs_helper = NULL;
2239
2240         if (argc != 1) {
2241                 usage("lvs");
2242         }
2243
2244         t = getenv("CTDB_LVS_HELPER");
2245         if (t != NULL) {
2246                 lvs_helper = talloc_strdup(mem_ctx, t);
2247         } else {
2248                 lvs_helper = talloc_asprintf(mem_ctx, "%s/ctdb_lvs",
2249                                              CTDB_HELPER_BINDIR);
2250         }
2251
2252         if (lvs_helper == NULL) {
2253                 fprintf(stderr, "Unable to set LVS helper\n");
2254                 return 1;
2255         }
2256
2257         return run_helper("LVS helper", lvs_helper, argv[0]);
2258 }
2259
2260 static int control_disable_monitor(TALLOC_CTX *mem_ctx,
2261                                    struct ctdb_context *ctdb,
2262                                    int argc, const char **argv)
2263 {
2264         int ret;
2265
2266         if (argc != 0) {
2267                 usage("disablemonitor");
2268         }
2269
2270         ret = ctdb_ctrl_disable_monitor(mem_ctx, ctdb->ev, ctdb->client,
2271                                         ctdb->cmd_pnn, TIMEOUT());
2272         if (ret != 0) {
2273                 return ret;
2274         }
2275
2276         return 0;
2277 }
2278
2279 static int control_enable_monitor(TALLOC_CTX *mem_ctx,
2280                                   struct ctdb_context *ctdb,
2281                                   int argc, const char **argv)
2282 {
2283         int ret;
2284
2285         if (argc != 0) {
2286                 usage("enablemonitor");
2287         }
2288
2289         ret = ctdb_ctrl_enable_monitor(mem_ctx, ctdb->ev, ctdb->client,
2290                                        ctdb->cmd_pnn, TIMEOUT());
2291         if (ret != 0) {
2292                 return ret;
2293         }
2294
2295         return 0;
2296 }
2297
2298 static int control_setdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2299                             int argc, const char **argv)
2300 {
2301         int log_level;
2302         int ret;
2303         bool found;
2304
2305         if (argc != 1) {
2306                 usage("setdebug");
2307         }
2308
2309         found = debug_level_parse(argv[0], &log_level);
2310         if (! found) {
2311                 fprintf(stderr,
2312                         "Invalid debug level '%s'. Valid levels are:\n",
2313                         argv[0]);
2314                 fprintf(stderr, "\tERROR | WARNING | NOTICE | INFO | DEBUG\n");
2315                 return 1;
2316         }
2317
2318         ret = ctdb_ctrl_setdebug(mem_ctx, ctdb->ev, ctdb->client,
2319                                  ctdb->cmd_pnn, TIMEOUT(), log_level);
2320         if (ret != 0) {
2321                 return ret;
2322         }
2323
2324         return 0;
2325 }
2326
2327 static int control_getdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2328                             int argc, const char **argv)
2329 {
2330         int loglevel;
2331         const char *log_str;
2332         int ret;
2333
2334         if (argc != 0) {
2335                 usage("getdebug");
2336         }
2337
2338         ret = ctdb_ctrl_getdebug(mem_ctx, ctdb->ev, ctdb->client,
2339                                  ctdb->cmd_pnn, TIMEOUT(), &loglevel);
2340         if (ret != 0) {
2341                 return ret;
2342         }
2343
2344         log_str = debug_level_to_string(loglevel);
2345         printf("%s\n", log_str);
2346
2347         return 0;
2348 }
2349
2350 static int control_attach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2351                           int argc, const char **argv)
2352 {
2353         const char *db_name;
2354         uint8_t db_flags = 0;
2355         int ret;
2356
2357         if (argc < 1 || argc > 2) {
2358                 usage("attach");
2359         }
2360
2361         db_name = argv[0];
2362         if (argc == 2) {
2363                 if (strcmp(argv[1], "persistent") == 0) {
2364                         db_flags = CTDB_DB_FLAGS_PERSISTENT;
2365                 } else if (strcmp(argv[1], "readonly") == 0) {
2366                         db_flags = CTDB_DB_FLAGS_READONLY;
2367                 } else if (strcmp(argv[1], "sticky") == 0) {
2368                         db_flags = CTDB_DB_FLAGS_STICKY;
2369                 } else {
2370                         usage("attach");
2371                 }
2372         }
2373
2374         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2375                           db_flags, NULL);
2376         if (ret != 0) {
2377                 return ret;
2378         }
2379
2380         return 0;
2381 }
2382
2383 static int control_detach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2384                           int argc, const char **argv)
2385 {
2386         const char *db_name;
2387         uint32_t db_id;
2388         uint8_t db_flags;
2389         struct ctdb_node_map *nodemap;
2390         int recmode;
2391         int ret, ret2, i;
2392
2393         if (argc < 1) {
2394                 usage("detach");
2395         }
2396
2397         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2398                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
2399         if (ret != 0) {
2400                 return ret;
2401         }
2402
2403         if (recmode == CTDB_RECOVERY_ACTIVE) {
2404                 fprintf(stderr, "Database cannot be detached"
2405                                 " when recovery is active\n");
2406                 return 1;
2407         }
2408
2409         nodemap = get_nodemap(ctdb, false);
2410         if (nodemap == NULL) {
2411                 return 1;
2412         }
2413
2414         for (i=0; i<nodemap->num; i++) {
2415                 uint32_t value;
2416
2417                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2418                         continue;
2419                 }
2420                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2421                         continue;
2422                 }
2423                 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2424                         fprintf(stderr, "Database cannot be detached on"
2425                                 " inactive (stopped or banned) node %u\n",
2426                                 nodemap->node[i].pnn);
2427                         return 1;
2428                 }
2429
2430                 ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
2431                                             nodemap->node[i].pnn, TIMEOUT(),
2432                                             "AllowClientDBAttach", &value);
2433                 if (ret != 0) {
2434                         fprintf(stderr,
2435                                 "Unable to get tunable AllowClientDBAttach"
2436                                 " from node %u\n", nodemap->node[i].pnn);
2437                         return ret;
2438                 }
2439
2440                 if (value == 1) {
2441                         fprintf(stderr,
2442                                 "Database access is still active on node %u."
2443                                 " Set AllowclientDBAttach=0 on all nodes.\n",
2444                                 nodemap->node[i].pnn);
2445                         return 1;
2446                 }
2447         }
2448
2449         ret2 = 0;
2450         for (i=0; i<argc; i++) {
2451                 if (! db_exists(mem_ctx, ctdb, argv[i], &db_id, &db_name,
2452                                 &db_flags)) {
2453                         continue;
2454                 }
2455
2456                 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
2457                         fprintf(stderr,
2458                                 "Persistent database %s cannot be detached\n",
2459                                 argv[0]);
2460                         return 1;
2461                 }
2462
2463                 ret = ctdb_detach(mem_ctx, ctdb->ev, ctdb->client,
2464                                   TIMEOUT(), db_id);
2465                 if (ret != 0) {
2466                         fprintf(stderr, "Database %s detach failed\n", db_name);
2467                         ret2 = ret;
2468                 }
2469         }
2470
2471         return ret2;
2472 }
2473
2474 static int control_dumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2475                               int argc, const char **argv)
2476 {
2477         const char *mem_str;
2478         ssize_t n;
2479         int ret;
2480
2481         ret = ctdb_ctrl_dump_memory(mem_ctx, ctdb->ev, ctdb->client,
2482                                     ctdb->cmd_pnn, TIMEOUT(), &mem_str);
2483         if (ret != 0) {
2484                 return ret;
2485         }
2486
2487         n = write(1, mem_str, strlen(mem_str)+1);
2488         if (n < 0 || n != strlen(mem_str)+1) {
2489                 fprintf(stderr, "Failed to write talloc summary\n");
2490                 return 1;
2491         }
2492
2493         return 0;
2494 }
2495
2496 static void dump_memory(uint64_t srvid, TDB_DATA data, void *private_data)
2497 {
2498         bool *done = (bool *)private_data;
2499         ssize_t n;
2500
2501         n = write(1, data.dptr, data.dsize);
2502         if (n < 0 || n != data.dsize) {
2503                 fprintf(stderr, "Failed to write talloc summary\n");
2504         }
2505
2506         *done = true;
2507 }
2508
2509 static int control_rddumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2510                                 int argc, const char **argv)
2511 {
2512         struct ctdb_srvid_message msg = { 0 };
2513         int ret;
2514         bool done = false;
2515
2516         msg.pnn = ctdb->pnn;
2517         msg.srvid = next_srvid(ctdb);
2518
2519         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2520                                               msg.srvid, dump_memory, &done);
2521         if (ret != 0) {
2522                 return ret;
2523         }
2524
2525         ret = ctdb_message_mem_dump(mem_ctx, ctdb->ev, ctdb->client,
2526                                     ctdb->cmd_pnn, &msg);
2527         if (ret != 0) {
2528                 return ret;
2529         }
2530
2531         ctdb_client_wait(ctdb->ev, &done);
2532         return 0;
2533 }
2534
2535 static int control_getpid(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2536                           int argc, const char **argv)
2537 {
2538         pid_t pid;
2539         int ret;
2540
2541         ret = ctdb_ctrl_get_pid(mem_ctx, ctdb->ev, ctdb->client,
2542                                 ctdb->cmd_pnn, TIMEOUT(), &pid);
2543         if (ret != 0) {
2544                 return ret;
2545         }
2546
2547         printf("%u\n", pid);
2548         return 0;
2549 }
2550
2551 static int check_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2552                        const char *desc, uint32_t flag, bool set_flag)
2553 {
2554         struct ctdb_node_map *nodemap;
2555         bool flag_is_set;
2556
2557         nodemap = get_nodemap(ctdb, false);
2558         if (nodemap == NULL) {
2559                 return 1;
2560         }
2561
2562         flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2563         if (set_flag == flag_is_set) {
2564                 if (set_flag) {
2565                         fprintf(stderr, "Node %u is already %s\n",
2566                                 ctdb->cmd_pnn, desc);
2567                 } else {
2568                         fprintf(stderr, "Node %u is not %s\n",
2569                                 ctdb->cmd_pnn, desc);
2570                 }
2571                 return 0;
2572         }
2573
2574         return 1;
2575 }
2576
2577 static void wait_for_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2578                            uint32_t flag, bool set_flag)
2579 {
2580         struct ctdb_node_map *nodemap;
2581         bool flag_is_set;
2582
2583         while (1) {
2584                 nodemap = get_nodemap(ctdb, true);
2585                 if (nodemap == NULL) {
2586                         fprintf(stderr,
2587                                 "Failed to get nodemap, trying again\n");
2588                         sleep(1);
2589                         continue;
2590                 }
2591
2592                 flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2593                 if (flag_is_set == set_flag) {
2594                         break;
2595                 }
2596
2597                 sleep(1);
2598         }
2599 }
2600
2601 struct ipreallocate_state {
2602         int status;
2603         bool done;
2604 };
2605
2606 static void ipreallocate_handler(uint64_t srvid, TDB_DATA data,
2607                                  void *private_data)
2608 {
2609         struct ipreallocate_state *state =
2610                 (struct ipreallocate_state *)private_data;
2611
2612         if (data.dsize != sizeof(int)) {
2613                 /* Ignore packet */
2614                 return;
2615         }
2616
2617         state->status = *(int *)data.dptr;
2618         state->done = true;
2619 }
2620
2621 static int ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb)
2622 {
2623         struct ctdb_srvid_message msg = { 0 };
2624         struct ipreallocate_state state;
2625         int ret;
2626
2627         msg.pnn = ctdb->pnn;
2628         msg.srvid = next_srvid(ctdb);
2629
2630         state.done = false;
2631         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2632                                               msg.srvid,
2633                                               ipreallocate_handler, &state);
2634         if (ret != 0) {
2635                 return ret;
2636         }
2637
2638         while (true) {
2639                 ret = ctdb_message_takeover_run(mem_ctx, ctdb->ev,
2640                                                 ctdb->client,
2641                                                 CTDB_BROADCAST_CONNECTED,
2642                                                 &msg);
2643                 if (ret != 0) {
2644                         goto fail;
2645                 }
2646
2647                 ret = ctdb_client_wait_timeout(ctdb->ev, &state.done,
2648                                                TIMEOUT());
2649                 if (ret != 0) {
2650                         continue;
2651                 }
2652
2653                 if (state.status >= 0) {
2654                         ret = 0;
2655                 } else {
2656                         ret = state.status;
2657                 }
2658                 break;
2659         }
2660
2661 fail:
2662         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2663                                            msg.srvid, &state);
2664         return ret;
2665 }
2666
2667 static int control_disable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2668                            int argc, const char **argv)
2669 {
2670         int ret;
2671
2672         if (argc != 0) {
2673                 usage("disable");
2674         }
2675
2676         ret = check_flags(mem_ctx, ctdb, "disabled",
2677                           NODE_FLAGS_PERMANENTLY_DISABLED, true);
2678         if (ret == 0) {
2679                 return 0;
2680         }
2681
2682         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2683                                  ctdb->cmd_pnn, TIMEOUT(),
2684                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2685         if (ret != 0) {
2686                 fprintf(stderr,
2687                         "Failed to set DISABLED flag on node %u\n",
2688                         ctdb->cmd_pnn);
2689                 return ret;
2690         }
2691
2692         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, true);
2693         return ipreallocate(mem_ctx, ctdb);
2694 }
2695
2696 static int control_enable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2697                           int argc, const char **argv)
2698 {
2699         int ret;
2700
2701         if (argc != 0) {
2702                 usage("enable");
2703         }
2704
2705         ret = check_flags(mem_ctx, ctdb, "disabled",
2706                           NODE_FLAGS_PERMANENTLY_DISABLED, false);
2707         if (ret == 0) {
2708                 return 0;
2709         }
2710
2711         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2712                                  ctdb->cmd_pnn, TIMEOUT(),
2713                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
2714         if (ret != 0) {
2715                 fprintf(stderr, "Failed to reset DISABLED flag on node %u\n",
2716                         ctdb->cmd_pnn);
2717                 return ret;
2718         }
2719
2720         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, false);
2721         return ipreallocate(mem_ctx, ctdb);
2722 }
2723
2724 static int control_stop(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2725                         int argc, const char **argv)
2726 {
2727         int ret;
2728
2729         if (argc != 0) {
2730                 usage("stop");
2731         }
2732
2733         ret = check_flags(mem_ctx, ctdb, "stopped",
2734                           NODE_FLAGS_STOPPED, true);
2735         if (ret == 0) {
2736                 return 0;
2737         }
2738
2739         ret = ctdb_ctrl_stop_node(mem_ctx, ctdb->ev, ctdb->client,
2740                                   ctdb->cmd_pnn, TIMEOUT());
2741         if (ret != 0) {
2742                 fprintf(stderr, "Failed to stop node %u\n", ctdb->cmd_pnn);
2743                 return ret;
2744         }
2745
2746         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, true);
2747         return ipreallocate(mem_ctx, ctdb);
2748 }
2749
2750 static int control_continue(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2751                             int argc, const char **argv)
2752 {
2753         int ret;
2754
2755         if (argc != 0) {
2756                 usage("continue");
2757         }
2758
2759         ret = check_flags(mem_ctx, ctdb, "stopped",
2760                           NODE_FLAGS_STOPPED, false);
2761         if (ret == 0) {
2762                 return 0;
2763         }
2764
2765         ret = ctdb_ctrl_continue_node(mem_ctx, ctdb->ev, ctdb->client,
2766                                       ctdb->cmd_pnn, TIMEOUT());
2767         if (ret != 0) {
2768                 fprintf(stderr, "Failed to continue stopped node %u\n",
2769                         ctdb->cmd_pnn);
2770                 return ret;
2771         }
2772
2773         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, false);
2774         return ipreallocate(mem_ctx, ctdb);
2775 }
2776
2777 static int control_ban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2778                        int argc, const char **argv)
2779 {
2780         struct ctdb_ban_state ban_state;
2781         int ret;
2782
2783         if (argc != 1) {
2784                 usage("ban");
2785         }
2786
2787         ret = check_flags(mem_ctx, ctdb, "banned",
2788                           NODE_FLAGS_BANNED, true);
2789         if (ret == 0) {
2790                 return 0;
2791         }
2792
2793         ban_state.pnn = ctdb->cmd_pnn;
2794         ban_state.time = strtoul(argv[0], NULL, 0);
2795
2796         if (ban_state.time == 0) {
2797                 fprintf(stderr, "Ban time cannot be zero\n");
2798                 return EINVAL;
2799         }
2800
2801         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2802                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2803         if (ret != 0) {
2804                 fprintf(stderr, "Failed to ban node %u\n", ctdb->cmd_pnn);
2805                 return ret;
2806         }
2807
2808         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, true);
2809         return ipreallocate(mem_ctx, ctdb);
2810
2811 }
2812
2813 static int control_unban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2814                          int argc, const char **argv)
2815 {
2816         struct ctdb_ban_state ban_state;
2817         int ret;
2818
2819         if (argc != 0) {
2820                 usage("unban");
2821         }
2822
2823         ret = check_flags(mem_ctx, ctdb, "banned",
2824                           NODE_FLAGS_BANNED, false);
2825         if (ret == 0) {
2826                 return 0;
2827         }
2828
2829         ban_state.pnn = ctdb->cmd_pnn;
2830         ban_state.time = 0;
2831
2832         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2833                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2834         if (ret != 0) {
2835                 fprintf(stderr, "Failed to unban node %u\n", ctdb->cmd_pnn);
2836                 return ret;
2837         }
2838
2839         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, false);
2840         return ipreallocate(mem_ctx, ctdb);
2841
2842 }
2843
2844 static int control_shutdown(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2845                             int argc, const char **argv)
2846 {
2847         int ret;
2848
2849         if (argc != 0) {
2850                 usage("shutdown");
2851         }
2852
2853         ret = ctdb_ctrl_shutdown(mem_ctx, ctdb->ev, ctdb->client,
2854                                  ctdb->cmd_pnn, TIMEOUT());
2855         if (ret != 0) {
2856                 fprintf(stderr, "Unable to shutdown node %u\n", ctdb->cmd_pnn);
2857                 return ret;
2858         }
2859
2860         return 0;
2861 }
2862
2863 static int get_generation(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2864                           uint32_t *generation)
2865 {
2866         uint32_t recmaster;
2867         int recmode;
2868         struct ctdb_vnn_map *vnnmap;
2869         int ret;
2870
2871 again:
2872         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2873                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
2874         if (ret != 0) {
2875                 fprintf(stderr, "Failed to find recovery master\n");
2876                 return ret;
2877         }
2878
2879         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2880                                     recmaster, TIMEOUT(), &recmode);
2881         if (ret != 0) {
2882                 fprintf(stderr, "Failed to get recovery mode from node %u\n",
2883                         recmaster);
2884                 return ret;
2885         }
2886
2887         if (recmode == CTDB_RECOVERY_ACTIVE) {
2888                 sleep(1);
2889                 goto again;
2890         }
2891
2892         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
2893                                   recmaster, TIMEOUT(), &vnnmap);
2894         if (ret != 0) {
2895                 fprintf(stderr, "Failed to get generation from node %u\n",
2896                         recmaster);
2897                 return ret;
2898         }
2899
2900         if (vnnmap->generation == INVALID_GENERATION) {
2901                 talloc_free(vnnmap);
2902                 sleep(1);
2903                 goto again;
2904         }
2905
2906         *generation = vnnmap->generation;
2907         talloc_free(vnnmap);
2908         return 0;
2909 }
2910
2911
2912 static int control_recover(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2913                            int argc, const char **argv)
2914 {
2915         uint32_t generation, next_generation;
2916         int ret;
2917
2918         if (argc != 0) {
2919                 usage("recover");
2920         }
2921
2922         ret = get_generation(mem_ctx, ctdb, &generation);
2923         if (ret != 0) {
2924                 return ret;
2925         }
2926
2927         ret = ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
2928                                     ctdb->cmd_pnn, TIMEOUT(),
2929                                     CTDB_RECOVERY_ACTIVE);
2930         if (ret != 0) {
2931                 fprintf(stderr, "Failed to set recovery mode active\n");
2932                 return ret;
2933         }
2934
2935         while (1) {
2936                 ret = get_generation(mem_ctx, ctdb, &next_generation);
2937                 if (ret != 0) {
2938                         fprintf(stderr,
2939                                 "Failed to confirm end of recovery\n");
2940                         return ret;
2941                 }
2942
2943                 if (next_generation != generation) {
2944                         break;
2945                 }
2946
2947                 sleep (1);
2948         }
2949
2950         return 0;
2951 }
2952
2953 static int control_ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2954                                 int argc, const char **argv)
2955 {
2956         if (argc != 0) {
2957                 usage("ipreallocate");
2958         }
2959
2960         return ipreallocate(mem_ctx, ctdb);
2961 }
2962
2963 static int control_isnotrecmaster(TALLOC_CTX *mem_ctx,
2964                                   struct ctdb_context *ctdb,
2965                                   int argc, const char **argv)
2966 {
2967         uint32_t recmaster;
2968         int ret;
2969
2970         if (argc != 0) {
2971                 usage("isnotrecmaster");
2972         }
2973
2974         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2975                                       ctdb->pnn, TIMEOUT(), &recmaster);
2976         if (ret != 0) {
2977                 fprintf(stderr, "Failed to get recmaster\n");
2978                 return ret;
2979         }
2980
2981         if (recmaster != ctdb->pnn) {
2982                 printf("this node is not the recmaster\n");
2983                 return 1;
2984         }
2985
2986         printf("this node is the recmaster\n");
2987         return 0;
2988 }
2989
2990 static int control_gratarp(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2991                            int argc, const char **argv)
2992 {
2993         struct ctdb_addr_info addr_info;
2994         int ret;
2995
2996         if (argc != 2) {
2997                 usage("gratarp");
2998         }
2999
3000         if (! parse_ip(argv[0], NULL, 0, &addr_info.addr)) {
3001                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3002                 return 1;
3003         }
3004         addr_info.iface = argv[1];
3005
3006         ret = ctdb_ctrl_send_gratuitous_arp(mem_ctx, ctdb->ev, ctdb->client,
3007                                             ctdb->cmd_pnn, TIMEOUT(),
3008                                             &addr_info);
3009         if (ret != 0) {
3010                 fprintf(stderr, "Unable to send gratuitous arp from node %u\n",
3011                         ctdb->cmd_pnn);
3012                 return ret;
3013         }
3014
3015         return 0;
3016 }
3017
3018 static int control_tickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3019                            int argc, const char **argv)
3020 {
3021         ctdb_sock_addr src, dst;
3022         int ret;
3023
3024         if (argc != 0 && argc != 2) {
3025                 usage("tickle");
3026         }
3027
3028         if (argc == 0) {
3029                 struct ctdb_connection *clist;
3030                 int count;
3031                 int i, num_failed;
3032
3033                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3034                 if (ret != 0) {
3035                         return ret;
3036                 }
3037
3038                 num_failed = 0;
3039                 for (i=0; i<count; i++) {
3040                         ret = ctdb_sys_send_tcp(&clist[i].src,
3041                                                 &clist[i].dst,
3042                                                 0, 0, 0);
3043                         if (ret != 0) {
3044                                 num_failed += 1;
3045                         }
3046                 }
3047
3048                 TALLOC_FREE(clist);
3049
3050                 if (num_failed > 0) {
3051                         fprintf(stderr, "Failed to send %d tickles\n",
3052                                 num_failed);
3053                         return 1;
3054                 }
3055
3056                 return 0;
3057         }
3058
3059
3060         if (! parse_ip_port(argv[0], &src)) {
3061                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3062                 return 1;
3063         }
3064
3065         if (! parse_ip_port(argv[1], &dst)) {
3066                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3067                 return 1;
3068         }
3069
3070         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
3071         if (ret != 0) {
3072                 fprintf(stderr, "Failed to send tickle ack\n");
3073                 return ret;
3074         }
3075
3076         return 0;
3077 }
3078
3079 static int control_gettickles(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3080                               int argc, const char **argv)
3081 {
3082         ctdb_sock_addr addr;
3083         struct ctdb_tickle_list *tickles;
3084         unsigned port = 0;
3085         int ret, i;
3086
3087         if (argc < 1 || argc > 2) {
3088                 usage("gettickles");
3089         }
3090
3091         if (argc == 2) {
3092                 port = strtoul(argv[1], NULL, 10);
3093         }
3094
3095         if (! parse_ip(argv[0], NULL, port, &addr)) {
3096                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3097                 return 1;
3098         }
3099
3100         ret = ctdb_ctrl_get_tcp_tickle_list(mem_ctx, ctdb->ev, ctdb->client,
3101                                             ctdb->cmd_pnn, TIMEOUT(), &addr,
3102                                             &tickles);
3103         if (ret != 0) {
3104                 fprintf(stderr, "Failed to get list of connections\n");
3105                 return ret;
3106         }
3107
3108         if (options.machinereadable) {
3109                 printf("%s%s%s%s%s%s%s%s%s\n",
3110                        options.sep,
3111                        "Source IP", options.sep,
3112                        "Port", options.sep,
3113                        "Destiation IP", options.sep,
3114                        "Port", options.sep);
3115                 for (i=0; i<tickles->num; i++) {
3116                         printf("%s%s%s%u%s%s%s%u%s\n", options.sep,
3117                                ctdb_sock_addr_to_string(
3118                                        mem_ctx, &tickles->conn[i].src),
3119                                options.sep,
3120                                ntohs(tickles->conn[i].src.ip.sin_port),
3121                                options.sep,
3122                                ctdb_sock_addr_to_string(
3123                                        mem_ctx, &tickles->conn[i].dst),
3124                                options.sep,
3125                                ntohs(tickles->conn[i].dst.ip.sin_port),
3126                                options.sep);
3127                 }
3128         } else {
3129                 printf("Connections for IP: %s\n",
3130                        ctdb_sock_addr_to_string(mem_ctx, &tickles->addr));
3131                 printf("Num connections: %u\n", tickles->num);
3132                 for (i=0; i<tickles->num; i++) {
3133                         printf("SRC: %s:%u   DST: %s:%u\n",
3134                                ctdb_sock_addr_to_string(
3135                                        mem_ctx, &tickles->conn[i].src),
3136                                ntohs(tickles->conn[i].src.ip.sin_port),
3137                                ctdb_sock_addr_to_string(
3138                                        mem_ctx, &tickles->conn[i].dst),
3139                                ntohs(tickles->conn[i].dst.ip.sin_port));
3140                 }
3141         }
3142
3143         talloc_free(tickles);
3144         return 0;
3145 }
3146
3147 typedef void (*clist_request_func)(struct ctdb_req_control *request,
3148                                    struct ctdb_connection *conn);
3149
3150 typedef int (*clist_reply_func)(struct ctdb_reply_control *reply);
3151
3152 struct process_clist_state {
3153         struct ctdb_connection *clist;
3154         int count;
3155         int num_failed, num_total;
3156         clist_reply_func reply_func;
3157 };
3158
3159 static void process_clist_done(struct tevent_req *subreq);
3160
3161 static struct tevent_req *process_clist_send(
3162                                         TALLOC_CTX *mem_ctx,
3163                                         struct ctdb_context *ctdb,
3164                                         struct ctdb_connection *clist,
3165                                         int count,
3166                                         clist_request_func request_func,
3167                                         clist_reply_func reply_func)
3168 {
3169         struct tevent_req *req, *subreq;
3170         struct process_clist_state *state;
3171         struct ctdb_req_control request;
3172         int i;
3173
3174         req = tevent_req_create(mem_ctx, &state, struct process_clist_state);
3175         if (req == NULL) {
3176                 return NULL;
3177         }
3178
3179         state->clist = clist;
3180         state->count = count;
3181         state->reply_func = reply_func;
3182
3183         for (i=0; i<count; i++) {
3184                 request_func(&request, &clist[i]);
3185                 subreq = ctdb_client_control_send(state, ctdb->ev,
3186                                                   ctdb->client, ctdb->cmd_pnn,
3187                                                   TIMEOUT(), &request);
3188                 if (tevent_req_nomem(subreq, req)) {
3189                         return tevent_req_post(req, ctdb->ev);
3190                 }
3191                 tevent_req_set_callback(subreq, process_clist_done, req);
3192         }
3193
3194         return req;
3195 }
3196
3197 static void process_clist_done(struct tevent_req *subreq)
3198 {
3199         struct tevent_req *req = tevent_req_callback_data(
3200                 subreq, struct tevent_req);
3201         struct process_clist_state *state = tevent_req_data(
3202                 req, struct process_clist_state);
3203         struct ctdb_reply_control *reply;
3204         int ret;
3205         bool status;
3206
3207         status = ctdb_client_control_recv(subreq, NULL, state, &reply);
3208         TALLOC_FREE(subreq);
3209         if (! status) {
3210                 state->num_failed += 1;
3211                 goto done;
3212         }
3213
3214         ret = state->reply_func(reply);
3215         if (ret != 0) {
3216                 state->num_failed += 1;
3217                 goto done;
3218         }
3219
3220 done:
3221         state->num_total += 1;
3222         if (state->num_total == state->count) {
3223                 tevent_req_done(req);
3224         }
3225 }
3226
3227 static int process_clist_recv(struct tevent_req *req)
3228 {
3229         struct process_clist_state *state = tevent_req_data(
3230                 req, struct process_clist_state);
3231
3232         return state->num_failed;
3233 }
3234
3235 static int control_addtickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3236                              int argc, const char **argv)
3237 {
3238         struct ctdb_connection conn;
3239         int ret;
3240
3241         if (argc != 0 && argc != 2) {
3242                 usage("addtickle");
3243         }
3244
3245         if (argc == 0) {
3246                 struct ctdb_connection *clist;
3247                 struct tevent_req *req;
3248                 int count;
3249
3250                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3251                 if (ret != 0) {
3252                         return ret;
3253                 }
3254                 if (count == 0) {
3255                         return 0;
3256                 }
3257
3258                 req = process_clist_send(mem_ctx, ctdb, clist, count,
3259                                  ctdb_req_control_tcp_add_delayed_update,
3260                                  ctdb_reply_control_tcp_add_delayed_update);
3261                 if (req == NULL) {
3262                         talloc_free(clist);
3263                         return ENOMEM;
3264                 }
3265
3266                 tevent_req_poll(req, ctdb->ev);
3267                 talloc_free(clist);
3268
3269                 ret = process_clist_recv(req);
3270                 if (ret != 0) {
3271                         fprintf(stderr, "Failed to add %d tickles\n", ret);
3272                         return 1;
3273                 }
3274
3275                 return 0;
3276         }
3277
3278         if (! parse_ip_port(argv[0], &conn.src)) {
3279                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3280                 return 1;
3281         }
3282         if (! parse_ip_port(argv[1], &conn.dst)) {
3283                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3284                 return 1;
3285         }
3286
3287         ret = ctdb_ctrl_tcp_add_delayed_update(mem_ctx, ctdb->ev,
3288                                                ctdb->client, ctdb->cmd_pnn,
3289                                                TIMEOUT(), &conn);
3290         if (ret != 0) {
3291                 fprintf(stderr, "Failed to register connection\n");
3292                 return ret;
3293         }
3294
3295         return 0;
3296 }
3297
3298 static int control_deltickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3299                              int argc, const char **argv)
3300 {
3301         struct ctdb_connection conn;
3302         int ret;
3303
3304         if (argc != 0 && argc != 2) {
3305                 usage("deltickle");
3306         }
3307
3308         if (argc == 0) {
3309                 struct ctdb_connection *clist;
3310                 struct tevent_req *req;
3311                 int count;
3312
3313                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3314                 if (ret != 0) {
3315                         return ret;
3316                 }
3317                 if (count == 0) {
3318                         return 0;
3319                 }
3320
3321                 req = process_clist_send(mem_ctx, ctdb, clist, count,
3322                                          ctdb_req_control_tcp_remove,
3323                                          ctdb_reply_control_tcp_remove);
3324                 if (req == NULL) {
3325                         talloc_free(clist);
3326                         return ENOMEM;
3327                 }
3328
3329                 tevent_req_poll(req, ctdb->ev);
3330                 talloc_free(clist);
3331
3332                 ret = process_clist_recv(req);
3333                 if (ret != 0) {
3334                         fprintf(stderr, "Failed to remove %d tickles\n", ret);
3335                         return 1;
3336                 }
3337
3338                 return 0;
3339         }
3340
3341         if (! parse_ip_port(argv[0], &conn.src)) {
3342                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3343                 return 1;
3344         }
3345         if (! parse_ip_port(argv[1], &conn.dst)) {
3346                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3347                 return 1;
3348         }
3349
3350         ret = ctdb_ctrl_tcp_remove(mem_ctx, ctdb->ev, ctdb->client,
3351                                    ctdb->cmd_pnn, TIMEOUT(), &conn);
3352         if (ret != 0) {
3353                 fprintf(stderr, "Failed to unregister connection\n");
3354                 return ret;
3355         }
3356
3357         return 0;
3358 }
3359
3360 static int control_check_srvids(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3361                                 int argc, const char **argv)
3362 {
3363         uint64_t *srvid;
3364         uint8_t *result;
3365         int ret, i;
3366
3367         if (argc == 0) {
3368                 usage("check_srvids");
3369         }
3370
3371         srvid = talloc_array(mem_ctx, uint64_t, argc);
3372         if (srvid == NULL) {
3373                 fprintf(stderr, "Memory allocation error\n");
3374                 return 1;
3375         }
3376
3377         for (i=0; i<argc; i++) {
3378                 srvid[i] = strtoull(argv[i], NULL, 0);
3379         }
3380
3381         ret = ctdb_ctrl_check_srvids(mem_ctx, ctdb->ev, ctdb->client,
3382                                      ctdb->cmd_pnn, TIMEOUT(), srvid, argc,
3383                                      &result);
3384         if (ret != 0) {
3385                 fprintf(stderr, "Failed to check srvids on node %u\n",
3386                         ctdb->cmd_pnn);
3387                 return ret;
3388         }
3389
3390         for (i=0; i<argc; i++) {
3391                 printf("SRVID 0x%" PRIx64 " %s\n", srvid[i],
3392                        (result[i] ? "exists" : "does not exist"));
3393         }
3394
3395         return 0;
3396 }
3397
3398 static int control_listnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3399                              int argc, const char **argv)
3400 {
3401         struct ctdb_node_map *nodemap;
3402         int i;
3403
3404         if (argc != 0) {
3405                 usage("listnodes");
3406         }
3407
3408         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
3409         if (nodemap == NULL) {
3410                 return 1;
3411         }
3412
3413         for (i=0; i<nodemap->num; i++) {
3414                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
3415                         continue;
3416                 }
3417
3418                 if (options.machinereadable) {
3419                         printf("%s%u%s%s%s\n", options.sep,
3420                                nodemap->node[i].pnn, options.sep,
3421                                ctdb_sock_addr_to_string(
3422                                         mem_ctx, &nodemap->node[i].addr),
3423                                options.sep);
3424                 } else {
3425                         printf("%s\n",
3426                                ctdb_sock_addr_to_string(
3427                                         mem_ctx, &nodemap->node[i].addr));
3428                 }
3429         }
3430
3431         return 0;
3432 }
3433
3434 static bool nodemap_identical(struct ctdb_node_map *nodemap1,
3435                               struct ctdb_node_map *nodemap2)
3436 {
3437         int i;
3438
3439         if (nodemap1->num != nodemap2->num) {
3440                 return false;
3441         }
3442
3443         for (i=0; i<nodemap1->num; i++) {
3444                 struct ctdb_node_and_flags *n1, *n2;
3445
3446                 n1 = &nodemap1->node[i];
3447                 n2 = &nodemap2->node[i];
3448
3449                 if ((n1->pnn != n2->pnn) ||
3450                     (n1->flags != n2->flags) ||
3451                     ! ctdb_sock_addr_same_ip(&n1->addr, &n2->addr)) {
3452                         return false;
3453                 }
3454         }
3455
3456         return true;
3457 }
3458
3459 static int check_node_file_changes(TALLOC_CTX *mem_ctx,
3460                                    struct ctdb_node_map *nm,
3461                                    struct ctdb_node_map *fnm,
3462                                    bool *reload)
3463 {
3464         int i;
3465         bool check_failed = false;
3466
3467         *reload = false;
3468
3469         for (i=0; i<nm->num; i++) {
3470                 if (i >= fnm->num) {
3471                         fprintf(stderr,
3472                                 "Node %u (%s) missing from nodes file\n",
3473                                 nm->node[i].pnn,
3474                                 ctdb_sock_addr_to_string(
3475                                         mem_ctx, &nm->node[i].addr));
3476                         check_failed = true;
3477                         continue;
3478                 }
3479                 if (nm->node[i].flags & NODE_FLAGS_DELETED &&
3480                     fnm->node[i].flags & NODE_FLAGS_DELETED) {
3481                         /* Node remains deleted */
3482                         continue;
3483                 }
3484
3485                 if (! (nm->node[i].flags & NODE_FLAGS_DELETED) &&
3486                     ! (fnm->node[i].flags & NODE_FLAGS_DELETED)) {
3487                         /* Node not newly nor previously deleted */
3488                         if (! ctdb_same_ip(&nm->node[i].addr,
3489                                            &fnm->node[i].addr)) {
3490                                 fprintf(stderr,
3491                                         "Node %u has changed IP address"
3492                                         " (was %s, now %s)\n",
3493                                         nm->node[i].pnn,
3494                                         ctdb_sock_addr_to_string(
3495                                                 mem_ctx, &nm->node[i].addr),
3496                                         ctdb_sock_addr_to_string(
3497                                                 mem_ctx, &fnm->node[i].addr));
3498                                 check_failed = true;
3499                         } else {
3500                                 if (nm->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3501                                         fprintf(stderr,
3502                                                 "WARNING: Node %u is disconnected."
3503                                                 " You MUST fix this node manually!\n",
3504                                                 nm->node[i].pnn);
3505                                 }
3506                         }
3507                         continue;
3508                 }
3509
3510                 if (fnm->node[i].flags & NODE_FLAGS_DELETED) {
3511                         /* Node is being deleted */
3512                         printf("Node %u is DELETED\n", nm->node[i].pnn);
3513                         *reload = true;
3514                         if (! (nm->node[i].flags & NODE_FLAGS_DISCONNECTED)) {
3515                                 fprintf(stderr,
3516                                         "ERROR: Node %u is still connected\n",
3517                                         nm->node[i].pnn);
3518                                 check_failed = true;
3519                         }
3520                         continue;
3521                 }
3522
3523                 if (nm->node[i].flags & NODE_FLAGS_DELETED) {
3524                         /* Node was previously deleted */
3525                         printf("Node %u is UNDELETED\n", nm->node[i].pnn);
3526                         *reload = true;
3527                 }
3528         }
3529
3530         if (check_failed) {
3531                 fprintf(stderr,
3532                         "ERROR: Nodes will not be reloaded due to previous error\n");
3533                 return 1;
3534         }
3535
3536         /* Leftover nodes in file are NEW */
3537         for (; i < fnm->num; i++) {
3538                 printf("Node %u is NEW\n", fnm->node[i].pnn);
3539                 *reload = true;
3540         }
3541
3542         return 0;
3543 }
3544
3545 struct disable_recoveries_state {
3546         uint32_t *pnn_list;
3547         int node_count;
3548         bool *reply;
3549         int status;
3550         bool done;
3551 };
3552
3553 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
3554                                        void *private_data)
3555 {
3556         struct disable_recoveries_state *state =
3557                 (struct disable_recoveries_state *)private_data;
3558         int ret, i;
3559
3560         if (data.dsize != sizeof(int)) {
3561                 /* Ignore packet */
3562                 return;
3563         }
3564
3565         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
3566         ret = *(int *)data.dptr;
3567         if (ret < 0) {
3568                 state->status = ret;
3569                 state->done = true;
3570                 return;
3571         }
3572         for (i=0; i<state->node_count; i++) {
3573                 if (state->pnn_list[i] == ret) {
3574                         state->reply[i] = true;
3575                         break;
3576                 }
3577         }
3578
3579         state->done = true;
3580         for (i=0; i<state->node_count; i++) {
3581                 if (! state->reply[i]) {
3582                         state->done = false;
3583                         break;
3584                 }
3585         }
3586 }
3587
3588 static int disable_recoveries(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3589                               uint32_t timeout, uint32_t *pnn_list, int count)
3590 {
3591         struct ctdb_disable_message disable = { 0 };
3592         struct disable_recoveries_state state;
3593         int ret, i;
3594
3595         disable.pnn = ctdb->pnn;
3596         disable.srvid = next_srvid(ctdb);
3597         disable.timeout = timeout;
3598
3599         state.pnn_list = pnn_list;
3600         state.node_count = count;
3601         state.done = false;
3602         state.status = 0;
3603         state.reply = talloc_zero_array(mem_ctx, bool, count);
3604         if (state.reply == NULL) {
3605                 return ENOMEM;
3606         }
3607
3608         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
3609                                               disable.srvid,
3610                                               disable_recoveries_handler,
3611                                               &state);
3612         if (ret != 0) {
3613                 return ret;
3614         }
3615
3616         for (i=0; i<count; i++) {
3617                 ret = ctdb_message_disable_recoveries(mem_ctx, ctdb->ev,
3618                                                       ctdb->client,
3619                                                       pnn_list[i],
3620                                                       &disable);
3621                 if (ret != 0) {
3622                         goto fail;
3623                 }
3624         }
3625
3626         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
3627         if (ret == ETIME) {
3628                 fprintf(stderr, "Timed out waiting to disable recoveries\n");
3629         } else {
3630                 ret = (state.status >= 0 ? 0 : 1);
3631         }
3632
3633 fail:
3634         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
3635                                            disable.srvid, &state);
3636         return ret;
3637 }
3638
3639 static int control_reloadnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3640                                int argc, const char **argv)
3641 {
3642         struct ctdb_node_map *nodemap = NULL;
3643         struct ctdb_node_map *file_nodemap;
3644         struct ctdb_node_map *remote_nodemap;
3645         struct ctdb_req_control request;
3646         struct ctdb_reply_control **reply;
3647         bool reload;
3648         int ret, i;
3649         uint32_t *pnn_list;
3650         int count;
3651
3652         nodemap = get_nodemap(ctdb, false);
3653         if (nodemap == NULL) {
3654                 return 1;
3655         }
3656
3657         file_nodemap = read_nodes_file(mem_ctx, ctdb->pnn);
3658         if (file_nodemap == NULL) {
3659                 return 1;
3660         }
3661
3662         for (i=0; i<nodemap->num; i++) {
3663                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3664                         continue;
3665                 }
3666
3667                 ret = ctdb_ctrl_get_nodes_file(mem_ctx, ctdb->ev, ctdb->client,
3668                                                nodemap->node[i].pnn, TIMEOUT(),
3669                                                &remote_nodemap);
3670                 if (ret != 0) {
3671                         fprintf(stderr,
3672                                 "ERROR: Failed to get nodes file from node %u\n",
3673                                 nodemap->node[i].pnn);
3674                         return ret;
3675                 }
3676
3677                 if (! nodemap_identical(file_nodemap, remote_nodemap)) {
3678                         fprintf(stderr,
3679                                 "ERROR: Nodes file on node %u differs"
3680                                  " from current node (%u)\n",
3681                                  nodemap->node[i].pnn, ctdb->pnn);
3682                         return 1;
3683                 }
3684         }
3685
3686         ret = check_node_file_changes(mem_ctx, nodemap, file_nodemap, &reload);
3687         if (ret != 0) {
3688                 return ret;
3689         }
3690
3691         if (! reload) {
3692                 fprintf(stderr, "No change in nodes file,"
3693                                 " skipping unnecessary reload\n");
3694                 return 0;
3695         }
3696
3697         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
3698                                         mem_ctx, &pnn_list);
3699         if (count <= 0) {
3700                 fprintf(stderr, "Memory allocation error\n");
3701                 return 1;
3702         }
3703
3704         ret = disable_recoveries(mem_ctx, ctdb, 2*options.timelimit,
3705                                  pnn_list, count);
3706         if (ret != 0) {
3707                 fprintf(stderr, "Failed to disable recoveries\n");
3708                 return ret;
3709         }
3710
3711         ctdb_req_control_reload_nodes_file(&request);
3712         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3713                                         pnn_list, count, TIMEOUT(),
3714                                         &request, NULL, &reply);
3715         if (ret != 0) {
3716                 bool failed = false;
3717
3718                 for (i=0; i<count; i++) {
3719                         ret = ctdb_reply_control_reload_nodes_file(reply[i]);
3720                         if (ret != 0) {
3721                                 fprintf(stderr,
3722                                         "Node %u failed to reload nodes\n",
3723                                         pnn_list[i]);
3724                                 failed = true;
3725                         }
3726                 }
3727                 if (failed) {
3728                         fprintf(stderr,
3729                                 "You MUST fix failed nodes manually!\n");
3730                 }
3731         }
3732
3733         ret = disable_recoveries(mem_ctx, ctdb, 0, pnn_list, count);
3734         if (ret != 0) {
3735                 fprintf(stderr, "Failed to enable recoveries\n");
3736                 return ret;
3737         }
3738
3739         return 0;
3740 }
3741
3742 static int moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3743                   ctdb_sock_addr *addr, uint32_t pnn)
3744 {
3745         struct ctdb_public_ip_list *pubip_list;
3746         struct ctdb_public_ip pubip;
3747         struct ctdb_node_map *nodemap;
3748         struct ctdb_req_control request;
3749         uint32_t *pnn_list;
3750         int ret, i, count;
3751
3752         ret = ctdb_message_disable_ip_check(mem_ctx, ctdb->ev, ctdb->client,
3753                                             CTDB_BROADCAST_CONNECTED,
3754                                             2*options.timelimit);
3755         if (ret != 0) {
3756                 fprintf(stderr, "Failed to disable IP check\n");
3757                 return ret;
3758         }
3759
3760         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3761                                        pnn, TIMEOUT(), &pubip_list);
3762         if (ret != 0) {
3763                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3764                         pnn);
3765                 return ret;
3766         }
3767
3768         for (i=0; i<pubip_list->num; i++) {
3769                 if (ctdb_same_ip(addr, &pubip_list->ip[i].addr)) {
3770                         break;
3771                 }
3772         }
3773
3774         if (i == pubip_list->num) {
3775                 fprintf(stderr, "Node %u CANNOT host IP address %s\n",
3776                         pnn, ctdb_sock_addr_to_string(mem_ctx, addr));
3777                 return 1;
3778         }
3779
3780         nodemap = get_nodemap(ctdb, false);
3781         if (nodemap == NULL) {
3782                 return 1;
3783         }
3784
3785         count = list_of_active_nodes(nodemap, pnn, mem_ctx, &pnn_list);
3786         if (count <= 0) {
3787                 fprintf(stderr, "Memory allocation error\n");
3788                 return 1;
3789         }
3790
3791         pubip.pnn = pnn;
3792         pubip.addr = *addr;
3793         ctdb_req_control_release_ip(&request, &pubip);
3794
3795         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3796                                         pnn_list, count, TIMEOUT(),
3797                                         &request, NULL, NULL);
3798         if (ret != 0) {
3799                 fprintf(stderr, "Failed to release IP on nodes\n");
3800                 return ret;
3801         }
3802
3803         ret = ctdb_ctrl_takeover_ip(mem_ctx, ctdb->ev, ctdb->client,
3804                                     pnn, TIMEOUT(), &pubip);
3805         if (ret != 0) {
3806                 fprintf(stderr, "Failed to takeover IP on node %u\n", pnn);
3807                 return ret;
3808         }
3809
3810         return 0;
3811 }
3812
3813 static int control_moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3814                           int argc, const char **argv)
3815 {
3816         ctdb_sock_addr addr;
3817         uint32_t pnn;
3818         int ret, retries = 0;
3819
3820         if (argc != 2) {
3821                 usage("moveip");
3822         }
3823
3824         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3825                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3826                 return 1;
3827         }
3828
3829         pnn = strtoul(argv[1], NULL, 10);
3830         if (pnn == CTDB_UNKNOWN_PNN) {
3831                 fprintf(stderr, "Invalid PNN %s\n", argv[1]);
3832                 return 1;
3833         }
3834
3835         while (retries < 5) {
3836                 ret = moveip(mem_ctx, ctdb, &addr, pnn);
3837                 if (ret == 0) {
3838                         break;
3839                 }
3840
3841                 sleep(3);
3842                 retries++;
3843         }
3844
3845         if (ret != 0) {
3846                 fprintf(stderr, "Failed to move IP %s to node %u\n",
3847                         argv[0], pnn);
3848                 return ret;
3849         }
3850
3851         return 0;
3852 }
3853
3854 static int rebalancenode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3855                          uint32_t pnn)
3856 {
3857         int ret;
3858
3859         ret = ctdb_message_rebalance_node(mem_ctx, ctdb->ev, ctdb->client,
3860                                           CTDB_BROADCAST_CONNECTED, pnn);
3861         if (ret != 0) {
3862                 fprintf(stderr,
3863                         "Failed to ask recovery master to distribute IPs\n");
3864                 return ret;
3865         }
3866
3867         return 0;
3868 }
3869
3870 static int control_addip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3871                          int argc, const char **argv)
3872 {
3873         ctdb_sock_addr addr;
3874         struct ctdb_public_ip_list *pubip_list;
3875         struct ctdb_addr_info addr_info;
3876         unsigned int mask;
3877         int ret, i, retries = 0;
3878
3879         if (argc != 2) {
3880                 usage("addip");
3881         }
3882
3883         if (! parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
3884                 fprintf(stderr, "Invalid IP/Mask %s\n", argv[0]);
3885                 return 1;
3886         }
3887
3888         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3889                                        ctdb->cmd_pnn, TIMEOUT(), &pubip_list);
3890         if (ret != 0) {
3891                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3892                         ctdb->cmd_pnn);
3893                 return 1;
3894         }
3895
3896         for (i=0; i<pubip_list->num; i++) {
3897                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3898                         fprintf(stderr, "Node already knows about IP %s\n",
3899                                 ctdb_sock_addr_to_string(mem_ctx, &addr));
3900                         return 0;
3901                 }
3902         }
3903
3904         addr_info.addr = addr;
3905         addr_info.mask = mask;
3906         addr_info.iface = argv[1];
3907
3908         while (retries < 5) {
3909                 ret = ctdb_ctrl_add_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3910                                               ctdb->cmd_pnn, TIMEOUT(),
3911                                               &addr_info);
3912                 if (ret == 0) {
3913                         break;
3914                 }
3915
3916                 sleep(3);
3917                 retries++;
3918         }
3919
3920         if (ret != 0) {
3921                 fprintf(stderr, "Failed to add public IP to node %u."
3922                                 " Giving up\n", ctdb->cmd_pnn);
3923                 return ret;
3924         }
3925
3926         ret = rebalancenode(mem_ctx, ctdb, ctdb->cmd_pnn);
3927         if (ret != 0) {
3928                 return ret;
3929         }
3930
3931         return 0;
3932 }
3933
3934 static int control_delip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3935                          int argc, const char **argv)
3936 {
3937         ctdb_sock_addr addr;
3938         struct ctdb_public_ip_list *pubip_list;
3939         struct ctdb_addr_info addr_info;
3940         int ret, i;
3941
3942         if (argc != 1) {
3943                 usage("delip");
3944         }
3945
3946         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3947                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3948                 return 1;
3949         }
3950
3951         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3952                                        ctdb->cmd_pnn, TIMEOUT(), &pubip_list);
3953         if (ret != 0) {
3954                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3955                         ctdb->cmd_pnn);
3956                 return 1;
3957         }
3958
3959         for (i=0; i<pubip_list->num; i++) {
3960                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3961                         break;
3962                 }
3963         }
3964
3965         if (i == pubip_list->num) {
3966                 fprintf(stderr, "Node does not know about IP address %s\n",
3967                         ctdb_sock_addr_to_string(mem_ctx, &addr));
3968                 return 0;
3969         }
3970
3971         addr_info.addr = addr;
3972         addr_info.mask = 0;
3973         addr_info.iface = NULL;
3974
3975         ret = ctdb_ctrl_del_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3976                                       ctdb->cmd_pnn, TIMEOUT(), &addr_info);
3977         if (ret != 0) {
3978                 fprintf(stderr, "Failed to delete public IP from node %u\n",
3979                         ctdb->cmd_pnn);
3980                 return ret;
3981         }
3982
3983         return 0;
3984 }
3985
3986 static int control_eventscript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3987                                int argc, const char **argv)
3988 {
3989         int ret;
3990
3991         if (argc != 1) {
3992                 usage("eventscript");
3993         }
3994
3995         if (strcmp(argv[0], "monitor") != 0) {
3996                 fprintf(stderr, "Only monitor event can be run\n");
3997                 return 1;
3998         }
3999
4000         ret = ctdb_ctrl_run_eventscripts(mem_ctx, ctdb->ev, ctdb->client,
4001                                          ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4002         if (ret != 0) {
4003                 fprintf(stderr, "Failed to run monitor event on node %u\n",
4004                         ctdb->cmd_pnn);
4005                 return ret;
4006         }
4007
4008         return 0;
4009 }
4010
4011 #define DB_VERSION      3
4012 #define MAX_DB_NAME     64
4013 #define MAX_REC_BUFFER_SIZE     (100*1000)
4014
4015 struct db_header {
4016         unsigned long version;
4017         time_t timestamp;
4018         unsigned long flags;
4019         unsigned long nbuf;
4020         unsigned long nrec;
4021         char name[MAX_DB_NAME];
4022 };
4023
4024 struct backup_state {
4025         TALLOC_CTX *mem_ctx;
4026         struct ctdb_rec_buffer *recbuf;
4027         uint32_t db_id;
4028         int fd;
4029         unsigned int nbuf, nrec;
4030 };
4031
4032 static int backup_handler(uint32_t reqid, struct ctdb_ltdb_header *header,
4033                           TDB_DATA key, TDB_DATA data, void *private_data)
4034 {
4035         struct backup_state *state = (struct backup_state *)private_data;
4036         size_t len;
4037         int ret;
4038
4039         if (state->recbuf == NULL) {
4040                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
4041                                                      state->db_id);
4042                 if (state->recbuf == NULL) {
4043                         return ENOMEM;
4044                 }
4045         }
4046
4047         ret = ctdb_rec_buffer_add(state->recbuf, state->recbuf, reqid,
4048                                   header, key, data);
4049         if (ret != 0) {
4050                 return ret;
4051         }
4052
4053         len = ctdb_rec_buffer_len(state->recbuf);
4054         if (len < MAX_REC_BUFFER_SIZE) {
4055                 return 0;
4056         }
4057
4058         ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
4059         if (ret != 0) {
4060                 fprintf(stderr, "Failed to write records to backup file\n");
4061                 return ret;
4062         }
4063
4064         state->nbuf += 1;
4065         state->nrec += state->recbuf->count;
4066         TALLOC_FREE(state->recbuf);
4067
4068         return 0;
4069 }
4070
4071 static int control_backupdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4072                             int argc, const char **argv)
4073 {
4074         const char *db_name;
4075         struct ctdb_db_context *db;
4076         uint32_t db_id;
4077         uint8_t db_flags;
4078         struct backup_state state;
4079         struct db_header db_hdr;
4080         int fd, ret;
4081
4082         if (argc != 2) {
4083                 usage("backupdb");
4084         }
4085
4086         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4087                 return 1;
4088         }
4089
4090         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4091                           db_flags, &db);
4092         if (ret != 0) {
4093                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4094                 return ret;
4095         }
4096
4097         fd = open(argv[1], O_RDWR|O_CREAT, 0600);
4098         if (fd == -1) {
4099                 ret = errno;
4100                 fprintf(stderr, "Failed to open file %s for writing\n",
4101                         argv[1]);
4102                 return ret;
4103         }
4104
4105         /* Write empty header first */
4106         ZERO_STRUCT(db_hdr);
4107         ret = write(fd, &db_hdr, sizeof(struct db_header));
4108         if (ret == -1) {
4109                 ret = errno;
4110                 close(fd);
4111                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4112                 return ret;
4113         }
4114
4115         state.mem_ctx = mem_ctx;
4116         state.recbuf = NULL;
4117         state.fd = fd;
4118         state.nbuf = 0;
4119         state.nrec = 0;
4120
4121         ret = ctdb_db_traverse(db, true, false, backup_handler, &state);
4122         if (ret != 0) {
4123                 fprintf(stderr, "Failed to collect records from DB %s\n",
4124                         db_name);
4125                 close(fd);
4126                 return ret;
4127         }
4128
4129         if (state.recbuf != NULL) {
4130                 ret = ctdb_rec_buffer_write(state.recbuf, state.fd);
4131                 if (ret != 0) {
4132                         fprintf(stderr,
4133                                 "Failed to write records to backup file\n");
4134                         close(fd);
4135                         return ret;
4136                 }
4137
4138                 state.nbuf += 1;
4139                 state.nrec += state.recbuf->count;
4140                 TALLOC_FREE(state.recbuf);
4141         }
4142
4143         db_hdr.version = DB_VERSION;
4144         db_hdr.timestamp = time(NULL);
4145         db_hdr.flags = db_flags;
4146         db_hdr.nbuf = state.nbuf;
4147         db_hdr.nrec = state.nrec;
4148         strncpy(db_hdr.name, db_name, MAX_DB_NAME-1);
4149
4150         lseek(fd, 0, SEEK_SET);
4151         ret = write(fd, &db_hdr, sizeof(struct db_header));
4152         if (ret == -1) {
4153                 ret = errno;
4154                 close(fd);
4155                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4156                 return ret;
4157         }
4158
4159         close(fd);
4160         printf("Database backed up to %s\n", argv[1]);
4161         return 0;
4162 }
4163
4164 static int control_restoredb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4165                              int argc, const char **argv)
4166 {
4167         const char *db_name = NULL;
4168         struct ctdb_db_context *db;
4169         struct db_header db_hdr;
4170         struct ctdb_node_map *nodemap;
4171         struct ctdb_req_control request;
4172         struct ctdb_reply_control **reply;
4173         struct ctdb_transdb wipedb;
4174         struct ctdb_pulldb_ext pulldb;
4175         struct ctdb_rec_buffer *recbuf;
4176         uint32_t generation;
4177         uint32_t *pnn_list;
4178         char timebuf[128];
4179         int fd, i;
4180         int count, ret;
4181
4182         if (argc < 1 || argc > 2) {
4183                 usage("restoredb");
4184         }
4185
4186         fd = open(argv[0], O_RDONLY, 0600);
4187         if (fd == -1) {
4188                 ret = errno;
4189                 fprintf(stderr, "Failed to open file %s for reading\n",
4190                         argv[0]);
4191                 return ret;
4192         }
4193
4194         if (argc == 2) {
4195                 db_name = argv[1];
4196         }
4197
4198         ret = read(fd, &db_hdr, sizeof(struct db_header));
4199         if (ret == -1) {
4200                 ret = errno;
4201                 close(fd);
4202                 fprintf(stderr, "Failed to read db header from file %s\n",
4203                         argv[0]);
4204                 return ret;
4205         }
4206         db_hdr.name[sizeof(db_hdr.name)-1] = '\0';
4207
4208         if (db_hdr.version != DB_VERSION) {
4209                 fprintf(stderr,
4210                         "Wrong version of backup file, expected %u, got %lu\n",
4211                         DB_VERSION, db_hdr.version);
4212                 close(fd);
4213                 return EINVAL;
4214         }
4215
4216         if (db_name == NULL) {
4217                 db_name = db_hdr.name;
4218         }
4219
4220         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4221                  localtime(&db_hdr.timestamp));
4222         printf("Restoring database %s from backup @ %s\n", db_name, timebuf);
4223
4224         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4225                           db_hdr.flags, &db);
4226         if (ret != 0) {
4227                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4228                 close(fd);
4229                 return ret;
4230         }
4231
4232         nodemap = get_nodemap(ctdb, false);
4233         if (nodemap == NULL) {
4234                 fprintf(stderr, "Failed to get nodemap\n");
4235                 close(fd);
4236                 return ENOMEM;
4237         }
4238
4239         ret = get_generation(mem_ctx, ctdb, &generation);
4240         if (ret != 0) {
4241                 fprintf(stderr, "Failed to get current generation\n");
4242                 close(fd);
4243                 return ret;
4244         }
4245
4246         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4247                                      &pnn_list);
4248         if (count <= 0) {
4249                 close(fd);
4250                 return ENOMEM;
4251         }
4252
4253         wipedb.db_id = ctdb_db_id(db);
4254         wipedb.tid = generation;
4255
4256         ctdb_req_control_db_freeze(&request, wipedb.db_id);
4257         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4258                                         ctdb->client, pnn_list, count,
4259                                         TIMEOUT(), &request, NULL, NULL);
4260         if (ret != 0) {
4261                 goto failed;
4262         }
4263
4264
4265         ctdb_req_control_db_transaction_start(&request, &wipedb);
4266         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4267                                         pnn_list, count, TIMEOUT(),
4268                                         &request, NULL, NULL);
4269         if (ret != 0) {
4270                 goto failed;
4271         }
4272
4273         ctdb_req_control_wipe_database(&request, &wipedb);
4274         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4275                                         pnn_list, count, TIMEOUT(),
4276                                         &request, NULL, NULL);
4277         if (ret != 0) {
4278                 goto failed;
4279         }
4280
4281         pulldb.db_id = ctdb_db_id(db);
4282         pulldb.lmaster = 0;
4283         pulldb.srvid = SRVID_CTDB_PUSHDB;
4284
4285         ctdb_req_control_db_push_start(&request, &pulldb);
4286         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4287                                         pnn_list, count, TIMEOUT(),
4288                                         &request, NULL, NULL);
4289         if (ret != 0) {
4290                 goto failed;
4291         }
4292
4293         for (i=0; i<db_hdr.nbuf; i++) {
4294                 struct ctdb_req_message message;
4295                 TDB_DATA data;
4296
4297                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4298                 if (ret != 0) {
4299                         goto failed;
4300                 }
4301
4302                 data.dsize = ctdb_rec_buffer_len(recbuf);
4303                 data.dptr = talloc_size(mem_ctx, data.dsize);
4304                 if (data.dptr == NULL) {
4305                         goto failed;
4306                 }
4307
4308                 ctdb_rec_buffer_push(recbuf, data.dptr);
4309
4310                 message.srvid = pulldb.srvid;
4311                 message.data.data = data;
4312
4313                 ret = ctdb_client_message_multi(mem_ctx, ctdb->ev,
4314                                                 ctdb->client,
4315                                                 pnn_list, count,
4316                                                 &message, NULL);
4317                 if (ret != 0) {
4318                         goto failed;
4319                 }
4320
4321                 talloc_free(recbuf);
4322                 talloc_free(data.dptr);
4323         }
4324
4325         ctdb_req_control_db_push_confirm(&request, pulldb.db_id);
4326         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4327                                         pnn_list, count, TIMEOUT(),
4328                                         &request, NULL, &reply);
4329         if (ret != 0) {
4330                 goto failed;
4331         }
4332
4333         for (i=0; i<count; i++) {
4334                 uint32_t num_records;
4335
4336                 ret = ctdb_reply_control_db_push_confirm(reply[i],
4337                                                          &num_records);
4338                 if (ret != 0) {
4339                         fprintf(stderr, "Invalid response from node %u\n",
4340                                 pnn_list[i]);
4341                         goto failed;
4342                 }
4343
4344                 if (num_records != db_hdr.nrec) {
4345                         fprintf(stderr, "Node %u received %u of %lu records\n",
4346                                 pnn_list[i], num_records, db_hdr.nrec);
4347                         goto failed;
4348                 }
4349         }
4350
4351         ctdb_req_control_db_set_healthy(&request, wipedb.db_id);
4352         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4353                                         pnn_list, count, TIMEOUT(),
4354                                         &request, NULL, NULL);
4355         if (ret != 0) {
4356                 goto failed;
4357         }
4358
4359         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4360         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4361                                         pnn_list, count, TIMEOUT(),
4362                                         &request, NULL, NULL);
4363         if (ret != 0) {
4364                 goto failed;
4365         }
4366
4367         ctdb_req_control_db_thaw(&request, wipedb.db_id);
4368         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4369                                         ctdb->client, pnn_list, count,
4370                                         TIMEOUT(), &request, NULL, NULL);
4371         if (ret != 0) {
4372                 goto failed;
4373         }
4374
4375         printf("Database %s restored\n", db_name);
4376         return 0;
4377
4378
4379 failed:
4380         close(fd);
4381         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4382                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4383         return ret;
4384 }
4385
4386 struct dumpdbbackup_state {
4387         ctdb_rec_parser_func_t parser;
4388         struct dump_record_state sub_state;
4389 };
4390
4391 static int dumpdbbackup_handler(uint32_t reqid,
4392                                 struct ctdb_ltdb_header *header,
4393                                 TDB_DATA key, TDB_DATA data,
4394                                 void *private_data)
4395 {
4396         struct dumpdbbackup_state *state =
4397                 (struct dumpdbbackup_state *)private_data;
4398         struct ctdb_ltdb_header hdr;
4399         int ret;
4400
4401         ret = ctdb_ltdb_header_extract(&data, &hdr);
4402         if (ret != 0) {
4403                 return ret;
4404         }
4405
4406         return state->parser(reqid, &hdr, key, data, &state->sub_state);
4407 }
4408
4409 static int control_dumpdbbackup(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4410                                 int argc, const char **argv)
4411 {
4412         struct db_header db_hdr;
4413         char timebuf[128];
4414         struct dumpdbbackup_state state;
4415         int fd, ret, i;
4416
4417         if (argc != 1) {
4418                 usage("dumpbackup");
4419         }
4420
4421         fd = open(argv[0], O_RDONLY, 0600);
4422         if (fd == -1) {
4423                 ret = errno;
4424                 fprintf(stderr, "Failed to open file %s for reading\n",
4425                         argv[0]);
4426                 return ret;
4427         }
4428
4429         ret = read(fd, &db_hdr, sizeof(struct db_header));
4430         if (ret == -1) {
4431                 ret = errno;
4432                 close(fd);
4433                 fprintf(stderr, "Failed to read db header from file %s\n",
4434                         argv[0]);
4435                 return ret;
4436         }
4437         db_hdr.name[sizeof(db_hdr.name)-1] = '\0';
4438
4439         if (db_hdr.version != DB_VERSION) {
4440                 fprintf(stderr,
4441                         "Wrong version of backup file, expected %u, got %lu\n",
4442                         DB_VERSION, db_hdr.version);
4443                 close(fd);
4444                 return EINVAL;
4445         }
4446
4447         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4448                  localtime(&db_hdr.timestamp));
4449         printf("Dumping database %s from backup @ %s\n",
4450                db_hdr.name, timebuf);
4451
4452         state.parser = dump_record;
4453         state.sub_state.count = 0;
4454
4455         for (i=0; i<db_hdr.nbuf; i++) {
4456                 struct ctdb_rec_buffer *recbuf;
4457
4458                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4459                 if (ret != 0) {
4460                         fprintf(stderr, "Failed to read records\n");
4461                         close(fd);
4462                         return ret;
4463                 }
4464
4465                 ret = ctdb_rec_buffer_traverse(recbuf, dumpdbbackup_handler,
4466                                                &state);
4467                 if (ret != 0) {
4468                         fprintf(stderr, "Failed to dump records\n");
4469                         close(fd);
4470                         return ret;
4471                 }
4472         }
4473
4474         close(fd);
4475         printf("Dumped %u record(s)\n", state.sub_state.count);
4476         return 0;
4477 }
4478
4479 static int control_wipedb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4480                           int argc, const char **argv)
4481 {
4482         const char *db_name;
4483         struct ctdb_db_context *db;
4484         uint32_t db_id;
4485         uint8_t db_flags;
4486         struct ctdb_node_map *nodemap;
4487         struct ctdb_req_control request;
4488         struct ctdb_transdb wipedb;
4489         uint32_t generation;
4490         uint32_t *pnn_list;
4491         int count, ret;
4492
4493         if (argc != 1) {
4494                 usage("wipedb");
4495         }
4496
4497         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4498                 return 1;
4499         }
4500
4501         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4502                           db_flags, &db);
4503         if (ret != 0) {
4504                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4505                 return ret;
4506         }
4507
4508         nodemap = get_nodemap(ctdb, false);
4509         if (nodemap == NULL) {
4510                 fprintf(stderr, "Failed to get nodemap\n");
4511                 return ENOMEM;
4512         }
4513
4514         ret = get_generation(mem_ctx, ctdb, &generation);
4515         if (ret != 0) {
4516                 fprintf(stderr, "Failed to get current generation\n");
4517                 return ret;
4518         }
4519
4520         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4521                                      &pnn_list);
4522         if (count <= 0) {
4523                 return ENOMEM;
4524         }
4525
4526         ctdb_req_control_db_freeze(&request, db_id);
4527         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4528                                         ctdb->client, pnn_list, count,
4529                                         TIMEOUT(), &request, NULL, NULL);
4530         if (ret != 0) {
4531                 goto failed;
4532         }
4533
4534         wipedb.db_id = db_id;
4535         wipedb.tid = generation;
4536
4537         ctdb_req_control_db_transaction_start(&request, &wipedb);
4538         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4539                                         pnn_list, count, TIMEOUT(),
4540                                         &request, NULL, NULL);
4541         if (ret != 0) {
4542                 goto failed;
4543         }
4544
4545         ctdb_req_control_wipe_database(&request, &wipedb);
4546         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4547                                         pnn_list, count, TIMEOUT(),
4548                                         &request, NULL, NULL);
4549         if (ret != 0) {
4550                 goto failed;
4551         }
4552
4553         ctdb_req_control_db_set_healthy(&request, db_id);
4554         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4555                                         pnn_list, count, TIMEOUT(),
4556                                         &request, NULL, NULL);
4557         if (ret != 0) {
4558                 goto failed;
4559         }
4560
4561         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4562         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4563                                         pnn_list, count, TIMEOUT(),
4564                                         &request, NULL, NULL);
4565         if (ret != 0) {
4566                 goto failed;
4567         }
4568
4569         ctdb_req_control_db_thaw(&request, db_id);
4570         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4571                                         ctdb->client, pnn_list, count,
4572                                         TIMEOUT(), &request, NULL, NULL);
4573         if (ret != 0) {
4574                 goto failed;
4575         }
4576
4577         printf("Database %s wiped\n", db_name);
4578         return 0;
4579
4580
4581 failed:
4582         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4583                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4584         return ret;
4585 }
4586
4587 static int control_recmaster(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4588                              int argc, const char **argv)
4589 {
4590         uint32_t recmaster;
4591         int ret;
4592
4593         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
4594                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
4595         if (ret != 0) {
4596                 return ret;
4597         }
4598
4599         printf("%u\n", recmaster);
4600         return 0;
4601 }
4602
4603 static void print_scriptstatus_one(struct ctdb_script_list *slist,
4604                                    const char *event_str)
4605 {
4606         int i;
4607         int num_run = 0;
4608
4609         if (slist == NULL) {
4610                 if (! options.machinereadable) {
4611                         printf("%s cycle never run\n", event_str);
4612                 }
4613                 return;
4614         }
4615
4616         for (i=0; i<slist->num_scripts; i++) {
4617                 if (slist->script[i].status != -ENOEXEC) {
4618                         num_run++;
4619                 }
4620         }
4621
4622         if (! options.machinereadable) {
4623                 printf("%d scripts were executed last %s cycle\n",
4624                        num_run, event_str);
4625         }
4626
4627         for (i=0; i<slist->num_scripts; i++) {
4628                 const char *status = NULL;
4629
4630                 switch (slist->script[i].status) {
4631                 case -ETIME:
4632                         status = "TIMEDOUT";
4633                         break;
4634                 case -ENOEXEC:
4635                         status = "DISABLED";
4636                         break;
4637                 case 0:
4638                         status = "OK";
4639                         break;
4640                 default:
4641                         if (slist->script[i].status > 0) {
4642                                 status = "ERROR";
4643                         }
4644                         break;
4645                 }
4646
4647                 if (options.machinereadable) {
4648                         printf("%s%s%s%s%s%i%s%s%s%lu.%06lu%s%lu.%06lu%s%s%s\n",
4649                                options.sep,
4650                                event_str, options.sep,
4651                                slist->script[i].name, options.sep,
4652                                slist->script[i].status, options.sep,
4653                                status, options.sep,
4654                                (unsigned long)slist->script[i].start.tv_sec,
4655                                slist->script[i].start.tv_usec, options.sep,
4656                                (unsigned long)slist->script[i].finished.tv_sec,
4657                                slist->script[i].finished.tv_usec, options.sep,
4658                                slist->script[i].output, options.sep);
4659                         continue;
4660                 }
4661
4662                 if (status) {
4663                         printf("%-20s Status:%s    ",
4664                                slist->script[i].name, status);
4665                 } else {
4666                         /* Some other error, eg from stat. */
4667                         printf("%-20s Status:CANNOT RUN (%s)",
4668                                slist->script[i].name,
4669                                strerror(-slist->script[i].status));
4670                 }
4671
4672                 if (slist->script[i].status >= 0) {
4673                         printf("Duration:%.3lf ",
4674                                timeval_delta(&slist->script[i].finished,
4675                                              &slist->script[i].start));
4676                 }
4677                 if (slist->script[i].status != -ENOEXEC) {
4678                         printf("%s", ctime(&slist->script[i].start.tv_sec));
4679                         if (slist->script[i].status != 0) {
4680                                 printf("   OUTPUT:%s\n",
4681                                        slist->script[i].output);
4682                         }
4683                 } else {
4684                         printf("\n");
4685                 }
4686         }
4687 }
4688
4689 static void print_scriptstatus(struct ctdb_script_list **slist,
4690                                int count, const char **event_str)
4691 {
4692         int i;
4693
4694         if (options.machinereadable) {
4695                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
4696                        options.sep,
4697                        "Type", options.sep,
4698                        "Name", options.sep,
4699                        "Code", options.sep,
4700                        "Status", options.sep,
4701                        "Start", options.sep,
4702                        "End", options.sep,
4703                        "Error Output", options.sep);
4704         }
4705
4706         for (i=0; i<count; i++) {
4707                 print_scriptstatus_one(slist[i], event_str[i]);
4708         }
4709 }
4710
4711 static int control_scriptstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4712                                 int argc, const char **argv)
4713 {
4714         struct ctdb_script_list **slist;
4715         const char *event_str;
4716         enum ctdb_event event;
4717         const char *all_events[] = {
4718                 "init", "setup", "startup", "monitor",
4719                 "takeip", "releaseip", "updateip", "ipreallocated" };
4720         bool valid;
4721         int ret, i, j;
4722         int count, start, end, num;
4723
4724         if (argc > 1) {
4725                 usage("scriptstatus");
4726         }
4727
4728         if (argc == 0) {
4729                 event_str = "monitor";
4730         } else {
4731                 event_str = argv[0];
4732         }
4733
4734         valid = false;
4735         end = 0;
4736
4737         for (i=0; i<ARRAY_SIZE(all_events); i++) {
4738                 if (strcmp(event_str, all_events[i]) == 0) {
4739                         valid = true;
4740                         count = 1;
4741                         start = i;
4742                         end = i+1;
4743                         break;
4744                 }
4745         }
4746
4747         if (strcmp(event_str, "all") == 0) {
4748                 valid = true;
4749                 count = ARRAY_SIZE(all_events);
4750                 start = 0;
4751                 end = count-1;
4752         }
4753
4754         if (! valid) {
4755                 fprintf(stderr, "Unknown event name %s\n", argv[0]);
4756                 usage("scriptstatus");
4757         }
4758
4759         slist = talloc_array(mem_ctx, struct ctdb_script_list *, count);
4760         if (slist == NULL) {
4761                 fprintf(stderr, "Memory allocation error\n");
4762                 return 1;
4763         }
4764
4765         num = 0;
4766         for (i=start; i<end; i++) {
4767                 event = ctdb_event_from_string(all_events[i]);
4768
4769                 ret = ctdb_ctrl_get_event_script_status(mem_ctx, ctdb->ev,
4770                                                         ctdb->client,
4771                                                         ctdb->cmd_pnn,
4772                                                         TIMEOUT(), event,
4773                                                         &slist[num]);
4774                 if (ret != 0) {
4775                         fprintf(stderr,
4776                                 "failed to get script status for %s event\n",
4777                                 all_events[i]);
4778                         return 1;
4779                 }
4780
4781                 if (slist[num] == NULL) {
4782                         num++;
4783                         continue;
4784                 }
4785
4786                 /* The ETIME status is ignored for certain events.
4787                  * In that case the status is 0, but endtime is not set.
4788                  */
4789                 for (j=0; j<slist[num]->num_scripts; j++) {
4790                         if (slist[num]->script[j].status == 0 &&
4791                             timeval_is_zero(&slist[num]->script[j].finished)) {
4792                                 slist[num]->script[j].status = -ETIME;
4793                         }
4794                 }
4795
4796                 num++;
4797         }
4798
4799         print_scriptstatus(slist, count, &all_events[start]);
4800         return 0;
4801 }
4802
4803 static int control_enablescript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4804                                 int argc, const char **argv)
4805 {
4806         int ret;
4807
4808         if (argc != 1) {
4809                 usage("enablescript");
4810         }
4811
4812         ret = ctdb_ctrl_enable_script(mem_ctx, ctdb->ev, ctdb->client,
4813                                       ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4814         if (ret != 0) {
4815                 fprintf(stderr, "Failed to enable script %s\n", argv[0]);
4816                 return ret;
4817         }
4818
4819         return 0;
4820 }
4821
4822 static int control_disablescript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4823                                  int argc, const char **argv)
4824 {
4825         int ret;
4826
4827         if (argc != 1) {
4828                 usage("disablescript");
4829         }
4830
4831         ret = ctdb_ctrl_disable_script(mem_ctx, ctdb->ev, ctdb->client,
4832                                        ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4833         if (ret != 0) {
4834                 fprintf(stderr, "Failed to disable script %s\n", argv[0]);
4835                 return ret;
4836         }
4837
4838         return 0;
4839 }
4840
4841 static int control_natgw(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4842                          int argc, const char **argv)
4843 {
4844         char *t, *natgw_helper = NULL;
4845
4846         if (argc != 1) {
4847                 usage("natgw");
4848         }
4849
4850         t = getenv("CTDB_NATGW_HELPER");
4851         if (t != NULL) {
4852                 natgw_helper = talloc_strdup(mem_ctx, t);
4853         } else {
4854                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4855                                                CTDB_HELPER_BINDIR);
4856         }
4857
4858         if (natgw_helper == NULL) {
4859                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4860                 return 1;
4861         }
4862
4863         return run_helper("NAT gateway helper", natgw_helper, argv[0]);
4864 }
4865
4866 static int control_natgwlist(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4867                              int argc, const char **argv)
4868 {
4869         char *t, *natgw_helper = NULL;
4870
4871         if (argc != 0) {
4872                 usage("natgwlist");
4873         }
4874
4875         t = getenv("CTDB_NATGW_HELPER");
4876         if (t != NULL) {
4877                 natgw_helper = talloc_strdup(mem_ctx, t);
4878         } else {
4879                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4880                                                CTDB_HELPER_BINDIR);
4881         }
4882
4883         if (natgw_helper == NULL) {
4884                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4885                 return 1;
4886         }
4887
4888         return run_helper("NAT gateway helper", natgw_helper, "natgwlist");
4889 }
4890
4891 /*
4892  * Find the PNN of the current node
4893  * discover the pnn by loading the nodes file and try to bind
4894  * to all addresses one at a time until the ip address is found.
4895  */
4896 static bool find_node_xpnn(TALLOC_CTX *mem_ctx, uint32_t *pnn)
4897 {
4898         struct ctdb_node_map *nodemap;
4899         int i;
4900
4901         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
4902         if (nodemap == NULL) {
4903                 return false;
4904         }
4905
4906         for (i=0; i<nodemap->num; i++) {
4907                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
4908                         continue;
4909                 }
4910                 if (ctdb_sys_have_ip(&nodemap->node[i].addr)) {
4911                         if (pnn != NULL) {
4912                                 *pnn = nodemap->node[i].pnn;
4913                         }
4914                         talloc_free(nodemap);
4915                         return true;
4916                 }
4917         }
4918
4919         fprintf(stderr, "Failed to detect PNN of the current node.\n");
4920         talloc_free(nodemap);
4921         return false;
4922 }
4923
4924 static int control_getreclock(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4925                               int argc, const char **argv)
4926 {
4927         const char *reclock;
4928         int ret;
4929
4930         if (argc != 0) {
4931                 usage("getreclock");
4932         }
4933
4934         ret = ctdb_ctrl_get_reclock_file(mem_ctx, ctdb->ev, ctdb->client,
4935                                          ctdb->cmd_pnn, TIMEOUT(), &reclock);
4936         if (ret != 0) {
4937                 return ret;
4938         }
4939
4940         if (reclock != NULL) {
4941                 printf("%s\n", reclock);
4942         }
4943
4944         return 0;
4945 }
4946
4947 static int control_setlmasterrole(TALLOC_CTX *mem_ctx,
4948                                   struct ctdb_context *ctdb,
4949                                   int argc, const char **argv)
4950 {
4951         uint32_t lmasterrole = 0;
4952         int ret;
4953
4954         if (argc != 1) {
4955                 usage("setlmasterrole");
4956         }
4957
4958         if (strcmp(argv[0], "on") == 0) {
4959                 lmasterrole = 1;
4960         } else if (strcmp(argv[0], "off") == 0) {
4961                 lmasterrole = 0;
4962         } else {
4963                 usage("setlmasterrole");
4964         }
4965
4966         ret = ctdb_ctrl_set_lmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4967                                         ctdb->cmd_pnn, TIMEOUT(), lmasterrole);
4968         if (ret != 0) {
4969                 return ret;
4970         }
4971
4972         return 0;
4973 }
4974
4975 static int control_setrecmasterrole(TALLOC_CTX *mem_ctx,
4976                                     struct ctdb_context *ctdb,
4977                                     int argc, const char **argv)
4978 {
4979         uint32_t recmasterrole = 0;
4980         int ret;
4981
4982         if (argc != 1) {
4983                 usage("setrecmasterrole");
4984         }
4985
4986         if (strcmp(argv[0], "on") == 0) {
4987                 recmasterrole = 1;
4988         } else if (strcmp(argv[0], "off") == 0) {
4989                 recmasterrole = 0;
4990         } else {
4991                 usage("setrecmasterrole");
4992         }
4993
4994         ret = ctdb_ctrl_set_recmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4995                                           ctdb->cmd_pnn, TIMEOUT(),
4996                                           recmasterrole);
4997         if (ret != 0) {
4998                 return ret;
4999         }
5000
5001         return 0;
5002 }
5003
5004 static int control_setdbreadonly(TALLOC_CTX *mem_ctx,
5005                                  struct ctdb_context *ctdb,
5006                                  int argc, const char **argv)
5007 {
5008         uint32_t db_id;
5009         uint8_t db_flags;
5010         int ret;
5011
5012         if (argc != 1) {
5013                 usage("setdbreadonly");
5014         }
5015
5016         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
5017                 return 1;
5018         }
5019
5020         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5021                 fprintf(stderr, "Cannot set READONLY on persistent DB\n");
5022                 return 1;
5023         }
5024
5025         ret = ctdb_ctrl_set_db_readonly(mem_ctx, ctdb->ev, ctdb->client,
5026                                         ctdb->cmd_pnn, TIMEOUT(), db_id);
5027         if (ret != 0) {
5028                 return ret;
5029         }
5030
5031         return 0;
5032 }
5033
5034 static int control_setdbsticky(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5035                                int argc, const char **argv)
5036 {
5037         uint32_t db_id;
5038         uint8_t db_flags;
5039         int ret;
5040
5041         if (argc != 1) {
5042                 usage("setdbsticky");
5043         }
5044
5045         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
5046                 return 1;
5047         }
5048
5049         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5050                 fprintf(stderr, "Cannot set STICKY on persistent DB\n");
5051                 return 1;
5052         }
5053
5054         ret = ctdb_ctrl_set_db_sticky(mem_ctx, ctdb->ev, ctdb->client,
5055                                       ctdb->cmd_pnn, TIMEOUT(), db_id);
5056         if (ret != 0) {
5057                 return ret;
5058         }
5059
5060         return 0;
5061 }
5062
5063 static int control_pfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5064                           int argc, const char **argv)
5065 {
5066         const char *db_name;
5067         struct ctdb_db_context *db;
5068         struct ctdb_transaction_handle *h;
5069         uint8_t db_flags;
5070         TDB_DATA key, data;
5071         int ret;
5072
5073         if (argc < 2 || argc > 3) {
5074                 usage("pfetch");
5075         }
5076
5077         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5078                 return 1;
5079         }
5080
5081         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5082                 fprintf(stderr, "DB %s is not a persistent database\n",
5083                         db_name);
5084                 return 1;
5085         }
5086
5087         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5088                           db_flags, &db);
5089         if (ret != 0) {
5090                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5091                 return ret;
5092         }
5093
5094         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5095         if (ret != 0) {
5096                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5097                 return ret;
5098         }
5099
5100         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5101                                      TIMEOUT(), db, true, &h);
5102         if (ret != 0) {
5103                 fprintf(stderr, "Failed to start transaction on db %s\n",
5104                         db_name);
5105                 return ret;
5106         }
5107
5108         ret = ctdb_transaction_fetch_record(h, key, mem_ctx, &data);
5109         if (ret != 0) {
5110                 fprintf(stderr, "Failed to read record for key %s\n",
5111                         argv[1]);
5112                 ctdb_transaction_cancel(h);
5113                 return ret;
5114         }
5115
5116         printf("%.*s\n", (int)data.dsize, data.dptr);
5117
5118         ctdb_transaction_cancel(h);
5119         return 0;
5120 }
5121
5122 static int control_pstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5123                           int argc, const char **argv)
5124 {
5125         const char *db_name;
5126         struct ctdb_db_context *db;
5127         struct ctdb_transaction_handle *h;
5128         uint8_t db_flags;
5129         TDB_DATA key, data;
5130         int ret;
5131
5132         if (argc != 3) {
5133                 usage("pstore");
5134         }
5135
5136         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5137                 return 1;
5138         }
5139
5140         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5141                 fprintf(stderr, "DB %s is not a persistent database\n",
5142                         db_name);
5143                 return 1;
5144         }
5145
5146         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5147                           db_flags, &db);
5148         if (ret != 0) {
5149                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5150                 return ret;
5151         }
5152
5153         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5154         if (ret != 0) {
5155                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5156                 return ret;
5157         }
5158
5159         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5160         if (ret != 0) {
5161                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5162                 return ret;
5163         }
5164
5165         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5166                                      TIMEOUT(), db, false, &h);
5167         if (ret != 0) {
5168                 fprintf(stderr, "Failed to start transaction on db %s\n",
5169                         db_name);
5170                 return ret;
5171         }
5172
5173         ret = ctdb_transaction_store_record(h, key, data);
5174         if (ret != 0) {
5175                 fprintf(stderr, "Failed to store record for key %s\n",
5176                         argv[1]);
5177                 ctdb_transaction_cancel(h);
5178                 return ret;
5179         }
5180
5181         ret = ctdb_transaction_commit(h);
5182         if (ret != 0) {
5183                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5184                         db_name);
5185                 ctdb_transaction_cancel(h);
5186                 return ret;
5187         }
5188
5189         return 0;
5190 }
5191
5192 static int control_pdelete(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5193                            int argc, const char **argv)
5194 {
5195         const char *db_name;
5196         struct ctdb_db_context *db;
5197         struct ctdb_transaction_handle *h;
5198         uint8_t db_flags;
5199         TDB_DATA key;
5200         int ret;
5201
5202         if (argc != 2) {
5203                 usage("pdelete");
5204         }
5205
5206         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5207                 return 1;
5208         }
5209
5210         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5211                 fprintf(stderr, "DB %s is not a persistent database\n",
5212                         db_name);
5213                 return 1;
5214         }
5215
5216         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5217                           db_flags, &db);
5218         if (ret != 0) {
5219                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5220                 return ret;
5221         }
5222
5223         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5224         if (ret != 0) {
5225                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5226                 return ret;
5227         }
5228
5229         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5230                                      TIMEOUT(), db, false, &h);
5231         if (ret != 0) {
5232                 fprintf(stderr, "Failed to start transaction on db %s\n",
5233                         db_name);
5234                 return ret;
5235         }
5236
5237         ret = ctdb_transaction_delete_record(h, key);
5238         if (ret != 0) {
5239                 fprintf(stderr, "Failed to delete record for key %s\n",
5240                         argv[1]);
5241                 ctdb_transaction_cancel(h);
5242                 return ret;
5243         }
5244
5245         ret = ctdb_transaction_commit(h);
5246         if (ret != 0) {
5247                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5248                         db_name);
5249                 ctdb_transaction_cancel(h);
5250                 return ret;
5251         }
5252
5253         return 0;
5254 }
5255
5256 static int ptrans_parse_string(TALLOC_CTX *mem_ctx, const char **ptr, TDB_DATA *data)
5257 {
5258         const char *t;
5259         size_t n;
5260         int ret;
5261
5262         *data = tdb_null;
5263
5264         /* Skip whitespace */
5265         n = strspn(*ptr, " \t");
5266         t = *ptr + n;
5267
5268         if (t[0] == '"') {
5269                 /* Quoted ASCII string - no wide characters! */
5270                 t++;
5271                 n = strcspn(t, "\"");
5272                 if (t[n] == '"') {
5273                         if (n > 0) {
5274                                 ret = str_to_data(t, n, mem_ctx, data);
5275                                 if (ret != 0) {
5276                                         return ret;
5277                                 }
5278                         }
5279                         *ptr = t + n + 1;
5280                 } else {
5281                         fprintf(stderr, "Unmatched \" in input %s\n", *ptr);
5282                         return 1;
5283                 }
5284         } else {
5285                 fprintf(stderr, "Unsupported input format in %s\n", *ptr);
5286                 return 1;
5287         }
5288
5289         return 0;
5290 }
5291
5292 #define MAX_LINE_SIZE   1024
5293
5294 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
5295                                  TDB_DATA *key, TDB_DATA *value)
5296 {
5297         char line [MAX_LINE_SIZE]; /* FIXME: make this more flexible? */
5298         const char *ptr;
5299         int ret;
5300
5301         ptr = fgets(line, MAX_LINE_SIZE, file);
5302         if (ptr == NULL) {
5303                 return false;
5304         }
5305
5306         /* Get key */
5307         ret = ptrans_parse_string(mem_ctx, &ptr, key);
5308         if (ret != 0 || ptr == NULL || key->dptr == NULL) {
5309                 /* Line Ignored but not EOF */
5310                 *key = tdb_null;
5311                 return true;
5312         }
5313
5314         /* Get value */
5315         ret = ptrans_parse_string(mem_ctx, &ptr, value);
5316         if (ret != 0) {
5317                 /* Line Ignored but not EOF */
5318                 talloc_free(key->dptr);
5319                 *key = tdb_null;
5320                 return true;
5321         }
5322
5323         return true;
5324 }
5325
5326 static int control_ptrans(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5327                           int argc, const char **argv)
5328 {
5329         const char *db_name;
5330         struct ctdb_db_context *db;
5331         struct ctdb_transaction_handle *h;
5332         uint8_t db_flags;
5333         FILE *file;
5334         TDB_DATA key, value;
5335         int ret;
5336
5337         if (argc < 1 || argc > 2) {
5338                 usage("ptrans");
5339         }
5340
5341         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5342                 return 1;
5343         }
5344
5345         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5346                 fprintf(stderr, "DB %s is not a persistent database\n",
5347                         db_name);
5348                 return 1;
5349         }
5350
5351         if (argc == 2) {
5352                 file = fopen(argv[1], "r");
5353                 if (file == NULL) {
5354                         fprintf(stderr, "Failed to open file %s\n", argv[1]);
5355                         return 1;
5356                 }
5357         } else {
5358                 file = stdin;
5359         }
5360
5361         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5362                           db_flags, &db);
5363         if (ret != 0) {
5364                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5365                 goto done;
5366         }
5367
5368         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5369                                      TIMEOUT(), db, false, &h);
5370         if (ret != 0) {
5371                 fprintf(stderr, "Failed to start transaction on db %s\n",
5372                         db_name);
5373                 goto done;
5374         }
5375
5376         while (ptrans_get_key_value(mem_ctx, file, &key, &value)) {
5377                 if (key.dsize != 0) {
5378                         ret = ctdb_transaction_store_record(h, key, value);
5379                         if (ret != 0) {
5380                                 fprintf(stderr, "Failed to store record\n");
5381                                 ctdb_transaction_cancel(h);
5382                                 goto done;
5383                         }
5384                         talloc_free(key.dptr);
5385                         talloc_free(value.dptr);
5386                 }
5387         }
5388
5389         ret = ctdb_transaction_commit(h);
5390         if (ret != 0) {
5391                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5392                         db_name);
5393                 ctdb_transaction_cancel(h);
5394         }
5395
5396 done:
5397         if (file != stdin) {
5398                 fclose(file);
5399         }
5400         return ret;
5401 }
5402
5403 static int control_tfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5404                           int argc, const char **argv)
5405 {
5406         struct tdb_context *tdb;
5407         TDB_DATA key, data;
5408         struct ctdb_ltdb_header header;
5409         int ret;
5410
5411         if (argc < 2 || argc > 3) {
5412                 usage("tfetch");
5413         }
5414
5415         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5416         if (tdb == NULL) {
5417                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5418                 return 1;
5419         }
5420
5421         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5422         if (ret != 0) {
5423                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5424                 tdb_close(tdb);
5425                 return ret;
5426         }
5427
5428         data = tdb_fetch(tdb, key);
5429         if (data.dptr == NULL) {
5430                 fprintf(stderr, "No record for key %s\n", argv[1]);
5431                 tdb_close(tdb);
5432                 return 1;
5433         }
5434
5435         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
5436                 fprintf(stderr, "Invalid record for key %s\n", argv[1]);
5437                 tdb_close(tdb);
5438                 return 1;
5439         }
5440
5441         tdb_close(tdb);
5442
5443         if (argc == 3) {
5444                 int fd;
5445                 ssize_t nwritten;
5446
5447                 fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
5448                 if (fd == -1) {
5449                         fprintf(stderr, "Failed to open output file %s\n",
5450                                 argv[2]);
5451                         goto fail;
5452                 }
5453
5454                 nwritten = sys_write(fd, data.dptr, data.dsize);
5455                 if (nwritten != data.dsize) {
5456                         fprintf(stderr, "Failed to write record to file\n");
5457                         close(fd);
5458                         goto fail;
5459                 }
5460
5461                 close(fd);
5462         }
5463
5464 fail:
5465         ret = ctdb_ltdb_header_extract(&data, &header);
5466         if (ret != 0) {
5467                 fprintf(stderr, "Failed to parse header from data\n");
5468                 return 1;
5469         }
5470
5471         dump_ltdb_header(&header);
5472         dump_tdb_data("data", data);
5473
5474         return 0;
5475 }
5476
5477 static int control_tstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5478                           int argc, const char **argv)
5479 {
5480         struct tdb_context *tdb;
5481         TDB_DATA key, data[2], value;
5482         struct ctdb_ltdb_header header;
5483         uint8_t header_buf[sizeof(struct ctdb_ltdb_header)];
5484         int ret;
5485
5486         if (argc < 3 || argc > 5) {
5487                 usage("tstore");
5488         }
5489
5490         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5491         if (tdb == NULL) {
5492                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5493                 return 1;
5494         }
5495
5496         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5497         if (ret != 0) {
5498                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5499                 tdb_close(tdb);
5500                 return ret;
5501         }
5502
5503         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &value);
5504         if (ret != 0) {
5505                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5506                 tdb_close(tdb);
5507                 return ret;
5508         }
5509
5510         ZERO_STRUCT(header);
5511
5512         if (argc > 3) {
5513                 header.rsn = (uint64_t)strtoull(argv[3], NULL, 0);
5514         }
5515         if (argc > 4) {
5516                 header.dmaster = (uint32_t)atol(argv[4]);
5517         }
5518         if (argc > 5) {
5519                 header.flags = (uint32_t)atol(argv[5]);
5520         }
5521
5522         ctdb_ltdb_header_push(&header, header_buf);
5523
5524         data[0].dsize = ctdb_ltdb_header_len(&header);
5525         data[0].dptr = header_buf;
5526
5527         data[1].dsize = value.dsize;
5528         data[1].dptr = value.dptr;
5529
5530         ret = tdb_storev(tdb, key, data, 2, TDB_REPLACE);
5531         if (ret != 0) {
5532                 fprintf(stderr, "Failed to write record %s to file %s\n",
5533                         argv[1], argv[0]);
5534         }
5535
5536         tdb_close(tdb);
5537
5538         return ret;
5539 }
5540
5541 static int control_readkey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5542                            int argc, const char **argv)
5543 {
5544         const char *db_name;
5545         struct ctdb_db_context *db;
5546         struct ctdb_record_handle *h;
5547         uint8_t db_flags;
5548         TDB_DATA key, data;
5549         bool readonly = false;
5550         int ret;
5551
5552         if (argc < 2 || argc > 3) {
5553                 usage("readkey");
5554         }
5555
5556         if (argc == 3) {
5557                 if (strcmp(argv[2], "readonly") == 0) {
5558                         readonly = true;
5559                 } else {
5560                         usage("readkey");
5561                 }
5562         }
5563
5564         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5565                 return 1;
5566         }
5567
5568         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5569                 fprintf(stderr, "DB %s is not a volatile database\n",
5570                         db_name);
5571                 return 1;
5572         }
5573
5574         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5575                           db_flags, &db);
5576         if (ret != 0) {
5577                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5578                 return ret;
5579         }
5580
5581         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5582         if (ret != 0) {
5583                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5584                 return ret;
5585         }
5586
5587         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5588                               db, key, readonly, &h, NULL, &data);
5589         if (ret != 0) {
5590                 fprintf(stderr, "Failed to read record for key %s\n",
5591                         argv[1]);
5592         } else {
5593                 printf("Data: size:%zu ptr:[%.*s]\n", data.dsize,
5594                        (int)data.dsize, data.dptr);
5595         }
5596
5597         talloc_free(h);
5598         return ret;
5599 }
5600
5601 static int control_writekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5602                             int argc, const char **argv)
5603 {
5604         const char *db_name;
5605         struct ctdb_db_context *db;
5606         struct ctdb_record_handle *h;
5607         uint8_t db_flags;
5608         TDB_DATA key, data;
5609         int ret;
5610
5611         if (argc != 3) {
5612                 usage("writekey");
5613         }
5614
5615         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5616                 return 1;
5617         }
5618
5619         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5620                 fprintf(stderr, "DB %s is not a volatile database\n",
5621                         db_name);
5622                 return 1;
5623         }
5624
5625         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5626                           db_flags, &db);
5627         if (ret != 0) {
5628                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5629                 return ret;
5630         }
5631
5632         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5633         if (ret != 0) {
5634                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5635                 return ret;
5636         }
5637
5638         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5639         if (ret != 0) {
5640                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5641                 return ret;
5642         }
5643
5644         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5645                               db, key, false, &h, NULL, NULL);
5646         if (ret != 0) {
5647                 fprintf(stderr, "Failed to lock record for key %s\n", argv[0]);
5648                 return ret;
5649         }
5650
5651         ret = ctdb_store_record(h, data);
5652         if (ret != 0) {
5653                 fprintf(stderr, "Failed to store record for key %s\n",
5654                         argv[1]);
5655         }
5656
5657         talloc_free(h);
5658         return ret;
5659 }
5660
5661 static int control_deletekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5662                              int argc, const char **argv)
5663 {
5664         const char *db_name;
5665         struct ctdb_db_context *db;
5666         struct ctdb_record_handle *h;
5667         uint8_t db_flags;
5668         TDB_DATA key, data;
5669         int ret;
5670
5671         if (argc != 2) {
5672                 usage("deletekey");
5673         }
5674
5675         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5676                 return 1;
5677         }
5678
5679         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5680                 fprintf(stderr, "DB %s is not a volatile database\n",
5681                         db_name);
5682                 return 1;
5683         }
5684
5685         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5686                           db_flags, &db);
5687         if (ret != 0) {
5688                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5689                 return ret;
5690         }
5691
5692         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5693         if (ret != 0) {
5694                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5695                 return ret;
5696         }
5697
5698         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5699                               db, key, false, &h, NULL, &data);
5700         if (ret != 0) {
5701                 fprintf(stderr, "Failed to fetch record for key %s\n",
5702                         argv[1]);
5703                 return ret;
5704         }
5705
5706         ret = ctdb_delete_record(h);
5707         if (ret != 0) {
5708                 fprintf(stderr, "Failed to delete record for key %s\n",
5709                         argv[1]);
5710         }
5711
5712         talloc_free(h);
5713         return ret;
5714 }
5715
5716 static int control_checktcpport(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5717                                 int argc, const char **argv)
5718 {
5719         struct sockaddr_in sin;
5720         unsigned int port;
5721         int s, v;
5722         int ret;
5723
5724         if (argc != 1) {
5725                 usage("chktcpport");
5726         }
5727
5728         port = atoi(argv[0]);
5729
5730         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
5731         if (s == -1) {
5732                 fprintf(stderr, "Failed to open local socket\n");
5733                 return errno;
5734         }
5735
5736         v = fcntl(s, F_GETFL, 0);
5737         if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK)) {
5738                 fprintf(stderr, "Unable to set socket non-blocking\n");
5739                 close(s);
5740                 return errno;
5741         }
5742
5743         bzero(&sin, sizeof(sin));
5744         sin.sin_family = AF_INET;
5745         sin.sin_port = htons(port);
5746         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
5747         close(s);
5748         if (ret == -1) {
5749                 fprintf(stderr, "Failed to bind to TCP port %u\n", port);
5750                 return errno;
5751         }
5752
5753         return 0;
5754 }
5755
5756 static int control_getdbseqnum(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5757                                int argc, const char **argv)
5758 {
5759         uint32_t db_id;
5760         const char *db_name;
5761         uint64_t seqnum;
5762         int ret;
5763
5764         if (argc != 1) {
5765                 usage("getdbseqnum");
5766         }
5767
5768         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5769                 return 1;
5770         }
5771
5772         ret = ctdb_ctrl_get_db_seqnum(mem_ctx, ctdb->ev, ctdb->client,
5773                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
5774                                       &seqnum);
5775         if (ret != 0) {
5776                 fprintf(stderr, "Failed to get sequence number for DB %s\n",
5777                         db_name);
5778                 return ret;
5779         }
5780
5781         printf("0x%"PRIx64"\n", seqnum);
5782         return 0;
5783 }
5784
5785 static int control_nodestatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5786                               int argc, const char **argv)
5787 {
5788         const char *nodestring = NULL;
5789         struct ctdb_node_map *nodemap;
5790         int ret, i;
5791
5792         if (argc > 1) {
5793                 usage("nodestatus");
5794         }
5795
5796         if (argc == 1) {
5797                 nodestring = argv[0];
5798         }
5799
5800         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap)) {
5801                 return 1;
5802         }
5803
5804         nodemap = get_nodemap(ctdb, false);
5805         if (nodemap == NULL) {
5806                 return 1;
5807         }
5808
5809         if (options.machinereadable) {
5810                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5811         } else {
5812                 print_nodemap(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5813         }
5814
5815         ret = 0;
5816         for (i=0; i<nodemap->num; i++) {
5817                 ret |= nodemap->node[i].flags;
5818         }
5819
5820         return ret;
5821 }
5822
5823 const struct {
5824         const char *name;
5825         uint32_t offset;
5826 } db_stats_fields[] = {
5827 #define DBSTATISTICS_FIELD(n) { #n, offsetof(struct ctdb_db_statistics, n) }
5828         DBSTATISTICS_FIELD(db_ro_delegations),
5829         DBSTATISTICS_FIELD(db_ro_revokes),
5830         DBSTATISTICS_FIELD(locks.num_calls),
5831         DBSTATISTICS_FIELD(locks.num_current),
5832         DBSTATISTICS_FIELD(locks.num_pending),
5833         DBSTATISTICS_FIELD(locks.num_failed),
5834         DBSTATISTICS_FIELD(db_ro_delegations),
5835 };
5836
5837 static void print_dbstatistics(const char *db_name,
5838                                struct ctdb_db_statistics *s)
5839 {
5840         int i;
5841         const char *prefix = NULL;
5842         int preflen = 0;
5843
5844         printf("DB Statistics %s\n", db_name);
5845
5846         for (i=0; i<ARRAY_SIZE(db_stats_fields); i++) {
5847                 if (strchr(db_stats_fields[i].name, '.') != NULL) {
5848                         preflen = strcspn(db_stats_fields[i].name, ".") + 1;
5849                         if (! prefix ||
5850                             strncmp(prefix, db_stats_fields[i].name, preflen) != 0) {
5851                                 prefix = db_stats_fields[i].name;
5852                                 printf(" %*.*s\n", preflen-1, preflen-1,
5853                                        db_stats_fields[i].name);
5854                         }
5855                 } else {
5856                         preflen = 0;
5857                 }
5858                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
5859                        db_stats_fields[i].name+preflen, preflen ? 0 : 4, "",
5860                        *(uint32_t *)(db_stats_fields[i].offset+(uint8_t *)s));
5861         }
5862
5863         printf(" hop_count_buckets:");
5864         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5865                 printf(" %d", s->hop_count_bucket[i]);
5866         }
5867         printf("\n");
5868
5869         printf(" lock_buckets:");
5870         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5871                 printf(" %d", s->locks.buckets[i]);
5872         }
5873         printf("\n");
5874
5875         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5876                "locks_latency      MIN/AVG/MAX",
5877                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
5878                s->locks.latency.max, s->locks.latency.num);
5879
5880         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5881                "vacuum_latency     MIN/AVG/MAX",
5882                s->vacuum.latency.min, LATENCY_AVG(s->vacuum.latency),
5883                s->vacuum.latency.max, s->vacuum.latency.num);
5884
5885         printf(" Num Hot Keys:     %d\n", s->num_hot_keys);
5886         for (i=0; i<s->num_hot_keys; i++) {
5887                 int j;
5888                 printf("     Count:%d Key:", s->hot_keys[i].count);
5889                 for (j=0; j<s->hot_keys[i].key.dsize; j++) {
5890                         printf("%02x", s->hot_keys[i].key.dptr[j] & 0xff);
5891                 }
5892                 printf("\n");
5893         }
5894 }
5895
5896 static int control_dbstatistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5897                                 int argc, const char **argv)
5898 {
5899         uint32_t db_id;
5900         const char *db_name;
5901         struct ctdb_db_statistics *dbstats;
5902         int ret;
5903
5904         if (argc != 1) {
5905                 usage("dbstatistics");
5906         }
5907
5908         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5909                 return 1;
5910         }
5911
5912         ret = ctdb_ctrl_get_db_statistics(mem_ctx, ctdb->ev, ctdb->client,
5913                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
5914                                           &dbstats);
5915         if (ret != 0) {
5916                 fprintf(stderr, "Failed to get statistics for DB %s\n",
5917                         db_name);
5918                 return ret;
5919         }
5920
5921         print_dbstatistics(db_name, dbstats);
5922         return 0;
5923 }
5924
5925 struct disable_takeover_runs_state {
5926         uint32_t *pnn_list;
5927         int node_count;
5928         bool *reply;
5929         int status;
5930         bool done;
5931 };
5932
5933 static void disable_takeover_run_handler(uint64_t srvid, TDB_DATA data,
5934                                          void *private_data)
5935 {
5936         struct disable_takeover_runs_state *state =
5937                 (struct disable_takeover_runs_state *)private_data;
5938         int ret, i;
5939
5940         if (data.dsize != sizeof(int)) {
5941                 /* Ignore packet */
5942                 return;
5943         }
5944
5945         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
5946         ret = *(int *)data.dptr;
5947         if (ret < 0) {
5948                 state->status = ret;
5949                 state->done = true;
5950                 return;
5951         }
5952         for (i=0; i<state->node_count; i++) {
5953                 if (state->pnn_list[i] == ret) {
5954                         state->reply[i] = true;
5955                         break;
5956                 }
5957         }
5958
5959         state->done = true;
5960         for (i=0; i<state->node_count; i++) {
5961                 if (! state->reply[i]) {
5962                         state->done = false;
5963                         break;
5964                 }
5965         }
5966 }
5967
5968 static int disable_takeover_runs(TALLOC_CTX *mem_ctx,
5969                                  struct ctdb_context *ctdb, uint32_t timeout,
5970                                  uint32_t *pnn_list, int count)
5971 {
5972         struct ctdb_disable_message disable = { 0 };
5973         struct disable_takeover_runs_state state;
5974         int ret, i;
5975
5976         disable.pnn = ctdb->pnn;
5977         disable.srvid = next_srvid(ctdb);
5978         disable.timeout = timeout;
5979
5980         state.pnn_list = pnn_list;
5981         state.node_count = count;
5982         state.done = false;
5983         state.status = 0;
5984         state.reply = talloc_zero_array(mem_ctx, bool, count);
5985         if (state.reply == NULL) {
5986                 return ENOMEM;
5987         }
5988
5989         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
5990                                               disable.srvid,
5991                                               disable_takeover_run_handler,
5992                                               &state);
5993         if (ret != 0) {
5994                 return ret;
5995         }
5996
5997         for (i=0; i<count; i++) {
5998                 ret = ctdb_message_disable_takeover_runs(mem_ctx, ctdb->ev,
5999                                                          ctdb->client,
6000                                                          pnn_list[i],
6001                                                          &disable);
6002                 if (ret != 0) {
6003                         goto fail;
6004                 }
6005         }
6006
6007         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
6008         if (ret == ETIME) {
6009                 fprintf(stderr, "Timed out waiting to disable takeover runs\n");
6010         } else {
6011                 ret = (state.status >= 0 ? 0 : 1);
6012         }
6013
6014 fail:
6015         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
6016                                            disable.srvid, &state);
6017         return ret;
6018 }
6019
6020 static int control_reloadips(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
6021                              int argc, const char **argv)
6022 {
6023         const char *nodestring = NULL;
6024         struct ctdb_node_map *nodemap, *nodemap2;
6025         struct ctdb_req_control request;
6026         uint32_t *pnn_list, *pnn_list2;
6027         int ret, count, count2;
6028
6029         if (argc > 1) {
6030                 usage("reloadips");
6031         }
6032
6033         if (argc == 1) {
6034                 nodestring = argv[0];
6035         }
6036
6037         nodemap = get_nodemap(ctdb, false);
6038         if (nodemap == NULL) {
6039                 return 1;
6040         }
6041
6042         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap2)) {
6043                 return 1;
6044         }
6045
6046         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
6047                                         mem_ctx, &pnn_list);
6048         if (count <= 0) {
6049                 fprintf(stderr, "Memory allocation error\n");
6050                 return 1;
6051         }
6052
6053         count2 = list_of_active_nodes(nodemap2, CTDB_UNKNOWN_PNN,
6054                                       mem_ctx, &pnn_list2);
6055         if (count2 <= 0) {
6056                 fprintf(stderr, "Memory allocation error\n");
6057                 return 1;
6058         }
6059
6060         /* Disable takeover runs on all connected nodes.  A reply
6061          * indicating success is needed from each node so all nodes
6062          * will need to be active.
6063          *
6064          * A check could be added to not allow reloading of IPs when
6065          * there are disconnected nodes.  However, this should
6066          * probably be left up to the administrator.
6067          */
6068         ret = disable_takeover_runs(mem_ctx, ctdb, 2*options.timelimit,
6069                                     pnn_list, count);
6070         if (ret != 0) {
6071                 fprintf(stderr, "Failed to disable takeover runs\n");
6072                 return ret;
6073         }
6074
6075         /* Now tell all the desired nodes to reload their public IPs.
6076          * Keep trying this until it succeeds.  This assumes all
6077          * failures are transient, which might not be true...
6078          */
6079         ctdb_req_control_reload_public_ips(&request);
6080         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
6081                                         pnn_list2, count2, TIMEOUT(),
6082                                         &request, NULL, NULL);
6083         if (ret != 0) {
6084                 fprintf(stderr, "Failed to reload IPs on some nodes.\n");
6085         }
6086
6087         /* It isn't strictly necessary to wait until takeover runs are
6088          * re-enabled but doing so can't hurt.
6089          */
6090         ret = disable_takeover_runs(mem_ctx, ctdb, 0, pnn_list, count);
6091         if (ret != 0) {
6092                 fprintf(stderr, "Failed to enable takeover runs\n");
6093                 return ret;
6094         }
6095
6096         return ipreallocate(mem_ctx, ctdb);
6097 }
6098
6099 static int control_ipiface(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
6100                            int argc, const char **argv)
6101 {
6102         ctdb_sock_addr addr;
6103         char *iface;
6104
6105         if (argc != 1) {
6106                 usage("ipiface");
6107         }
6108
6109         if (! parse_ip(argv[0], NULL, 0, &addr)) {
6110                 fprintf(stderr, "Failed to Parse IP %s\n", argv[0]);
6111                 return 1;
6112         }
6113
6114         iface = ctdb_sys_find_ifname(&addr);
6115         if (iface == NULL) {
6116                 fprintf(stderr, "Failed to find interface for IP %s\n",
6117                         argv[0]);
6118                 return 1;
6119         }
6120         free(iface);
6121
6122         return 0;
6123 }
6124
6125
6126 static const struct ctdb_cmd {
6127         const char *name;
6128         int (*fn)(TALLOC_CTX *, struct ctdb_context *, int, const char **);
6129         bool without_daemon; /* can be run without daemon running ? */
6130         bool remote; /* can be run on remote nodes */
6131         const char *msg;
6132         const char *args;
6133 } ctdb_commands[] = {
6134         { "version", control_version, true, false,
6135                 "show version of ctdb", NULL },
6136         { "status", control_status, false, true,
6137                 "show node status", NULL },
6138         { "uptime", control_uptime, false, true,
6139                 "show node uptime", NULL },
6140         { "ping", control_ping, false, true,
6141                 "ping all nodes", NULL },
6142         { "runstate", control_runstate, false, true,
6143                 "get/check runstate of a node",
6144                 "[setup|first_recovery|startup|running]" },
6145         { "getvar", control_getvar, false, true,
6146                 "get a tunable variable", "<name>" },
6147         { "setvar", control_setvar, false, true,
6148                 "set a tunable variable", "<name> <value>" },
6149         { "listvars", control_listvars, false, true,
6150                 "list tunable variables", NULL },
6151         { "statistics", control_statistics, false, true,
6152                 "show ctdb statistics", NULL },
6153         { "statisticsreset", control_statistics_reset, false, true,
6154                 "reset ctdb statistics", NULL },
6155         { "stats", control_stats, false, true,
6156                 "show rolling statistics", "[count]" },
6157         { "ip", control_ip, false, true,
6158                 "show public ips", "[all]" },
6159         { "ipinfo", control_ipinfo, false, true,
6160                 "show public ip details", "<ip>" },
6161         { "ifaces", control_ifaces, false, true,
6162                 "show interfaces", NULL },
6163         { "setifacelink", control_setifacelink, false, true,
6164                 "set interface link status", "<iface> up|down" },
6165         { "process-exists", control_process_exists, false, true,
6166                 "check if a process exists on a node",  "<pid>" },
6167         { "getdbmap", control_getdbmap, false, true,
6168                 "show attached databases", NULL },
6169         { "getdbstatus", control_getdbstatus, false, true,
6170                 "show database status", "<dbname|dbid>" },
6171         { "catdb", control_catdb, false, false,
6172                 "dump cluster-wide ctdb database", "<dbname|dbid>" },
6173         { "cattdb", control_cattdb, false, false,
6174                 "dump local ctdb database", "<dbname|dbid>" },
6175         { "getmonmode", control_getmonmode, false, true,
6176                 "show monitoring mode", NULL },
6177         { "getcapabilities", control_getcapabilities, false, true,
6178                 "show node capabilities", NULL },
6179         { "pnn", control_pnn, false, false,
6180                 "show the pnn of the currnet node", NULL },
6181         { "lvs", control_lvs, false, false,
6182                 "show lvs configuration", "master|list|status" },
6183         { "disablemonitor", control_disable_monitor, false, true,
6184                 "disable monitoring", NULL },
6185         { "enablemonitor", control_enable_monitor, false, true,
6186                 "enable monitoring", NULL },
6187         { "setdebug", control_setdebug, false, true,
6188                 "set debug level", "ERROR|WARNING|NOTICE|INFO|DEBUG" },
6189         { "getdebug", control_getdebug, false, true,
6190                 "get debug level", NULL },
6191         { "attach", control_attach, false, false,
6192                 "attach a database", "<dbname> [persistent]" },
6193         { "detach", control_detach, false, false,
6194                 "detach database(s)", "<dbname|dbid> ..." },
6195         { "dumpmemory", control_dumpmemory, false, true,
6196                 "dump ctdbd memory map", NULL },
6197         { "rddumpmemory", control_rddumpmemory, false, true,
6198                 "dump recoverd memory map", NULL },
6199         { "getpid", control_getpid, false, true,
6200                 "get ctdbd process ID", NULL },
6201         { "disable", control_disable, false, true,
6202                 "disable a node", NULL },
6203         { "enable", control_enable, false, true,
6204                 "enable a node", NULL },
6205         { "stop", control_stop, false, true,
6206                 "stop a node", NULL },
6207         { "continue", control_continue, false, true,
6208                 "continue a stopped node", NULL },
6209         { "ban", control_ban, false, true,
6210                 "ban a node", "<bantime>"},
6211         { "unban", control_unban, false, true,
6212                 "unban a node", NULL },
6213         { "shutdown", control_shutdown, false, true,
6214                 "shutdown ctdb daemon", NULL },
6215         { "recover", control_recover, false, true,
6216                 "force recovery", NULL },
6217         { "sync", control_ipreallocate, false, true,
6218                 "run ip reallocation (deprecated)", NULL },
6219         { "ipreallocate", control_ipreallocate, false, true,
6220                 "run ip reallocation", NULL },
6221         { "isnotrecmaster", control_isnotrecmaster, false, false,
6222                 "check if local node is the recmaster", NULL },
6223         { "gratarp", control_gratarp, false, true,
6224                 "send a gratuitous arp", "<ip> <interface>" },
6225         { "tickle", control_tickle, true, false,
6226                 "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
6227         { "gettickles", control_gettickles, false, true,
6228                 "get the list of tickles", "<ip> [<port>]" },
6229         { "addtickle", control_addtickle, false, true,
6230                 "add a tickle", "<ip>:<port> <ip>:<port>" },
6231         { "deltickle", control_deltickle, false, true,
6232                 "delete a tickle", "<ip>:<port> <ip>:<port>" },
6233         { "check_srvids", control_check_srvids, false, true,
6234                 "check if srvid is registered", "<id> [<id> ...]" },
6235         { "listnodes", control_listnodes, true, true,
6236                 "list nodes in the cluster", NULL },
6237         { "reloadnodes", control_reloadnodes, false, false,
6238                 "reload the nodes file all nodes", NULL },
6239         { "moveip", control_moveip, false, false,
6240                 "move an ip address to another node", "<ip> <node>" },
6241         { "addip", control_addip, false, true,
6242                 "add an ip address to a node", "<ip/mask> <iface>" },
6243         { "delip", control_delip, false, true,
6244                 "delete an ip address from a node", "<ip>" },
6245         { "eventscript", control_eventscript, false, true,
6246                 "run an event", "monitor" },
6247         { "backupdb", control_backupdb, false, false,
6248                 "backup a database into a file", "<dbname|dbid> <file>" },
6249         { "restoredb", control_restoredb, false, false,
6250                 "restore a database from a file", "<file> [dbname]" },
6251         { "dumpdbbackup", control_dumpdbbackup, true, false,
6252                 "dump database from a backup file", "<file>" },
6253         { "wipedb", control_wipedb, false, false,
6254                 "wipe the contents of a database.", "<dbname|dbid>"},
6255         { "recmaster", control_recmaster, false, true,
6256                 "show the pnn for the recovery master", NULL },
6257         { "scriptstatus", control_scriptstatus, false, true,
6258                 "show event script status",
6259                 "[init|setup|startup|monitor|takeip|releaseip|ipreallocated]" },
6260         { "enablescript", control_enablescript, false, true,
6261                 "enable an eventscript", "<script>"},
6262         { "disablescript", control_disablescript, false, true,
6263                 "disable an eventscript", "<script>"},
6264         { "natgw", control_natgw, false, false,
6265                 "show natgw configuration", "master|list|status" },
6266         { "natgwlist", control_natgwlist, false, false,
6267                 "show the nodes belonging to this natgw configuration", NULL },
6268         { "getreclock", control_getreclock, false, true,
6269                 "get recovery lock file", NULL },
6270         { "setlmasterrole", control_setlmasterrole, false, true,
6271                 "set LMASTER role", "on|off" },
6272         { "setrecmasterrole", control_setrecmasterrole, false, true,
6273                 "set RECMASTER role", "on|off"},
6274         { "setdbreadonly", control_setdbreadonly, false, true,
6275                 "enable readonly records", "<dbname|dbid>" },
6276         { "setdbsticky", control_setdbsticky, false, true,
6277                 "enable sticky records", "<dbname|dbid>"},
6278         { "pfetch", control_pfetch, false, false,
6279                 "fetch record from persistent database", "<dbname|dbid> <key> [<file>]" },
6280         { "pstore", control_pstore, false, false,
6281                 "write record to persistent database", "<dbname|dbid> <key> <value>" },
6282         { "pdelete", control_pdelete, false, false,
6283                 "delete record from persistent database", "<dbname|dbid> <key>" },
6284         { "ptrans", control_ptrans, false, false,
6285                 "update a persistent database (from file or stdin)", "<dbname|dbid> [<file>]" },
6286         { "tfetch", control_tfetch, false, true,
6287                 "fetch a record", "<tdb-file> <key> [<file>]" },
6288         { "tstore", control_tstore, false, true,
6289                 "store a record", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6290         { "readkey", control_readkey, false, false,
6291                 "read value of a database key", "<dbname|dbid> <key> [readonly]" },
6292         { "writekey", control_writekey, false, false,
6293                 "write value for a database key", "<dbname|dbid> <key> <value>" },
6294         { "deletekey", control_deletekey, false, false,
6295                 "delete a database key", "<dbname|dbid> <key>" },
6296         { "checktcpport", control_checktcpport, true, false,
6297                 "check if a service is bound to a specific tcp port or not", "<port>" },
6298         { "getdbseqnum", control_getdbseqnum, false, false,
6299                 "get database sequence number", "<dbname|dbid>" },
6300         { "nodestatus", control_nodestatus, false, true,
6301                 "show and return node status", "[all|<pnn-list>]" },
6302         { "dbstatistics", control_dbstatistics, false, true,
6303                 "show database statistics", "<dbname|dbid>" },
6304         { "reloadips", control_reloadips, false, false,
6305                 "reload the public addresses file", "[all|<pnn-list>]" },
6306         { "ipiface", control_ipiface, true, false,
6307                 "Find the interface an ip address is hosted on", "<ip>" },
6308 };
6309
6310 static const struct ctdb_cmd *match_command(const char *command)
6311 {
6312         const struct ctdb_cmd *cmd;
6313         int i;
6314
6315         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6316                 cmd = &ctdb_commands[i];
6317                 if (strlen(command) == strlen(cmd->name) &&
6318                     strncmp(command, cmd->name, strlen(command)) == 0) {
6319                         return cmd;
6320                 }
6321         }
6322
6323         return NULL;
6324 }
6325
6326
6327 /**
6328  * Show usage message
6329  */
6330 static void usage_full(void)
6331 {
6332         int i;
6333
6334         poptPrintHelp(pc, stdout, 0);
6335         printf("\nCommands:\n");
6336         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6337                 printf("  %-15s %-27s  %s\n",
6338                        ctdb_commands[i].name,
6339                        ctdb_commands[i].args ? ctdb_commands[i].args : "",
6340                        ctdb_commands[i].msg);
6341         }
6342 }
6343
6344 static void usage(const char *command)
6345 {
6346         const struct ctdb_cmd *cmd;
6347
6348         if (command == NULL) {
6349                 usage_full();
6350                 exit(1);
6351         }
6352
6353         cmd = match_command(command);
6354         if (cmd == NULL) {
6355                 usage_full();
6356         } else {
6357                 poptPrintUsage(pc, stdout, 0);
6358                 printf("\nCommands:\n");
6359                 printf("  %-15s %-27s  %s\n",
6360                        cmd->name, cmd->args ? cmd->args : "", cmd->msg);
6361         }
6362
6363         exit(1);
6364 }
6365
6366 struct poptOption cmdline_options[] = {
6367         POPT_AUTOHELP
6368         { "socket", 's', POPT_ARG_STRING, &options.socket, 0,
6369                 "CTDB socket path", "filename" },
6370         { "debug", 'd', POPT_ARG_STRING, &options.debuglevelstr, 0,
6371                 "debug level"},
6372         { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0,
6373                 "timelimit (in seconds)" },
6374         { "node", 'n', POPT_ARG_INT, &options.pnn, 0,
6375                 "node specification - integer" },
6376         { NULL, 'Y', POPT_ARG_NONE, &options.machinereadable, 0,
6377                 "enable machine readable output", NULL },
6378         { "separator", 'x', POPT_ARG_STRING, &options.sep, 0,
6379                 "specify separator for machine readable output", "CHAR" },
6380         { NULL, 'X', POPT_ARG_NONE, &options.machineparsable, 0,
6381                 "enable machine parsable output with separator |", NULL },
6382         { "verbose", 'v', POPT_ARG_NONE, &options.verbose, 0,
6383                 "enable verbose output", NULL },
6384         { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0,
6385                 "die if runtime exceeds this limit (in seconds)" },
6386         POPT_TABLEEND
6387 };
6388
6389 static int process_command(const struct ctdb_cmd *cmd, int argc,
6390                            const char **argv)
6391 {
6392         TALLOC_CTX *tmp_ctx;
6393         struct ctdb_context *ctdb;
6394         int ret;
6395         bool status;
6396         uint64_t srvid_offset;
6397
6398         tmp_ctx = talloc_new(NULL);
6399         if (tmp_ctx == NULL) {
6400                 fprintf(stderr, "Memory allocation error\n");
6401                 goto fail;
6402         }
6403
6404         if (cmd->without_daemon) {
6405                 if (options.pnn != -1) {
6406                         fprintf(stderr,
6407                                 "Cannot specify node for command %s\n",
6408                                 cmd->name);
6409                         goto fail;
6410                 }
6411
6412                 ret = cmd->fn(tmp_ctx, NULL, argc-1, argv+1);
6413                 talloc_free(tmp_ctx);
6414                 return ret;
6415         }
6416
6417         ctdb = talloc_zero(tmp_ctx, struct ctdb_context);
6418         if (ctdb == NULL) {
6419                 fprintf(stderr, "Memory allocation error\n");
6420                 goto fail;
6421         }
6422
6423         ctdb->ev = tevent_context_init(ctdb);
6424         if (ctdb->ev == NULL) {
6425                 fprintf(stderr, "Failed to initialize tevent\n");
6426                 goto fail;
6427         }
6428
6429         ret = ctdb_client_init(ctdb, ctdb->ev, options.socket, &ctdb->client);
6430         if (ret != 0) {
6431                 fprintf(stderr, "Failed to connect to CTDB daemon (%s)\n",
6432                         options.socket);
6433
6434                 if (!find_node_xpnn(ctdb, NULL)) {
6435                         fprintf(stderr, "Is this node part of CTDB cluster?\n");
6436                 }
6437                 goto fail;
6438         }
6439
6440         ctdb->pnn = ctdb_client_pnn(ctdb->client);
6441         srvid_offset = getpid() & 0xFFFF;
6442         ctdb->srvid = SRVID_CTDB_TOOL | (srvid_offset << 16);
6443
6444         if (options.pnn != -1) {
6445                 status = verify_pnn(ctdb, options.pnn);
6446                 if (! status) {
6447                         goto fail;
6448                 }
6449
6450                 ctdb->cmd_pnn = options.pnn;
6451         } else {
6452                 ctdb->cmd_pnn = ctdb->pnn;
6453         }
6454
6455         if (! cmd->remote && ctdb->pnn != ctdb->cmd_pnn) {
6456                 fprintf(stderr, "Node cannot be specified for command %s\n",
6457                         cmd->name);
6458                 goto fail;
6459         }
6460
6461         ret = cmd->fn(tmp_ctx, ctdb, argc-1, argv+1);
6462         talloc_free(tmp_ctx);
6463         return ret;
6464
6465 fail:
6466         talloc_free(tmp_ctx);
6467         return 1;
6468 }
6469
6470 static void signal_handler(int sig)
6471 {
6472         fprintf(stderr, "Maximum runtime exceeded - exiting\n");
6473 }
6474
6475 static void alarm_handler(int sig)
6476 {
6477         /* Kill any child processes */
6478         signal(SIGTERM, signal_handler);
6479         kill(0, SIGTERM);
6480
6481         _exit(1);
6482 }
6483
6484 int main(int argc, const char *argv[])
6485 {
6486         int opt;
6487         const char **extra_argv;
6488         int extra_argc;
6489         const struct ctdb_cmd *cmd;
6490         const char *ctdb_socket;
6491         int loglevel;
6492         int ret;
6493
6494         setlinebuf(stdout);
6495
6496         /* Set default options */
6497         options.socket = CTDB_SOCKET;
6498         options.debuglevelstr = NULL;
6499         options.timelimit = 10;
6500         options.sep = "|";
6501         options.maxruntime = 0;
6502         options.pnn = -1;
6503
6504         ctdb_socket = getenv("CTDB_SOCKET");
6505         if (ctdb_socket != NULL) {
6506                 options.socket = ctdb_socket;
6507         }
6508
6509         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
6510                             POPT_CONTEXT_KEEP_FIRST);
6511         while ((opt = poptGetNextOpt(pc)) != -1) {
6512                 fprintf(stderr, "Invalid option %s: %s\n",
6513                         poptBadOption(pc, 0), poptStrerror(opt));
6514                 exit(1);
6515         }
6516
6517         if (options.maxruntime == 0) {
6518                 const char *ctdb_timeout;
6519
6520                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6521                 if (ctdb_timeout != NULL) {
6522                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6523                 } else {
6524                         options.maxruntime = 120;
6525                 }
6526         }
6527         if (options.maxruntime <= 120) {
6528                 /* default timeout is 120 seconds */
6529                 options.maxruntime = 120;
6530         }
6531
6532         if (options.machineparsable) {
6533                 options.machinereadable = 1;
6534         }
6535
6536         /* setup the remaining options for the commands */
6537         extra_argc = 0;
6538         extra_argv = poptGetArgs(pc);
6539         if (extra_argv) {
6540                 extra_argv++;
6541                 while (extra_argv[extra_argc]) extra_argc++;
6542         }
6543
6544         if (extra_argc < 1) {
6545                 usage(NULL);
6546         }
6547
6548         cmd = match_command(extra_argv[0]);
6549         if (cmd == NULL) {
6550                 fprintf(stderr, "Unknown command '%s'\n", extra_argv[0]);
6551                 exit(1);
6552         }
6553
6554         /* Enable logging */
6555         setup_logging("ctdb", DEBUG_STDERR);
6556         if (debug_level_parse(options.debuglevelstr, &loglevel)) {
6557                 DEBUGLEVEL = loglevel;
6558         } else {
6559                 DEBUGLEVEL = DEBUG_ERR;
6560         }
6561
6562         signal(SIGALRM, alarm_handler);
6563         alarm(options.maxruntime);
6564
6565         ret = process_command(cmd, extra_argc, extra_argv);
6566         if (ret == -1) {
6567                 ret = 1;
6568         }
6569
6570         (void)poptFreeContext(pc);
6571
6572         return ret;
6573 }