ctdb-logging: Drop enum debug_level
[samba.git] / ctdb / tools / ctdb.c
1 /*
2    CTDB control tool
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/wait.h"
25
26 #include <ctype.h>
27 #include <popt.h>
28 #include <talloc.h>
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "ctdb_version.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35 #include "lib/util/sys_rw.h"
36
37 #include "common/db_hash.h"
38 #include "common/logging.h"
39 #include "protocol/protocol.h"
40 #include "protocol/protocol_api.h"
41 #include "common/system.h"
42 #include "client/client.h"
43
/*
 * Timeout value handed to the ctdb_ctrl_*() calls in this file; built from
 * options.timelimit via timeval_current_ofs().
 * NOTE(review): presumably "now + timelimit seconds" - confirm against
 * timeval_current_ofs().
 */
#define TIMEOUT()       timeval_current_ofs(options.timelimit, 0)

/*
 * Service IDs derived from CTDB_SRVID_TOOL_RANGE.
 * NOTE(review): their users are not visible in this part of the file.
 */
#define SRVID_CTDB_TOOL    (CTDB_SRVID_TOOL_RANGE | 0x0001000000000000LL)
#define SRVID_CTDB_PUSHDB  (CTDB_SRVID_TOOL_RANGE | 0x0002000000000000LL)
48
/*
 * Global option values for the tool.
 * NOTE(review): presumably filled in by popt option parsing, which is not
 * visible in this part of the file - confirm.
 */
static struct {
        const char *socket;
        const char *debuglevelstr;
        /* First argument of timeval_current_ofs() in TIMEOUT() */
        int timelimit;
        int pnn;
        /* control_status() prints the machine format when this is 1 */
        int machinereadable;
        /* Field separator printed by print_nodemap_machine() */
        const char *sep;
        int machineparsable;
        int verbose;
        int maxruntime;
        int printemptyrecords;
        int printdatasize;
        int printlmaster;
        int printhash;
        int printrecordflags;
} options;
65
/* popt parsing context; NOTE(review): its use is outside the visible code */
static poptContext pc;

/* State shared by all command functions */
struct ctdb_context {
        /* Event context passed to every ctdb_ctrl_*() call */
        struct tevent_context *ev;
        /* Client connection passed to every ctdb_ctrl_*() call */
        struct ctdb_client_context *client;
        /* Node map cached by get_nodemap() */
        struct ctdb_node_map *nodemap;
        /*
         * pnn:     node queried by get_nodemap(), db_find() and db_exists()
         * cmd_pnn: node the control_*() commands are sent to
         */
        uint32_t pnn, cmd_pnn;
        /* Counter incremented by next_srvid() */
        uint64_t srvid;
};

/* Called by control_status() on a wrong argument count; defined elsewhere */
static void usage(const char *command);
77
78 /*
79  * Utility Functions
80  */
81
/* Return (tv2 - tv) in seconds, including the microsecond fraction */
static double timeval_delta(struct timeval *tv2, struct timeval *tv)
{
        double whole_secs = tv2->tv_sec - tv->tv_sec;
        double frac_secs = (tv2->tv_usec - tv->tv_usec) * 1.0e-6;

        return whole_secs + frac_secs;
}
87
88 static struct ctdb_node_and_flags *get_node_by_pnn(
89                                         struct ctdb_node_map *nodemap,
90                                         uint32_t pnn)
91 {
92         int i;
93
94         for (i=0; i<nodemap->num; i++) {
95                 if (nodemap->node[i].pnn == pnn) {
96                         return &nodemap->node[i];
97                 }
98         }
99         return NULL;
100 }
101
102 static const char *pretty_print_flags(TALLOC_CTX *mem_ctx, uint32_t flags)
103 {
104         static const struct {
105                 uint32_t flag;
106                 const char *name;
107         } flag_names[] = {
108                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
109                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
110                 { NODE_FLAGS_BANNED,                "BANNED" },
111                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
112                 { NODE_FLAGS_DELETED,               "DELETED" },
113                 { NODE_FLAGS_STOPPED,               "STOPPED" },
114                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
115         };
116         char *flags_str = NULL;
117         int i;
118
119         for (i=0; i<ARRAY_SIZE(flag_names); i++) {
120                 if (flags & flag_names[i].flag) {
121                         if (flags_str == NULL) {
122                                 flags_str = talloc_asprintf(mem_ctx,
123                                                 "%s", flag_names[i].name);
124                         } else {
125                                 flags_str = talloc_asprintf_append(flags_str,
126                                                 "|%s", flag_names[i].name);
127                         }
128                         if (flags_str == NULL) {
129                                 return "OUT-OF-MEMORY";
130                         }
131                 }
132         }
133         if (flags_str == NULL) {
134                 return "OK";
135         }
136
137         return flags_str;
138 }
139
140 static uint64_t next_srvid(struct ctdb_context *ctdb)
141 {
142         ctdb->srvid += 1;
143         return ctdb->srvid;
144 }
145
146 /*
147  * Get consistent nodemap information.
148  *
149  * If nodemap is already cached, use that. If not get it.
150  * If the current node is BANNED, then get nodemap from "better" node.
151  */
152 static struct ctdb_node_map *get_nodemap(struct ctdb_context *ctdb, bool force)
153 {
154         TALLOC_CTX *tmp_ctx;
155         struct ctdb_node_map *nodemap;
156         struct ctdb_node_and_flags *node;
157         uint32_t current_node;
158         int ret;
159
160         if (force) {
161                 TALLOC_FREE(ctdb->nodemap);
162         }
163
164         if (ctdb->nodemap != NULL) {
165                 return ctdb->nodemap;
166         }
167
168         tmp_ctx = talloc_new(ctdb);
169         if (tmp_ctx == NULL) {
170                 return false;
171         }
172
173         current_node = ctdb->pnn;
174 again:
175         ret = ctdb_ctrl_get_nodemap(tmp_ctx, ctdb->ev, ctdb->client,
176                                     current_node, TIMEOUT(), &nodemap);
177         if (ret != 0) {
178                 fprintf(stderr, "Failed to get nodemap from node %u\n",
179                         current_node);
180                 goto failed;
181         }
182
183         node = get_node_by_pnn(nodemap, current_node);
184         if (node->flags & NODE_FLAGS_BANNED) {
185                 /* Pick next node */
186                 do {
187                         current_node = (current_node + 1) % nodemap->num;
188                         node = get_node_by_pnn(nodemap, current_node);
189                         if (! (node->flags &
190                               (NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED))) {
191                                 break;
192                         }
193                 } while (current_node != ctdb->pnn);
194
195                 if (current_node == ctdb->pnn) {
196                         /* Tried all nodes in the cluster */
197                         fprintf(stderr, "Warning: All nodes are banned.\n");
198                         goto failed;
199                 }
200
201                 goto again;
202         }
203
204         ctdb->nodemap = talloc_steal(ctdb, nodemap);
205         return nodemap;
206
207 failed:
208         talloc_free(tmp_ctx);
209         return NULL;
210 }
211
212 static bool verify_pnn(struct ctdb_context *ctdb, int pnn)
213 {
214         struct ctdb_node_map *nodemap;
215         bool found;
216         int i;
217
218         if (pnn == -1) {
219                 return false;
220         }
221
222         nodemap = get_nodemap(ctdb, false);
223         if (nodemap == NULL) {
224                 return false;
225         }
226
227         found = false;
228         for (i=0; i<nodemap->num; i++) {
229                 if (nodemap->node[i].pnn == pnn) {
230                         found = true;
231                         break;
232                 }
233         }
234         if (! found) {
235                 fprintf(stderr, "Node %u does not exist\n", pnn);
236                 return false;
237         }
238
239         if (nodemap->node[i].flags &
240             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
241                 fprintf(stderr, "Node %u has status %s\n", pnn,
242                         pretty_print_flags(ctdb, nodemap->node[i].flags));
243                 return false;
244         }
245
246         return true;
247 }
248
249 static struct ctdb_node_map *talloc_nodemap(TALLOC_CTX *mem_ctx,
250                                             struct ctdb_node_map *nodemap)
251 {
252         struct ctdb_node_map *nodemap2;
253
254         nodemap2 = talloc_zero(mem_ctx, struct ctdb_node_map);
255         if (nodemap2 == NULL) {
256                 return NULL;
257         }
258
259         nodemap2->node = talloc_array(nodemap2, struct ctdb_node_and_flags,
260                                       nodemap->num);
261         if (nodemap2->node == NULL) {
262                 talloc_free(nodemap2);
263                 return NULL;
264         }
265
266         return nodemap2;
267 }
268
269 /*
270  * Get the number and the list of matching nodes
271  *
272  *   nodestring :=  NULL | all | pnn,[pnn,...]
273  *
274  * If nodestring is NULL, use the current node.
275  */
276 static bool parse_nodestring(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
277                              const char *nodestring,
278                              struct ctdb_node_map **out)
279 {
280         struct ctdb_node_map *nodemap, *nodemap2;
281         struct ctdb_node_and_flags *node;
282         int i;
283
284         nodemap = get_nodemap(ctdb, false);
285         if (nodemap == NULL) {
286                 return false;
287         }
288
289         nodemap2 = talloc_nodemap(mem_ctx, nodemap);
290         if (nodemap2 == NULL) {
291                 return false;
292         }
293
294         if (nodestring == NULL) {
295                 for (i=0; i<nodemap->num; i++) {
296                         if (nodemap->node[i].pnn == ctdb->cmd_pnn) {
297                                 nodemap2->node[0] = nodemap->node[i];
298                                 break;
299                         }
300                 }
301                 nodemap2->num = 1;
302
303                 goto done;
304         }
305
306         if (strcmp(nodestring, "all") == 0) {
307                 for (i=0; i<nodemap->num; i++) {
308                         nodemap2->node[i] = nodemap->node[i];
309                 }
310                 nodemap2->num = nodemap->num;
311
312                 goto done;
313         } else {
314                 char *ns, *tok;
315
316                 ns = talloc_strdup(mem_ctx, nodestring);
317                 if (ns == NULL) {
318                         return false;
319                 }
320
321                 tok = strtok(ns, ",");
322                 while (tok != NULL) {
323                         uint32_t pnn;
324                         char *endptr;
325
326                         pnn = (uint32_t)strtoul(tok, &endptr, 0);
327                         if (pnn == 0 && tok == endptr) {
328                                 fprintf(stderr, "Invalid node %s\n", tok);
329                                         return false;
330                         }
331
332                         node = get_node_by_pnn(nodemap, pnn);
333                         if (node == NULL) {
334                                 fprintf(stderr, "Node %u does not exist\n",
335                                         pnn);
336                                 return false;
337                         }
338
339                         nodemap2->node[nodemap2->num] = *node;
340                         nodemap2->num += 1;
341
342                         tok = strtok(NULL, ",");
343                 }
344         }
345
346 done:
347         *out = nodemap2;
348         return true;
349 }
350
351 /* Compare IP address */
352 static bool ctdb_same_ip(ctdb_sock_addr *ip1, ctdb_sock_addr *ip2)
353 {
354         bool ret = false;
355
356         if (ip1->sa.sa_family != ip2->sa.sa_family) {
357                 return false;
358         }
359
360         switch (ip1->sa.sa_family) {
361         case AF_INET:
362                 ret = (memcmp(&ip1->ip.sin_addr, &ip2->ip.sin_addr,
363                               sizeof(struct in_addr)) == 0);
364                 break;
365
366         case AF_INET6:
367                 ret = (memcmp(&ip1->ip6.sin6_addr, &ip2->ip6.sin6_addr,
368                               sizeof(struct in6_addr)) == 0);
369                 break;
370         }
371
372         return ret;
373 }
374
375 /* Append a node to a node map with given address and flags */
376 static bool node_map_add(struct ctdb_node_map *nodemap,
377                          const char *nstr, uint32_t flags)
378 {
379         ctdb_sock_addr addr;
380         uint32_t num;
381         struct ctdb_node_and_flags *n;
382
383         if (! parse_ip(nstr, NULL, 0, &addr)) {
384                 fprintf(stderr, "Invalid IP address %s\n", nstr);
385                 return false;
386         }
387
388         num = nodemap->num;
389         nodemap->node = talloc_realloc(nodemap, nodemap->node,
390                                        struct ctdb_node_and_flags, num+1);
391         if (nodemap->node == NULL) {
392                 return false;
393         }
394
395         n = &nodemap->node[num];
396         n->addr = addr;
397         n->pnn = num;
398         n->flags = flags;
399
400         nodemap->num = num+1;
401         return true;
402 }
403
404 /* Read a nodes file into a node map */
405 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
406                                                   const char *nlist)
407 {
408         char **lines;
409         int nlines;
410         int i;
411         struct ctdb_node_map *nodemap;
412
413         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
414         if (nodemap == NULL) {
415                 return NULL;
416         }
417
418         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
419         if (lines == NULL) {
420                 return NULL;
421         }
422
423         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
424                 nlines--;
425         }
426
427         for (i=0; i<nlines; i++) {
428                 char *node;
429                 uint32_t flags;
430                 size_t len;
431
432                 node = lines[i];
433                 /* strip leading spaces */
434                 while((*node == ' ') || (*node == '\t')) {
435                         node++;
436                 }
437
438                 len = strlen(node);
439
440                 /* strip trailing spaces */
441                 while ((len > 1) &&
442                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
443                 {
444                         node[len-1] = '\0';
445                         len--;
446                 }
447
448                 if (len == 0) {
449                         continue;
450                 }
451                 if (*node == '#') {
452                         /* A "deleted" node is a node that is
453                            commented out in the nodes file.  This is
454                            used instead of removing a line, which
455                            would cause subsequent nodes to change
456                            their PNN. */
457                         flags = NODE_FLAGS_DELETED;
458                         node = discard_const("0.0.0.0");
459                 } else {
460                         flags = 0;
461                 }
462                 if (! node_map_add(nodemap, node, flags)) {
463                         talloc_free(lines);
464                         TALLOC_FREE(nodemap);
465                         return NULL;
466                 }
467         }
468
469         talloc_free(lines);
470         return nodemap;
471 }
472
473 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx, uint32_t pnn)
474 {
475         struct ctdb_node_map *nodemap;
476         char *nodepath;
477         const char *nodes_list = NULL;
478
479         if (pnn != CTDB_UNKNOWN_PNN) {
480                 nodepath = talloc_asprintf(mem_ctx, "CTDB_NODES_%u", pnn);
481                 if (nodepath != NULL) {
482                         nodes_list = getenv(nodepath);
483                 }
484         }
485         if (nodes_list == NULL) {
486                 nodes_list = getenv("CTDB_NODES");
487         }
488         if (nodes_list == NULL) {
489                 const char *basedir = getenv("CTDB_BASE");
490                 if (basedir == NULL) {
491                         basedir = CTDB_ETCDIR;
492                 }
493                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes", basedir);
494                 if (nodes_list == NULL) {
495                         fprintf(stderr, "Memory allocation error\n");
496                         return NULL;
497                 }
498         }
499
500         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
501         if (nodemap == NULL) {
502                 fprintf(stderr, "Failed to read nodes file \"%s\"\n",
503                         nodes_list);
504                 return NULL;
505         }
506
507         return nodemap;
508 }
509
510 static struct ctdb_dbid *db_find(TALLOC_CTX *mem_ctx,
511                                  struct ctdb_context *ctdb,
512                                  struct ctdb_dbid_map *dbmap,
513                                  const char *db_name)
514 {
515         struct ctdb_dbid *db = NULL;
516         const char *name;
517         int ret, i;
518
519         for (i=0; i<dbmap->num; i++) {
520                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
521                                            ctdb->pnn, TIMEOUT(),
522                                            dbmap->dbs[i].db_id, &name);
523                 if (ret != 0) {
524                         return false;
525                 }
526
527                 if (strcmp(db_name, name) == 0) {
528                         talloc_free(discard_const(name));
529                         db = &dbmap->dbs[i];
530                         break;
531                 }
532         }
533
534         return db;
535 }
536
537 static bool db_exists(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
538                       const char *db_arg, uint32_t *db_id,
539                       const char **db_name, uint8_t *db_flags)
540 {
541         struct ctdb_dbid_map *dbmap;
542         struct ctdb_dbid *db = NULL;
543         uint32_t id = 0;
544         const char *name = NULL;
545         int ret, i;
546
547         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
548                                   ctdb->pnn, TIMEOUT(), &dbmap);
549         if (ret != 0) {
550                 return false;
551         }
552
553         if (strncmp(db_arg, "0x", 2) == 0) {
554                 id = strtoul(db_arg, NULL, 0);
555                 for (i=0; i<dbmap->num; i++) {
556                         if (id == dbmap->dbs[i].db_id) {
557                                 db = &dbmap->dbs[i];
558                                 break;
559                         }
560                 }
561         } else {
562                 name = db_arg;
563                 db = db_find(mem_ctx, ctdb, dbmap, name);
564         }
565
566         if (db == NULL) {
567                 fprintf(stderr, "No database matching '%s' found\n", db_arg);
568                 return false;
569         }
570
571         if (name == NULL) {
572                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
573                                            ctdb->pnn, TIMEOUT(), id, &name);
574                 if (ret != 0) {
575                         return false;
576                 }
577         }
578
579         if (db_id != NULL) {
580                 *db_id = db->db_id;
581         }
582         if (db_name != NULL) {
583                 *db_name = talloc_strdup(mem_ctx, name);
584         }
585         if (db_flags != NULL) {
586                 *db_flags = db->flags;
587         }
588         return true;
589 }
590
/*
 * Convert one hexadecimal digit to its value (0-15).
 *
 * Both 'a'-'f' and 'A'-'F' are accepted.  Any other character is treated
 * as a decimal digit without further checks, so callers must make sure
 * that h is a valid hex digit.
 */
static int h2i(char h)
{
        if (h >= 'a' && h <= 'f') {
                return h - 'a' + 10;
        }
        if (h >= 'A' && h <= 'F') {
                return h - 'A' + 10;
        }
        return h - '0';
}
601
602 static int hex_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
603                        TDB_DATA *out)
604 {
605         int i;
606         TDB_DATA data;
607
608         if (len & 0x01) {
609                 fprintf(stderr, "Key (%s) contains odd number of hex digits\n",
610                         str);
611                 return EINVAL;
612         }
613
614         data.dsize = len / 2;
615         data.dptr = talloc_size(mem_ctx, data.dsize);
616         if (data.dptr == NULL) {
617                 return ENOMEM;
618         }
619
620         for (i=0; i<data.dsize; i++) {
621                 data.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
622         }
623
624         *out = data;
625         return 0;
626 }
627
628 static int str_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
629                        TDB_DATA *out)
630 {
631         TDB_DATA data;
632         int ret = 0;
633
634         if (strncmp(str, "0x", 2) == 0) {
635                 ret = hex_to_data(str+2, len-2, mem_ctx, &data);
636         } else {
637                 data.dptr = talloc_memdup(mem_ctx, str, len);
638                 if (data.dptr == NULL) {
639                         return ENOMEM;
640                 }
641                 data.dsize = len;
642         }
643
644         *out = data;
645         return ret;
646 }
647
/*
 * Run a helper program and wait for it to finish.
 *
 * path is executed in a child process with arg1 as its only argument
 * (arg1 may be NULL for none).  command is only used in error messages.
 *
 * Returns the exit status of the helper if it exited, EINTR if it was
 * killed by a signal, and an errno value if fork() or waitpid() failed.
 * If the helper cannot be executed, the child exits with the errno of
 * execl(), so that value is returned as the exit status.
 */
static int run_helper(const char *command, const char *path, const char *arg1)
{
        pid_t pid;
        int save_errno, status, ret;

        pid = fork();
        if (pid < 0) {
                save_errno = errno;
                fprintf(stderr, "Failed to fork %s (%s) - %s\n",
                        command, path, strerror(save_errno));
                return save_errno;
        }

        if (pid == 0) {
                /*
                 * The end of a variadic argument list has to be a null
                 * pointer of type char *; a plain NULL may be an int.
                 */
                ret = execl(path, path, arg1, (char *)NULL);
                if (ret == -1) {
                        _exit(errno);
                }
                /* Should not happen */
                _exit(ENOEXEC);
        }

        /* A signal in the parent must not make us lose the child */
        do {
                ret = waitpid(pid, &status, 0);
        } while (ret == -1 && errno == EINTR);
        if (ret == -1) {
                save_errno = errno;
                fprintf(stderr, "waitpid() failed for %s - %s\n",
                        command, strerror(save_errno));
                return save_errno;
        }

        if (WIFEXITED(status)) {
                ret = WEXITSTATUS(status);
                return ret;
        }

        if (WIFSIGNALED(status)) {
                fprintf(stderr, "%s terminated with signal %d\n",
                        command, WTERMSIG(status));
                return EINTR;
        }

        return 0;
}
691
692 /*
693  * Command Functions
694  */
695
696 static int control_version(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
697                            int argc, const char **argv)
698 {
699         printf("%s\n", CTDB_VERSION_STRING);
700         return 0;
701 }
702
703 static bool partially_online(TALLOC_CTX *mem_ctx,
704                              struct ctdb_context *ctdb,
705                              struct ctdb_node_and_flags *node)
706 {
707         struct ctdb_iface_list *iface_list;
708         int ret, i;
709         bool status = false;
710
711         if (node->flags != 0) {
712                 return false;
713         }
714
715         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
716                                    node->pnn, TIMEOUT(), &iface_list);
717         if (ret != 0) {
718                 return false;
719         }
720
721         status = false;
722         for (i=0; i < iface_list->num; i++) {
723                 if (iface_list->iface[i].link_state == 0) {
724                         status = true;
725                         break;
726                 }
727         }
728
729         return status;
730 }
731
732 static void print_nodemap_machine(TALLOC_CTX *mem_ctx,
733                                   struct ctdb_context *ctdb,
734                                   struct ctdb_node_map *nodemap,
735                                   uint32_t mypnn)
736 {
737         struct ctdb_node_and_flags *node;
738         int i;
739
740         printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
741                options.sep,
742                "Node", options.sep,
743                "IP", options.sep,
744                "Disconnected", options.sep,
745                "Banned", options.sep,
746                "Disabled", options.sep,
747                "Unhealthy", options.sep,
748                "Stopped", options.sep,
749                "Inactive", options.sep,
750                "PartiallyOnline", options.sep,
751                "ThisNode", options.sep);
752
753         for (i=0; i<nodemap->num; i++) {
754                 node = &nodemap->node[i];
755                 if (node->flags & NODE_FLAGS_DELETED) {
756                         continue;
757                 }
758
759                 printf("%s%u%s%s%s%d%s%d%s%d%s%d%s%d%s%d%s%d%s%c%s\n",
760                        options.sep,
761                        node->pnn, options.sep,
762                        ctdb_sock_addr_to_string(mem_ctx, &node->addr),
763                        options.sep,
764                        !! (node->flags & NODE_FLAGS_DISCONNECTED), options.sep,
765                        !! (node->flags & NODE_FLAGS_BANNED), options.sep,
766                        !! (node->flags & NODE_FLAGS_PERMANENTLY_DISABLED),
767                        options.sep,
768                        !! (node->flags & NODE_FLAGS_UNHEALTHY), options.sep,
769                        !! (node->flags & NODE_FLAGS_STOPPED), options.sep,
770                        !! (node->flags & NODE_FLAGS_INACTIVE), options.sep,
771                        partially_online(mem_ctx, ctdb, node), options.sep,
772                        (node->pnn == mypnn)?'Y':'N', options.sep);
773         }
774
775 }
776
777 static void print_nodemap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
778                           struct ctdb_node_map *nodemap, uint32_t mypnn)
779 {
780         struct ctdb_node_and_flags *node;
781         int num_deleted_nodes = 0;
782         int i;
783
784         for (i=0; i<nodemap->num; i++) {
785                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
786                         num_deleted_nodes++;
787                 }
788         }
789
790         if (num_deleted_nodes == 0) {
791                 printf("Number of nodes:%d\n", nodemap->num);
792         } else {
793                 printf("Number of nodes:%d (including %d deleted nodes)\n",
794                        nodemap->num, num_deleted_nodes);
795         }
796
797         for (i=0; i<nodemap->num; i++) {
798                 node = &nodemap->node[i];
799                 if (node->flags & NODE_FLAGS_DELETED) {
800                         continue;
801                 }
802
803                 printf("pnn:%u %-16s %s%s\n",
804                        node->pnn,
805                        ctdb_sock_addr_to_string(mem_ctx, &node->addr),
806                        partially_online(mem_ctx, ctdb, node) ?
807                                 "PARTIALLYONLINE" :
808                                 pretty_print_flags(mem_ctx, node->flags),
809                        node->pnn == mypnn ? " (THIS NODE)" : "");
810         }
811 }
812
813 static void print_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
814                          struct ctdb_node_map *nodemap, uint32_t mypnn,
815                          struct ctdb_vnn_map *vnnmap, int recmode,
816                          uint32_t recmaster)
817 {
818         int i;
819
820         print_nodemap(mem_ctx, ctdb, nodemap, mypnn);
821
822         if (vnnmap->generation == INVALID_GENERATION) {
823                 printf("Generation:INVALID\n");
824         } else {
825                 printf("Generation:%u\n", vnnmap->generation);
826         }
827         printf("Size:%d\n", vnnmap->size);
828         for (i=0; i<vnnmap->size; i++) {
829                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
830         }
831
832         printf("Recovery mode:%s (%d)\n",
833                recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "RECOVERY",
834                recmode);
835         printf("Recovery master:%d\n", recmaster);
836 }
837
838 static int control_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
839                           int argc, const char **argv)
840 {
841         struct ctdb_node_map *nodemap;
842         struct ctdb_vnn_map *vnnmap;
843         int recmode;
844         uint32_t recmaster;
845         int ret;
846
847         if (argc != 0) {
848                 usage("status");
849         }
850
851         nodemap = get_nodemap(ctdb, false);
852         if (nodemap == NULL) {
853                 return 1;
854         }
855
856         if (options.machinereadable == 1) {
857                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
858                 return 0;
859         }
860
861         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
862                                   ctdb->cmd_pnn, TIMEOUT(), &vnnmap);
863         if (ret != 0) {
864                 return ret;
865         }
866
867         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
868                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
869         if (ret != 0) {
870                 return ret;
871         }
872
873         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
874                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
875         if (ret != 0) {
876                 return ret;
877         }
878
879         print_status(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, vnnmap,
880                      recmode, recmaster);
881         return 0;
882 }
883
884 static int control_uptime(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
885                           int argc, const char **argv)
886 {
887         struct ctdb_uptime *uptime;
888         int ret, tmp, days, hours, minutes, seconds;
889
890         ret = ctdb_ctrl_uptime(mem_ctx, ctdb->ev, ctdb->client,
891                                ctdb->cmd_pnn, TIMEOUT(), &uptime);
892         if (ret != 0) {
893                 return ret;
894         }
895
896         printf("Current time of node %-4u     :                %s",
897                ctdb->cmd_pnn, ctime(&uptime->current_time.tv_sec));
898
899         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
900         seconds = tmp % 60; tmp /= 60;
901         minutes = tmp % 60; tmp /= 60;
902         hours = tmp % 24; tmp /= 24;
903         days = tmp;
904
905         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s",
906                days, hours, minutes, seconds,
907                ctime(&uptime->ctdbd_start_time.tv_sec));
908
909         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
910         seconds = tmp % 60; tmp /= 60;
911         minutes = tmp % 60; tmp /= 60;
912         hours = tmp % 24; tmp /= 24;
913         days = tmp;
914
915         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s",
916                days, hours, minutes, seconds,
917                ctime(&uptime->last_recovery_finished.tv_sec));
918
919         printf("Duration of last recovery/failover: %lf seconds\n",
920                timeval_delta(&uptime->last_recovery_finished,
921                              &uptime->last_recovery_started));
922
923         return 0;
924 }
925
926 static int control_ping(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
927                         int argc, const char **argv)
928 {
929         struct timeval tv;
930         int ret, num_clients;
931
932         tv = timeval_current();
933         ret = ctdb_ctrl_ping(mem_ctx, ctdb->ev, ctdb->client,
934                              ctdb->cmd_pnn, TIMEOUT(), &num_clients);
935         if (ret != 0) {
936                 return ret;
937         }
938
939         printf("response from %u time=%.6f sec  (%d clients)\n",
940                ctdb->cmd_pnn, timeval_elapsed(&tv), num_clients);
941         return 0;
942 }
943
944 const char *runstate_to_string(enum ctdb_runstate runstate);
945 enum ctdb_runstate runstate_from_string(const char *runstate_str);
946
947 static int control_runstate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
948                             int argc, const char **argv)
949 {
950         enum ctdb_runstate runstate;
951         bool found;
952         int ret, i;
953
954         ret = ctdb_ctrl_get_runstate(mem_ctx, ctdb->ev, ctdb->client,
955                                      ctdb->cmd_pnn, TIMEOUT(), &runstate);
956         if (ret != 0) {
957                 return ret;
958         }
959
960         found = true;
961         for (i=0; i<argc; i++) {
962                 enum ctdb_runstate t;
963
964                 found = false;
965                 t = ctdb_runstate_from_string(argv[i]);
966                 if (t == CTDB_RUNSTATE_UNKNOWN) {
967                         printf("Invalid run state (%s)\n", argv[i]);
968                         return 1;
969                 }
970
971                 if (t == runstate) {
972                         found = true;
973                         break;
974                 }
975         }
976
977         if (! found) {
978                 printf("CTDB not in required run state (got %s)\n",
979                        ctdb_runstate_to_string(runstate));
980                 return 1;
981         }
982
983         printf("%s\n", ctdb_runstate_to_string(runstate));
984         return 0;
985 }
986
987 static int control_getvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
988                           int argc, const char **argv)
989 {
990         struct ctdb_var_list *tun_var_list;
991         uint32_t value;
992         int ret, i;
993         bool found;
994
995         if (argc != 1) {
996                 usage("getvar");
997         }
998
999         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1000                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1001         if (ret != 0) {
1002                 fprintf(stderr,
1003                         "Failed to get list of variables from node %u\n",
1004                         ctdb->cmd_pnn);
1005                 return ret;
1006         }
1007
1008         found = false;
1009         for (i=0; i<tun_var_list->count; i++) {
1010                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1011                         found = true;
1012                         break;
1013                 }
1014         }
1015
1016         if (! found) {
1017                 printf("No such tunable %s\n", argv[0]);
1018                 return 1;
1019         }
1020
1021         ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
1022                                     ctdb->cmd_pnn, TIMEOUT(), argv[0], &value);
1023         if (ret != 0) {
1024                 return ret;
1025         }
1026
1027         printf("%-26s = %u\n", argv[0], value);
1028         return 0;
1029 }
1030
1031 static int control_setvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1032                           int argc, const char **argv)
1033 {
1034         struct ctdb_var_list *tun_var_list;
1035         struct ctdb_tunable tunable;
1036         int ret, i;
1037         bool found;
1038
1039         if (argc != 2) {
1040                 usage("setvar");
1041         }
1042
1043         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1044                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1045         if (ret != 0) {
1046                 fprintf(stderr,
1047                         "Failed to get list of variables from node %u\n",
1048                         ctdb->cmd_pnn);
1049                 return ret;
1050         }
1051
1052         found = false;
1053         for (i=0; i<tun_var_list->count; i++) {
1054                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1055                         found = true;
1056                         break;
1057                 }
1058         }
1059
1060         if (! found) {
1061                 printf("No such tunable %s\n", argv[0]);
1062                 return 1;
1063         }
1064
1065         tunable.name = argv[0];
1066         tunable.value = strtoul(argv[1], NULL, 0);
1067
1068         ret = ctdb_ctrl_set_tunable(mem_ctx, ctdb->ev, ctdb->client,
1069                                     ctdb->cmd_pnn, TIMEOUT(), &tunable);
1070         if (ret != 0) {
1071                 if (ret == 1) {
1072                         fprintf(stderr,
1073                                 "Setting obsolete tunable variable '%s'\n",
1074                                tunable.name);
1075                         return 0;
1076                 }
1077         }
1078
1079         return ret;
1080 }
1081
1082 static int control_listvars(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1083                             int argc, const char **argv)
1084 {
1085         struct ctdb_var_list *tun_var_list;
1086         int ret, i;
1087
1088         if (argc != 0) {
1089                 usage("listvars");
1090         }
1091
1092         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1093                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1094         if (ret != 0) {
1095                 return ret;
1096         }
1097
1098         for (i=0; i<tun_var_list->count; i++) {
1099                 control_getvar(mem_ctx, ctdb, 1, &tun_var_list->var[i]);
1100         }
1101
1102         return 0;
1103 }
1104
/*
 * Table of the counters in struct ctdb_statistics that are printed
 * generically by the statistics commands.
 *
 * Each entry pairs the member's name (as written, so nested members
 * appear as "group.member") with its byte offset inside the
 * structure.  The printing code in this file reads every entry as a
 * uint32_t at that offset, so only members of that size belong here.
 * In the human-readable output, consecutive entries sharing the same
 * "group." prefix are printed under a common heading.
 */
const struct {
        const char *name;
        uint32_t offset;
} stats_fields[] = {
/* Build one table entry from a (possibly dotted) member designator */
#define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
        STATISTICS_FIELD(num_clients),
        STATISTICS_FIELD(frozen),
        STATISTICS_FIELD(recovering),
        STATISTICS_FIELD(num_recoveries),
        STATISTICS_FIELD(client_packets_sent),
        STATISTICS_FIELD(client_packets_recv),
        STATISTICS_FIELD(node_packets_sent),
        STATISTICS_FIELD(node_packets_recv),
        STATISTICS_FIELD(keepalive_packets_sent),
        STATISTICS_FIELD(keepalive_packets_recv),
        STATISTICS_FIELD(node.req_call),
        STATISTICS_FIELD(node.reply_call),
        STATISTICS_FIELD(node.req_dmaster),
        STATISTICS_FIELD(node.reply_dmaster),
        STATISTICS_FIELD(node.reply_error),
        STATISTICS_FIELD(node.req_message),
        STATISTICS_FIELD(node.req_control),
        STATISTICS_FIELD(node.reply_control),
        STATISTICS_FIELD(client.req_call),
        STATISTICS_FIELD(client.req_message),
        STATISTICS_FIELD(client.req_control),
        STATISTICS_FIELD(timeouts.call),
        STATISTICS_FIELD(timeouts.control),
        STATISTICS_FIELD(timeouts.traverse),
        STATISTICS_FIELD(locks.num_calls),
        STATISTICS_FIELD(locks.num_current),
        STATISTICS_FIELD(locks.num_pending),
        STATISTICS_FIELD(locks.num_failed),
        STATISTICS_FIELD(total_calls),
        STATISTICS_FIELD(pending_calls),
        STATISTICS_FIELD(childwrite_calls),
        STATISTICS_FIELD(pending_childwrite_calls),
        STATISTICS_FIELD(memory_used),
        STATISTICS_FIELD(max_hop_count),
        STATISTICS_FIELD(total_ro_delegations),
        STATISTICS_FIELD(total_ro_revokes),
};

/*
 * Average of a latency counter: total divided by the number of
 * samples, or 0.0 when there are no samples (avoids dividing by
 * zero).  The argument is expanded more than once, so it must not
 * have side effects.
 */
#define LATENCY_AVG(v)  ((v).num ? (v).total / (v).num : 0.0 )
1149
1150 static void print_statistics_machine(struct ctdb_statistics *s,
1151                                      bool show_header)
1152 {
1153         int i;
1154
1155         if (show_header) {
1156                 printf("CTDB version%s", options.sep);
1157                 printf("Current time of statistics%s", options.sep);
1158                 printf("Statistics collected since%s", options.sep);
1159                 for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1160                         printf("%s%s", stats_fields[i].name, options.sep);
1161                 }
1162                 printf("num_reclock_ctdbd_latency%s", options.sep);
1163                 printf("min_reclock_ctdbd_latency%s", options.sep);
1164                 printf("avg_reclock_ctdbd_latency%s", options.sep);
1165                 printf("max_reclock_ctdbd_latency%s", options.sep);
1166
1167                 printf("num_reclock_recd_latency%s", options.sep);
1168                 printf("min_reclock_recd_latency%s", options.sep);
1169                 printf("avg_reclock_recd_latency%s", options.sep);
1170                 printf("max_reclock_recd_latency%s", options.sep);
1171
1172                 printf("num_call_latency%s", options.sep);
1173                 printf("min_call_latency%s", options.sep);
1174                 printf("avg_call_latency%s", options.sep);
1175                 printf("max_call_latency%s", options.sep);
1176
1177                 printf("num_lockwait_latency%s", options.sep);
1178                 printf("min_lockwait_latency%s", options.sep);
1179                 printf("avg_lockwait_latency%s", options.sep);
1180                 printf("max_lockwait_latency%s", options.sep);
1181
1182                 printf("num_childwrite_latency%s", options.sep);
1183                 printf("min_childwrite_latency%s", options.sep);
1184                 printf("avg_childwrite_latency%s", options.sep);
1185                 printf("max_childwrite_latency%s", options.sep);
1186                 printf("\n");
1187         }
1188
1189         printf("%u%s", CTDB_PROTOCOL, options.sep);
1190         printf("%u%s", (uint32_t)s->statistics_current_time.tv_sec, options.sep);
1191         printf("%u%s", (uint32_t)s->statistics_start_time.tv_sec, options.sep);
1192         for (i=0;i<ARRAY_SIZE(stats_fields);i++) {
1193                 printf("%u%s",
1194                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s),
1195                        options.sep);
1196         }
1197         printf("%u%s", s->reclock.ctdbd.num, options.sep);
1198         printf("%.6f%s", s->reclock.ctdbd.min, options.sep);
1199         printf("%.6f%s", LATENCY_AVG(s->reclock.ctdbd), options.sep);
1200         printf("%.6f%s", s->reclock.ctdbd.max, options.sep);
1201
1202         printf("%u%s", s->reclock.recd.num, options.sep);
1203         printf("%.6f%s", s->reclock.recd.min, options.sep);
1204         printf("%.6f%s", LATENCY_AVG(s->reclock.recd), options.sep);
1205         printf("%.6f%s", s->reclock.recd.max, options.sep);
1206
1207         printf("%d%s", s->call_latency.num, options.sep);
1208         printf("%.6f%s", s->call_latency.min, options.sep);
1209         printf("%.6f%s", LATENCY_AVG(s->call_latency), options.sep);
1210         printf("%.6f%s", s->call_latency.max, options.sep);
1211
1212         printf("%d%s", s->childwrite_latency.num, options.sep);
1213         printf("%.6f%s", s->childwrite_latency.min, options.sep);
1214         printf("%.6f%s", LATENCY_AVG(s->childwrite_latency), options.sep);
1215         printf("%.6f%s", s->childwrite_latency.max, options.sep);
1216         printf("\n");
1217 }
1218
1219 static void print_statistics(struct ctdb_statistics *s)
1220 {
1221         int tmp, days, hours, minutes, seconds;
1222         int i;
1223         const char *prefix = NULL;
1224         int preflen = 0;
1225
1226         tmp = s->statistics_current_time.tv_sec -
1227               s->statistics_start_time.tv_sec;
1228         seconds = tmp % 60; tmp /= 60;
1229         minutes = tmp % 60; tmp /= 60;
1230         hours   = tmp % 24; tmp /= 24;
1231         days    = tmp;
1232
1233         printf("CTDB version %u\n", CTDB_PROTOCOL);
1234         printf("Current time of statistics  :                %s",
1235                ctime(&s->statistics_current_time.tv_sec));
1236         printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s",
1237                days, hours, minutes, seconds,
1238                ctime(&s->statistics_start_time.tv_sec));
1239
1240         for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1241                 if (strchr(stats_fields[i].name, '.') != NULL) {
1242                         preflen = strcspn(stats_fields[i].name, ".") + 1;
1243                         if (! prefix ||
1244                             strncmp(prefix, stats_fields[i].name, preflen) != 0) {
1245                                 prefix = stats_fields[i].name;
1246                                 printf(" %*.*s\n", preflen-1, preflen-1,
1247                                        stats_fields[i].name);
1248                         }
1249                 } else {
1250                         preflen = 0;
1251                 }
1252                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
1253                        stats_fields[i].name+preflen, preflen ? 0 : 4, "",
1254                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s));
1255         }
1256
1257         printf(" hop_count_buckets:");
1258         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1259                 printf(" %d", s->hop_count_bucket[i]);
1260         }
1261         printf("\n");
1262         printf(" lock_buckets:");
1263         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1264                 printf(" %d", s->locks.buckets[i]);
1265         }
1266         printf("\n");
1267         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1268                "locks_latency      MIN/AVG/MAX",
1269                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
1270                s->locks.latency.max, s->locks.latency.num);
1271
1272         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1273                "reclock_ctdbd      MIN/AVG/MAX",
1274                s->reclock.ctdbd.min, LATENCY_AVG(s->reclock.ctdbd),
1275                s->reclock.ctdbd.max, s->reclock.ctdbd.num);
1276
1277         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1278                "reclock_recd       MIN/AVG/MAX",
1279                s->reclock.recd.min, LATENCY_AVG(s->reclock.recd),
1280                s->reclock.recd.max, s->reclock.recd.num);
1281
1282         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1283                "call_latency       MIN/AVG/MAX",
1284                s->call_latency.min, LATENCY_AVG(s->call_latency),
1285                s->call_latency.max, s->call_latency.num);
1286
1287         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1288                "childwrite_latency MIN/AVG/MAX",
1289                s->childwrite_latency.min,
1290                LATENCY_AVG(s->childwrite_latency),
1291                s->childwrite_latency.max, s->childwrite_latency.num);
1292 }
1293
1294 static int control_statistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1295                               int argc, const char **argv)
1296 {
1297         struct ctdb_statistics *stats;
1298         int ret;
1299
1300         if (argc != 0) {
1301                 usage("statistics");
1302         }
1303
1304         ret = ctdb_ctrl_statistics(mem_ctx, ctdb->ev, ctdb->client,
1305                                    ctdb->cmd_pnn, TIMEOUT(), &stats);
1306         if (ret != 0) {
1307                 return ret;
1308         }
1309
1310         if (options.machinereadable) {
1311                 print_statistics_machine(stats, true);
1312         } else {
1313                 print_statistics(stats);
1314         }
1315
1316         return 0;
1317 }
1318
1319 static int control_statistics_reset(TALLOC_CTX *mem_ctx,
1320                                     struct ctdb_context *ctdb,
1321                                     int argc, const char **argv)
1322 {
1323         int ret;
1324
1325         if (argc != 0) {
1326                 usage("statisticsreset");
1327         }
1328
1329         ret = ctdb_ctrl_statistics_reset(mem_ctx, ctdb->ev, ctdb->client,
1330                                          ctdb->cmd_pnn, TIMEOUT());
1331         if (ret != 0) {
1332                 return ret;
1333         }
1334
1335         return 0;
1336 }
1337
1338 static int control_stats(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1339                          int argc, const char **argv)
1340 {
1341         struct ctdb_statistics_list *slist;
1342         int ret, count = 0, i;
1343         bool show_header = true;
1344
1345         if (argc > 1) {
1346                 usage("stats");
1347         }
1348
1349         if (argc == 1) {
1350                 count = atoi(argv[0]);
1351         }
1352
1353         ret = ctdb_ctrl_get_stat_history(mem_ctx, ctdb->ev, ctdb->client,
1354                                          ctdb->cmd_pnn, TIMEOUT(), &slist);
1355         if (ret != 0) {
1356                 return ret;
1357         }
1358
1359         for (i=0; i<slist->num; i++) {
1360                 if (slist->stats[i].statistics_start_time.tv_sec == 0) {
1361                         continue;
1362                 }
1363                 if (options.machinereadable == 1) {
1364                         print_statistics_machine(&slist->stats[i],
1365                                                  show_header);
1366                         show_header = false;
1367                 } else {
1368                         print_statistics(&slist->stats[i]);
1369                 }
1370                 if (count > 0 && i == count) {
1371                         break;
1372                 }
1373         }
1374
1375         return 0;
1376 }
1377
1378 static void print_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1379                      struct ctdb_public_ip_list *ips,
1380                      struct ctdb_public_ip_info **ipinfo,
1381                      bool all_nodes)
1382 {
1383         int i, j;
1384         char *conf, *avail, *active;
1385
1386         if (options.machinereadable == 1) {
1387                 printf("%s%s%s%s%s", options.sep,
1388                        "Public IP", options.sep,
1389                        "Node", options.sep);
1390                 if (options.verbose == 1) {
1391                         printf("%s%s%s%s%s%s\n",
1392                                "ActiveInterfaces", options.sep,
1393                                "AvailableInterfaces", options.sep,
1394                                "ConfiguredInterfaces", options.sep);
1395                 } else {
1396                         printf("\n");
1397                 }
1398         } else {
1399                 if (all_nodes) {
1400                         printf("Public IPs on ALL nodes\n");
1401                 } else {
1402                         printf("Public IPs on node %u\n", ctdb->cmd_pnn);
1403                 }
1404         }
1405
1406         /* IPs are reverse sorted */
1407         for (i=ips->num-1; i>=0; i--) {
1408
1409                 if (options.machinereadable == 1) {
1410                         printf("%s%s%s%d%s", options.sep,
1411                                ctdb_sock_addr_to_string(
1412                                         mem_ctx, &ips->ip[i].addr),
1413                                options.sep,
1414                                (int)ips->ip[i].pnn, options.sep);
1415                 } else {
1416                         printf("%s", ctdb_sock_addr_to_string(
1417                                                 mem_ctx, &ips->ip[i].addr));
1418                 }
1419
1420                 if (options.verbose == 0) {
1421                         if (options.machinereadable == 1) {
1422                                 printf("\n");
1423                         } else {
1424                                 printf(" %d\n", (int)ips->ip[i].pnn);
1425                         }
1426                         continue;
1427                 }
1428
1429                 conf = NULL;
1430                 avail = NULL;
1431                 active = NULL;
1432
1433                 for (j=0; j<ipinfo[i]->ifaces->num; j++) {
1434                         struct ctdb_iface *iface;
1435
1436                         iface = &ipinfo[i]->ifaces->iface[j];
1437                         if (conf == NULL) {
1438                                 conf = talloc_strdup(mem_ctx, iface->name);
1439                         } else {
1440                                 conf = talloc_asprintf_append(
1441                                                 mem_ctx, ",%s", iface->name);
1442                         }
1443
1444                         if (ipinfo[i]->active_idx == j) {
1445                                 active = iface->name;
1446                         }
1447
1448                         if (iface->link_state == 0) {
1449                                 continue;
1450                         }
1451
1452                         if (avail == NULL) {
1453                                 avail = talloc_strdup(mem_ctx, iface->name);
1454                         } else {
1455                                 avail = talloc_asprintf_append(
1456                                                 mem_ctx, ",%s", iface->name);
1457                         }
1458                 }
1459
1460                 if (options.machinereadable == 1) {
1461                         printf("%s%s%s%s%s%s\n",
1462                                active ? active : "", options.sep,
1463                                avail ? avail : "", options.sep,
1464                                conf ? conf : "", options.sep);
1465                 } else {
1466                         printf(" node[%u] active[%s] available[%s] configured[%s]\n",
1467                                ips->ip[i].pnn, active ? active : "",
1468                                avail ? avail : "", conf ? conf : "");
1469                 }
1470         }
1471 }
1472
1473 static int collect_ips(uint8_t *keybuf, size_t keylen, uint8_t *databuf,
1474                        size_t datalen, void *private_data)
1475 {
1476         struct ctdb_public_ip_list *ips = talloc_get_type_abort(
1477                 private_data, struct ctdb_public_ip_list);
1478         struct ctdb_public_ip *ip;
1479
1480         ip = (struct ctdb_public_ip *)databuf;
1481         ips->ip[ips->num] = *ip;
1482         ips->num += 1;
1483
1484         return 0;
1485 }
1486
1487 static int get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1488                               struct ctdb_public_ip_list **out)
1489 {
1490         struct ctdb_node_map *nodemap;
1491         struct ctdb_public_ip_list *ips;
1492         struct db_hash_context *ipdb;
1493         uint32_t *pnn_list;
1494         int ret, count, i, j;
1495
1496         nodemap = get_nodemap(ctdb, false);
1497         if (nodemap == NULL) {
1498                 return 1;
1499         }
1500
1501         ret = db_hash_init(mem_ctx, "ips", 101, DB_HASH_COMPLEX, &ipdb);
1502         if (ret != 0) {
1503                 goto failed;
1504         }
1505
1506         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
1507                                      &pnn_list);
1508         if (count <= 0) {
1509                 goto failed;
1510         }
1511
1512         for (i=0; i<count; i++) {
1513                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1514                                                pnn_list[i], TIMEOUT(), &ips);
1515                 if (ret != 0) {
1516                         goto failed;
1517                 }
1518
1519                 for (j=0; j<ips->num; j++) {
1520                         struct ctdb_public_ip ip;
1521
1522                         ip.pnn = ips->ip[j].pnn;
1523                         ip.addr = ips->ip[j].addr;
1524
1525                         ret = db_hash_add(ipdb, (uint8_t *)&ip.addr,
1526                                           sizeof(ip.addr),
1527                                           (uint8_t *)&ip, sizeof(ip));
1528                         if (ret != 0) {
1529                                 goto failed;
1530                         }
1531                 }
1532
1533                 TALLOC_FREE(ips);
1534         }
1535
1536         talloc_free(pnn_list);
1537
1538         ret = db_hash_traverse(ipdb, NULL, NULL, &count);
1539         if (ret != 0) {
1540                 goto failed;
1541         }
1542
1543         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);
1544         if (ips == NULL) {
1545                 goto failed;
1546         }
1547
1548         ips->ip = talloc_array(ips, struct ctdb_public_ip, count);
1549         if (ips->ip == NULL) {
1550                 goto failed;
1551         }
1552
1553         ret = db_hash_traverse(ipdb, collect_ips, ips, &count);
1554         if (ret != 0) {
1555                 goto failed;
1556         }
1557
1558         if (count != ips->num) {
1559                 goto failed;
1560         }
1561
1562         talloc_free(ipdb);
1563
1564         *out = ips;
1565         return 0;
1566
1567 failed:
1568         talloc_free(ipdb);
1569         return 1;
1570 }
1571
1572 static int control_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1573                       int argc, const char **argv)
1574 {
1575         struct ctdb_public_ip_list *ips;
1576         struct ctdb_public_ip_info **ipinfo;
1577         int ret, i;
1578         bool do_all = false;
1579
1580         if (argc > 1) {
1581                 usage("ip");
1582         }
1583
1584         if (argc == 1) {
1585                 if (strcmp(argv[0], "all") == 0) {
1586                         do_all = true;
1587                 } else {
1588                         usage("ip");
1589                 }
1590         }
1591
1592         if (do_all) {
1593                 ret = get_all_public_ips(ctdb, mem_ctx, &ips);
1594         } else {
1595                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1596                                                ctdb->cmd_pnn, TIMEOUT(), &ips);
1597         }
1598         if (ret != 0) {
1599                 return ret;
1600         }
1601
1602         ipinfo = talloc_array(mem_ctx, struct ctdb_public_ip_info *, ips->num);
1603         if (ipinfo == NULL) {
1604                 return 1;
1605         }
1606
1607         for (i=0; i<ips->num; i++) {
1608                 uint32_t pnn;
1609                 if (do_all) {
1610                         pnn = ips->ip[i].pnn;
1611                 } else {
1612                         pnn = ctdb->cmd_pnn;
1613                 }
1614                 ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev,
1615                                                    ctdb->client, pnn,
1616                                                    TIMEOUT(), &ips->ip[i].addr,
1617                                                    &ipinfo[i]);
1618                 if (ret != 0) {
1619                         return ret;
1620                 }
1621         }
1622
1623         print_ip(mem_ctx, ctdb, ips, ipinfo, do_all);
1624         return 0;
1625 }
1626
1627 static int control_ipinfo(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1628                           int argc, const char **argv)
1629 {
1630         struct ctdb_public_ip_info *ipinfo;
1631         ctdb_sock_addr addr;
1632         int ret, i;
1633
1634         if (argc != 1) {
1635                 usage("ipinfo");
1636         }
1637
1638         if (! parse_ip(argv[0], NULL, 0, &addr)) {
1639                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
1640                 return 1;
1641         }
1642
1643         ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev, ctdb->client,
1644                                            ctdb->cmd_pnn, TIMEOUT(), &addr,
1645                                            &ipinfo);
1646         if (ret != 0) {
1647                 if (ret == -1) {
1648                         printf("Node %u does not know about IP %s\n",
1649                                ctdb->cmd_pnn, argv[0]);
1650                 }
1651                 return ret;
1652         }
1653
1654         printf("Public IP[%s] info on node %u\n",
1655                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr),
1656                                         ctdb->cmd_pnn);
1657
1658         printf("IP:%s\nCurrentNode:%u\nNumInterfaces:%u\n",
1659                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr),
1660                ipinfo->ip.pnn, ipinfo->ifaces->num);
1661
1662         for (i=0; i<ipinfo->ifaces->num; i++) {
1663                 struct ctdb_iface *iface;
1664
1665                 iface = &ipinfo->ifaces->iface[i];
1666                 iface->name[CTDB_IFACE_SIZE] = '\0';
1667                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1668                        i+1, iface->name,
1669                        iface->link_state == 0 ? "down" : "up",
1670                        iface->references,
1671                        (i == ipinfo->active_idx) ? " (active)" : "");
1672         }
1673
1674         return 0;
1675 }
1676
1677 static int control_ifaces(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1678                           int argc, const char **argv)
1679 {
1680         struct ctdb_iface_list *ifaces;
1681         int ret, i;
1682
1683         if (argc != 0) {
1684                 usage("ifaces");
1685         }
1686
1687         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1688                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1689         if (ret != 0) {
1690                 return ret;
1691         }
1692
1693         if (ifaces->num == 0) {
1694                 printf("No interfaces configured on node %u\n",
1695                        ctdb->cmd_pnn);
1696                 return 0;
1697         }
1698
1699         if (options.machinereadable) {
1700                 printf("%s%s%s%s%s%s%s\n", options.sep,
1701                        "Name", options.sep,
1702                        "LinkStatus", options.sep,
1703                        "References", options.sep);
1704         } else {
1705                 printf("Interfaces on node %u\n", ctdb->cmd_pnn);
1706         }
1707
1708         for (i=0; i<ifaces->num; i++) {
1709                 if (options.machinereadable) {
1710                         printf("%s%s%s%u%s%u%s\n", options.sep,
1711                                ifaces->iface[i].name, options.sep,
1712                                ifaces->iface[i].link_state, options.sep,
1713                                ifaces->iface[i].references, options.sep);
1714                 } else {
1715                         printf("name:%s link:%s references:%u\n",
1716                                ifaces->iface[i].name,
1717                                ifaces->iface[i].link_state ? "up" : "down",
1718                                ifaces->iface[i].references);
1719                 }
1720         }
1721
1722         return 0;
1723 }
1724
1725 static int control_setifacelink(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1726                                 int argc, const char **argv)
1727 {
1728         struct ctdb_iface_list *ifaces;
1729         struct ctdb_iface *iface;
1730         int ret, i;
1731
1732         if (argc != 2) {
1733                 usage("setifacelink");
1734         }
1735
1736         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1737                 fprintf(stderr, "Interface name '%s' too long\n", argv[0]);
1738                 return 1;
1739         }
1740
1741         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1742                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1743         if (ret != 0) {
1744                 fprintf(stderr,
1745                         "Failed to get interface information from node %u\n",
1746                         ctdb->cmd_pnn);
1747                 return ret;
1748         }
1749
1750         iface = NULL;
1751         for (i=0; i<ifaces->num; i++) {
1752                 if (strcmp(ifaces->iface[i].name, argv[0]) == 0) {
1753                         iface = &ifaces->iface[i];
1754                         break;
1755                 }
1756         }
1757
1758         if (iface == NULL) {
1759                 printf("Interface %s not configured on node %u\n",
1760                        argv[0], ctdb->cmd_pnn);
1761                 return 1;
1762         }
1763
1764         if (strcmp(argv[1], "up") == 0) {
1765                 iface->link_state = 1;
1766         } else if (strcmp(argv[1], "down") == 0) {
1767                 iface->link_state = 0;
1768         } else {
1769                 usage("setifacelink");
1770                 return 1;
1771         }
1772
1773         iface->references = 0;
1774
1775         ret = ctdb_ctrl_set_iface_link_state(mem_ctx, ctdb->ev, ctdb->client,
1776                                              ctdb->cmd_pnn, TIMEOUT(), iface);
1777         if (ret != 0) {
1778                 return ret;
1779         }
1780
1781         return 0;
1782 }
1783
1784 static int control_process_exists(TALLOC_CTX *mem_ctx,
1785                                   struct ctdb_context *ctdb,
1786                                   int argc, const char **argv)
1787 {
1788         pid_t pid;
1789         int ret, status;
1790
1791         if (argc != 1) {
1792                 usage("process-exists");
1793         }
1794
1795         pid = atoi(argv[0]);
1796         ret = ctdb_ctrl_process_exists(mem_ctx, ctdb->ev, ctdb->client,
1797                                        ctdb->cmd_pnn, TIMEOUT(), pid, &status);
1798         if (ret != 0) {
1799                 return ret;
1800         }
1801
1802         if (status == 0) {
1803                 printf("PID %u exists\n", pid);
1804         } else {
1805                 printf("PID %u does not exist\n", pid);
1806         }
1807         return status;
1808 }
1809
1810 static int control_getdbmap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1811                             int argc, const char **argv)
1812 {
1813         struct ctdb_dbid_map *dbmap;
1814         int ret, i;
1815
1816         if (argc != 0) {
1817                 usage("getdbmap");
1818         }
1819
1820         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
1821                                   ctdb->cmd_pnn, TIMEOUT(), &dbmap);
1822         if (ret != 0) {
1823                 return ret;
1824         }
1825
1826         if (options.machinereadable == 1) {
1827                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
1828                        options.sep,
1829                        "ID", options.sep,
1830                        "Name", options.sep,
1831                        "Path", options.sep,
1832                        "Persistent", options.sep,
1833                        "Sticky", options.sep,
1834                        "Unhealthy", options.sep,
1835                        "Readonly", options.sep);
1836         } else {
1837                 printf("Number of databases:%d\n", dbmap->num);
1838         }
1839
1840         for (i=0; i<dbmap->num; i++) {
1841                 const char *name;
1842                 const char *path;
1843                 const char *health;
1844                 bool persistent;
1845                 bool readonly;
1846                 bool sticky;
1847                 uint32_t db_id;
1848
1849                 db_id = dbmap->dbs[i].db_id;
1850
1851                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
1852                                            ctdb->cmd_pnn, TIMEOUT(), db_id,
1853                                            &name);
1854                 if (ret != 0) {
1855                         return ret;
1856                 }
1857
1858                 ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1859                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
1860                                           &path);
1861                 if (ret != 0) {
1862                         return ret;
1863                 }
1864
1865                 ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1866                                               ctdb->cmd_pnn, TIMEOUT(), db_id,
1867                                               &health);
1868                 if (ret != 0) {
1869                         return ret;
1870                 }
1871
1872                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1873                 readonly = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
1874                 sticky = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
1875
1876                 if (options.machinereadable == 1) {
1877                         printf("%s0x%08X%s%s%s%s%s%d%s%d%s%d%s%d%s\n",
1878                                options.sep,
1879                                db_id, options.sep,
1880                                name, options.sep,
1881                                path, options.sep,
1882                                !! (persistent), options.sep,
1883                                !! (sticky), options.sep,
1884                                !! (health), options.sep,
1885                                !! (readonly), options.sep);
1886                 } else {
1887                         printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
1888                                db_id, name, path,
1889                                persistent ? " PERSISTENT" : "",
1890                                sticky ? " STICKY" : "",
1891                                readonly ? " READONLY" : "",
1892                                health ? " UNHEALTHY" : "");
1893                 }
1894
1895                 talloc_free(discard_const(name));
1896                 talloc_free(discard_const(path));
1897                 talloc_free(discard_const(health));
1898         }
1899
1900         return 0;
1901 }
1902
1903 static int control_getdbstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1904                                int argc, const char **argv)
1905 {
1906         uint32_t db_id;
1907         const char *db_name, *db_path, *db_health;
1908         uint8_t db_flags;
1909         int ret;
1910
1911         if (argc != 1) {
1912                 usage("getdbstatus");
1913         }
1914
1915         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
1916                 return 1;
1917         }
1918
1919         ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1920                                   ctdb->cmd_pnn, TIMEOUT(), db_id,
1921                                   &db_path);
1922         if (ret != 0) {
1923                 return ret;
1924         }
1925
1926         ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1927                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
1928                                       &db_health);
1929         if (ret != 0) {
1930                 return ret;
1931         }
1932
1933         printf("dbid: 0x%08x\nname: %s\npath: %s\n", db_id, db_name, db_path);
1934         printf("PERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
1935                (db_flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
1936                (db_flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
1937                (db_flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
1938                (db_health ? db_health : "OK"));
1939         return 0;
1940 }
1941
/* State shared across dump_record() calls during one dump. */
struct dump_record_state {
        /* Number of records dumped so far; bumped once per dump_record(). */
        uint32_t count;
};
1945
/* True for printable characters other than '"' and '\\', which need escaping. */
#define ISASCII(x) (isprint(x) && strchr("\"\\", (x)) == NULL)
1947
1948 static void dump_tdb_data(const char *name, TDB_DATA val)
1949 {
1950         int i;
1951
1952         fprintf(stdout, "%s(%zu) = \"", name, val.dsize);
1953         for (i=0; i<val.dsize; i++) {
1954                 if (ISASCII(val.dptr[i])) {
1955                         fprintf(stdout, "%c", val.dptr[i]);
1956                 } else {
1957                         fprintf(stdout, "\\%02X", val.dptr[i]);
1958                 }
1959         }
1960         fprintf(stdout, "\"\n");
1961 }
1962
1963 static void dump_ltdb_header(struct ctdb_ltdb_header *header)
1964 {
1965         fprintf(stdout, "dmaster: %u\n", header->dmaster);
1966         fprintf(stdout, "rsn: %" PRIu64 "\n", header->rsn);
1967         fprintf(stdout, "flags: 0x%08x", header->flags);
1968         if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
1969                 fprintf(stdout, " MIGRATED_WITH_DATA");
1970         }
1971         if (header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) {
1972                 fprintf(stdout, " VACUUM_MIGRATED");
1973         }
1974         if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
1975                 fprintf(stdout, " AUTOMATIC");
1976         }
1977         if (header->flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1978                 fprintf(stdout, " RO_HAVE_DELEGATIONS");
1979         }
1980         if (header->flags & CTDB_REC_RO_HAVE_READONLY) {
1981                 fprintf(stdout, " RO_HAVE_READONLY");
1982         }
1983         if (header->flags & CTDB_REC_RO_REVOKING_READONLY) {
1984                 fprintf(stdout, " RO_REVOKING_READONLY");
1985         }
1986         if (header->flags & CTDB_REC_RO_REVOKE_COMPLETE) {
1987                 fprintf(stdout, " RO_REVOKE_COMPLETE");
1988         }
1989         fprintf(stdout, "\n");
1990
1991 }
1992
1993 static int dump_record(uint32_t reqid, struct ctdb_ltdb_header *header,
1994                        TDB_DATA key, TDB_DATA data, void *private_data)
1995 {
1996         struct dump_record_state *state =
1997                 (struct dump_record_state *)private_data;
1998
1999         state->count += 1;
2000
2001         dump_tdb_data("key", key);
2002         dump_ltdb_header(header);
2003         dump_tdb_data("data", data);
2004         fprintf(stdout, "\n");
2005
2006         return 0;
2007 }
2008
2009 struct traverse_state {
2010         TALLOC_CTX *mem_ctx;
2011         bool done;
2012         ctdb_rec_parser_func_t func;
2013         struct dump_record_state sub_state;
2014 };
2015
2016 static void traverse_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2017 {
2018         struct traverse_state *state = (struct traverse_state *)private_data;
2019         struct ctdb_rec_data *rec;
2020         struct ctdb_ltdb_header header;
2021         int ret;
2022
2023         ret = ctdb_rec_data_pull(data.dptr, data.dsize, state->mem_ctx, &rec);
2024         if (ret != 0) {
2025                 return;
2026         }
2027
2028         if (rec->key.dsize == 0 && rec->data.dsize == 0) {
2029                 talloc_free(rec);
2030                 /* end of traverse */
2031                 state->done = true;
2032                 return;
2033         }
2034
2035         ret = ctdb_ltdb_header_extract(&rec->data, &header);
2036         if (ret != 0) {
2037                 talloc_free(rec);
2038                 return;
2039         }
2040
2041         if (rec->data.dsize == 0) {
2042                 talloc_free(rec);
2043                 return;
2044         }
2045
2046         ret = state->func(rec->reqid, &header, rec->key, rec->data,
2047                           &state->sub_state);
2048         talloc_free(rec);
2049         if (ret != 0) {
2050                 state->done = true;
2051         }
2052 }
2053
2054 static int control_catdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2055                          int argc, const char **argv)
2056 {
2057         struct ctdb_db_context *db;
2058         const char *db_name;
2059         uint32_t db_id;
2060         uint8_t db_flags;
2061         struct ctdb_traverse_start_ext traverse;
2062         struct traverse_state state;
2063         int ret;
2064
2065         if (argc != 1) {
2066                 usage("catdb");
2067         }
2068
2069         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2070                 return 1;
2071         }
2072
2073         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2074                           db_flags, &db);
2075         if (ret != 0) {
2076                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2077                 return ret;
2078         }
2079
2080         /* Valgrind fix */
2081         ZERO_STRUCT(traverse);
2082
2083         traverse.db_id = db_id;
2084         traverse.reqid = 0;
2085         traverse.srvid = next_srvid(ctdb);
2086         traverse.withemptyrecords = false;
2087
2088         state.mem_ctx = mem_ctx;
2089         state.done = false;
2090         state.func = dump_record;
2091         state.sub_state.count = 0;
2092
2093         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2094                                               traverse.srvid,
2095                                               traverse_handler, &state);
2096         if (ret != 0) {
2097                 return ret;
2098         }
2099
2100         ret = ctdb_ctrl_traverse_start_ext(mem_ctx, ctdb->ev, ctdb->client,
2101                                            ctdb->cmd_pnn, TIMEOUT(),
2102                                            &traverse);
2103         if (ret != 0) {
2104                 return ret;
2105         }
2106
2107         ctdb_client_wait(ctdb->ev, &state.done);
2108
2109         printf("Dumped %u records\n", state.sub_state.count);
2110
2111         ret = ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2112                                                  traverse.srvid, &state);
2113         if (ret != 0) {
2114                 return ret;
2115         }
2116
2117         return 0;
2118 }
2119
2120 static int control_cattdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2121                           int argc, const char **argv)
2122 {
2123         struct ctdb_db_context *db;
2124         const char *db_name;
2125         uint32_t db_id;
2126         uint8_t db_flags;
2127         struct dump_record_state state;
2128         int ret;
2129
2130         if (argc != 1) {
2131                 usage("catdb");
2132         }
2133
2134         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2135                 return 1;
2136         }
2137
2138         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2139                           db_flags, &db);
2140         if (ret != 0) {
2141                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2142                 return ret;
2143         }
2144
2145         state.count = 0;
2146         ret = ctdb_db_traverse(db, true, true, dump_record, &state);
2147
2148         printf("Dumped %u record(s)\n", state.count);
2149
2150         return ret;
2151 }
2152
2153 static int control_getmonmode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2154                               int argc, const char **argv)
2155 {
2156         int mode, ret;
2157
2158         if (argc != 0) {
2159                 usage("getmonmode");
2160         }
2161
2162         ret = ctdb_ctrl_get_monmode(mem_ctx, ctdb->ev, ctdb->client,
2163                                     ctdb->cmd_pnn, TIMEOUT(), &mode);
2164         if (ret != 0) {
2165                 return ret;
2166         }
2167
2168         printf("%s\n",
2169                (mode == CTDB_MONITORING_ENABLED) ? "ENABLED" : "DISABLED");
2170         return 0;
2171 }
2172
2173 static int control_getcapabilities(TALLOC_CTX *mem_ctx,
2174                                    struct ctdb_context *ctdb,
2175                                    int argc, const char **argv)
2176 {
2177         uint32_t caps;
2178         int ret;
2179
2180         if (argc != 0) {
2181                 usage("getcapabilities");
2182         }
2183
2184         ret = ctdb_ctrl_get_capabilities(mem_ctx, ctdb->ev, ctdb->client,
2185                                          ctdb->cmd_pnn, TIMEOUT(), &caps);
2186         if (ret != 0) {
2187                 return ret;
2188         }
2189
2190         if (options.machinereadable == 1) {
2191                 printf("%s%s%s%s%s\n",
2192                        options.sep,
2193                        "RECMASTER", options.sep,
2194                        "LMASTER", options.sep);
2195                 printf("%s%d%s%d%s\n", options.sep,
2196                        !! (caps & CTDB_CAP_RECMASTER), options.sep,
2197                        !! (caps & CTDB_CAP_LMASTER), options.sep);
2198         } else {
2199                 printf("RECMASTER: %s\n",
2200                        (caps & CTDB_CAP_RECMASTER) ? "YES" : "NO");
2201                 printf("LMASTER: %s\n",
2202                        (caps & CTDB_CAP_LMASTER) ? "YES" : "NO");
2203         }
2204
2205         return 0;
2206 }
2207
2208 static int control_pnn(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2209                        int argc, const char **argv)
2210 {
2211         printf("%u\n", ctdb_client_pnn(ctdb->client));
2212         return 0;
2213 }
2214
2215 static int control_lvs(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2216                        int argc, const char **argv)
2217 {
2218         char *t, *lvs_helper = NULL;
2219
2220         if (argc != 1) {
2221                 usage("lvs");
2222         }
2223
2224         t = getenv("CTDB_LVS_HELPER");
2225         if (t != NULL) {
2226                 lvs_helper = talloc_strdup(mem_ctx, t);
2227         } else {
2228                 lvs_helper = talloc_asprintf(mem_ctx, "%s/ctdb_lvs",
2229                                              CTDB_HELPER_BINDIR);
2230         }
2231
2232         if (lvs_helper == NULL) {
2233                 fprintf(stderr, "Unable to set LVS helper\n");
2234                 return 1;
2235         }
2236
2237         return run_helper("LVS helper", lvs_helper, argv[0]);
2238 }
2239
2240 static int control_disable_monitor(TALLOC_CTX *mem_ctx,
2241                                    struct ctdb_context *ctdb,
2242                                    int argc, const char **argv)
2243 {
2244         int ret;
2245
2246         if (argc != 0) {
2247                 usage("disablemonitor");
2248         }
2249
2250         ret = ctdb_ctrl_disable_monitor(mem_ctx, ctdb->ev, ctdb->client,
2251                                         ctdb->cmd_pnn, TIMEOUT());
2252         if (ret != 0) {
2253                 return ret;
2254         }
2255
2256         return 0;
2257 }
2258
2259 static int control_enable_monitor(TALLOC_CTX *mem_ctx,
2260                                   struct ctdb_context *ctdb,
2261                                   int argc, const char **argv)
2262 {
2263         int ret;
2264
2265         if (argc != 0) {
2266                 usage("enablemonitor");
2267         }
2268
2269         ret = ctdb_ctrl_enable_monitor(mem_ctx, ctdb->ev, ctdb->client,
2270                                        ctdb->cmd_pnn, TIMEOUT());
2271         if (ret != 0) {
2272                 return ret;
2273         }
2274
2275         return 0;
2276 }
2277
2278 static int control_setdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2279                             int argc, const char **argv)
2280 {
2281         int log_level;
2282         int ret;
2283         bool found;
2284
2285         if (argc != 1) {
2286                 usage("setdebug");
2287         }
2288
2289         found = debug_level_parse(argv[0], &log_level);
2290         if (! found) {
2291                 fprintf(stderr,
2292                         "Invalid debug level '%s'. Valid levels are:\n",
2293                         argv[0]);
2294                 fprintf(stderr, "\tERROR | WARNING | NOTICE | INFO | DEBUG\n");
2295                 return 1;
2296         }
2297
2298         ret = ctdb_ctrl_setdebug(mem_ctx, ctdb->ev, ctdb->client,
2299                                  ctdb->cmd_pnn, TIMEOUT(), log_level);
2300         if (ret != 0) {
2301                 return ret;
2302         }
2303
2304         return 0;
2305 }
2306
2307 static int control_getdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2308                             int argc, const char **argv)
2309 {
2310         int loglevel;
2311         const char *log_str;
2312         int ret;
2313
2314         if (argc != 0) {
2315                 usage("getdebug");
2316         }
2317
2318         ret = ctdb_ctrl_getdebug(mem_ctx, ctdb->ev, ctdb->client,
2319                                  ctdb->cmd_pnn, TIMEOUT(), &loglevel);
2320         if (ret != 0) {
2321                 return ret;
2322         }
2323
2324         log_str = debug_level_to_string(loglevel);
2325         printf("%s\n", log_str);
2326
2327         return 0;
2328 }
2329
2330 static int control_attach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2331                           int argc, const char **argv)
2332 {
2333         const char *db_name;
2334         uint8_t db_flags = 0;
2335         int ret;
2336
2337         if (argc < 1 || argc > 2) {
2338                 usage("attach");
2339         }
2340
2341         db_name = argv[0];
2342         if (argc == 2) {
2343                 if (strcmp(argv[1], "persistent") == 0) {
2344                         db_flags = CTDB_DB_FLAGS_PERSISTENT;
2345                 } else if (strcmp(argv[1], "readonly") == 0) {
2346                         db_flags = CTDB_DB_FLAGS_READONLY;
2347                 } else if (strcmp(argv[1], "sticky") == 0) {
2348                         db_flags = CTDB_DB_FLAGS_STICKY;
2349                 } else {
2350                         usage("attach");
2351                 }
2352         }
2353
2354         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2355                           db_flags, NULL);
2356         if (ret != 0) {
2357                 return ret;
2358         }
2359
2360         return 0;
2361 }
2362
2363 static int control_detach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2364                           int argc, const char **argv)
2365 {
2366         const char *db_name;
2367         uint32_t db_id;
2368         uint8_t db_flags;
2369         struct ctdb_node_map *nodemap;
2370         int recmode;
2371         int ret, ret2, i;
2372
2373         if (argc < 1) {
2374                 usage("detach");
2375         }
2376
2377         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2378                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
2379         if (ret != 0) {
2380                 return ret;
2381         }
2382
2383         if (recmode == CTDB_RECOVERY_ACTIVE) {
2384                 fprintf(stderr, "Database cannot be detached"
2385                                 " when recovery is active\n");
2386                 return 1;
2387         }
2388
2389         nodemap = get_nodemap(ctdb, false);
2390         if (nodemap == NULL) {
2391                 return 1;
2392         }
2393
2394         for (i=0; i<nodemap->num; i++) {
2395                 uint32_t value;
2396
2397                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2398                         continue;
2399                 }
2400                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2401                         continue;
2402                 }
2403                 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2404                         fprintf(stderr, "Database cannot be detached on"
2405                                 " inactive (stopped or banned) node %u\n",
2406                                 nodemap->node[i].pnn);
2407                         return 1;
2408                 }
2409
2410                 ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
2411                                             nodemap->node[i].pnn, TIMEOUT(),
2412                                             "AllowClientDBAttach", &value);
2413                 if (ret != 0) {
2414                         fprintf(stderr,
2415                                 "Unable to get tunable AllowClientDBAttach"
2416                                 " from node %u\n", nodemap->node[i].pnn);
2417                         return ret;
2418                 }
2419
2420                 if (value == 1) {
2421                         fprintf(stderr,
2422                                 "Database access is still active on node %u."
2423                                 " Set AllowclientDBAttach=0 on all nodes.\n",
2424                                 nodemap->node[i].pnn);
2425                         return 1;
2426                 }
2427         }
2428
2429         ret2 = 0;
2430         for (i=0; i<argc; i++) {
2431                 if (! db_exists(mem_ctx, ctdb, argv[i], &db_id, &db_name,
2432                                 &db_flags)) {
2433                         continue;
2434                 }
2435
2436                 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
2437                         fprintf(stderr,
2438                                 "Persistent database %s cannot be detached\n",
2439                                 argv[0]);
2440                         return 1;
2441                 }
2442
2443                 ret = ctdb_detach(mem_ctx, ctdb->ev, ctdb->client,
2444                                   TIMEOUT(), db_id);
2445                 if (ret != 0) {
2446                         fprintf(stderr, "Database %s detach failed\n", db_name);
2447                         ret2 = ret;
2448                 }
2449         }
2450
2451         return ret2;
2452 }
2453
2454 static int control_dumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2455                               int argc, const char **argv)
2456 {
2457         const char *mem_str;
2458         ssize_t n;
2459         int ret;
2460
2461         ret = ctdb_ctrl_dump_memory(mem_ctx, ctdb->ev, ctdb->client,
2462                                     ctdb->cmd_pnn, TIMEOUT(), &mem_str);
2463         if (ret != 0) {
2464                 return ret;
2465         }
2466
2467         n = write(1, mem_str, strlen(mem_str)+1);
2468         if (n < 0 || n != strlen(mem_str)+1) {
2469                 fprintf(stderr, "Failed to write talloc summary\n");
2470                 return 1;
2471         }
2472
2473         return 0;
2474 }
2475
2476 static void dump_memory(uint64_t srvid, TDB_DATA data, void *private_data)
2477 {
2478         bool *done = (bool *)private_data;
2479         ssize_t n;
2480
2481         n = write(1, data.dptr, data.dsize);
2482         if (n < 0 || n != data.dsize) {
2483                 fprintf(stderr, "Failed to write talloc summary\n");
2484         }
2485
2486         *done = true;
2487 }
2488
2489 static int control_rddumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2490                                 int argc, const char **argv)
2491 {
2492         struct ctdb_srvid_message msg = { 0 };
2493         int ret;
2494         bool done = false;
2495
2496         msg.pnn = ctdb->pnn;
2497         msg.srvid = next_srvid(ctdb);
2498
2499         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2500                                               msg.srvid, dump_memory, &done);
2501         if (ret != 0) {
2502                 return ret;
2503         }
2504
2505         ret = ctdb_message_mem_dump(mem_ctx, ctdb->ev, ctdb->client,
2506                                     ctdb->cmd_pnn, &msg);
2507         if (ret != 0) {
2508                 return ret;
2509         }
2510
2511         ctdb_client_wait(ctdb->ev, &done);
2512         return 0;
2513 }
2514
2515 static int control_getpid(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2516                           int argc, const char **argv)
2517 {
2518         pid_t pid;
2519         int ret;
2520
2521         ret = ctdb_ctrl_get_pid(mem_ctx, ctdb->ev, ctdb->client,
2522                                 ctdb->cmd_pnn, TIMEOUT(), &pid);
2523         if (ret != 0) {
2524                 return ret;
2525         }
2526
2527         printf("%u\n", pid);
2528         return 0;
2529 }
2530
2531 static int check_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2532                        const char *desc, uint32_t flag, bool set_flag)
2533 {
2534         struct ctdb_node_map *nodemap;
2535         bool flag_is_set;
2536
2537         nodemap = get_nodemap(ctdb, false);
2538         if (nodemap == NULL) {
2539                 return 1;
2540         }
2541
2542         flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2543         if (set_flag == flag_is_set) {
2544                 if (set_flag) {
2545                         fprintf(stderr, "Node %u is already %s\n",
2546                                 ctdb->cmd_pnn, desc);
2547                 } else {
2548                         fprintf(stderr, "Node %u is not %s\n",
2549                                 ctdb->cmd_pnn, desc);
2550                 }
2551                 return 0;
2552         }
2553
2554         return 1;
2555 }
2556
2557 static void wait_for_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2558                            uint32_t flag, bool set_flag)
2559 {
2560         struct ctdb_node_map *nodemap;
2561         bool flag_is_set;
2562
2563         while (1) {
2564                 nodemap = get_nodemap(ctdb, true);
2565                 if (nodemap == NULL) {
2566                         fprintf(stderr,
2567                                 "Failed to get nodemap, trying again\n");
2568                         sleep(1);
2569                         continue;
2570                 }
2571
2572                 flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2573                 if (flag_is_set == set_flag) {
2574                         break;
2575                 }
2576
2577                 sleep(1);
2578         }
2579 }
2580
/* Reply state filled in by ipreallocate_handler(). */
struct ipreallocate_state {
        /* status value taken from the reply; valid only once done is set */
        int status;
        /* set when a reply of the expected size has arrived */
        bool done;
};
2585
2586 static void ipreallocate_handler(uint64_t srvid, TDB_DATA data,
2587                                  void *private_data)
2588 {
2589         struct ipreallocate_state *state =
2590                 (struct ipreallocate_state *)private_data;
2591
2592         if (data.dsize != sizeof(int)) {
2593                 /* Ignore packet */
2594                 return;
2595         }
2596
2597         state->status = *(int *)data.dptr;
2598         state->done = true;
2599 }
2600
2601 static int ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb)
2602 {
2603         struct ctdb_srvid_message msg = { 0 };
2604         struct ipreallocate_state state;
2605         int ret;
2606
2607         msg.pnn = ctdb->pnn;
2608         msg.srvid = next_srvid(ctdb);
2609
2610         state.done = false;
2611         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2612                                               msg.srvid,
2613                                               ipreallocate_handler, &state);
2614         if (ret != 0) {
2615                 return ret;
2616         }
2617
2618         while (true) {
2619                 ret = ctdb_message_takeover_run(mem_ctx, ctdb->ev,
2620                                                 ctdb->client,
2621                                                 CTDB_BROADCAST_CONNECTED,
2622                                                 &msg);
2623                 if (ret != 0) {
2624                         goto fail;
2625                 }
2626
2627                 ret = ctdb_client_wait_timeout(ctdb->ev, &state.done,
2628                                                TIMEOUT());
2629                 if (ret != 0) {
2630                         continue;
2631                 }
2632
2633                 if (state.status >= 0) {
2634                         ret = 0;
2635                 } else {
2636                         ret = state.status;
2637                 }
2638                 break;
2639         }
2640
2641 fail:
2642         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2643                                            msg.srvid, &state);
2644         return ret;
2645 }
2646
2647 static int control_disable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2648                            int argc, const char **argv)
2649 {
2650         int ret;
2651
2652         if (argc != 0) {
2653                 usage("disable");
2654         }
2655
2656         ret = check_flags(mem_ctx, ctdb, "disabled",
2657                           NODE_FLAGS_PERMANENTLY_DISABLED, true);
2658         if (ret == 0) {
2659                 return 0;
2660         }
2661
2662         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2663                                  ctdb->cmd_pnn, TIMEOUT(),
2664                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2665         if (ret != 0) {
2666                 fprintf(stderr,
2667                         "Failed to set DISABLED flag on node %u\n",
2668                         ctdb->cmd_pnn);
2669                 return ret;
2670         }
2671
2672         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, true);
2673         return ipreallocate(mem_ctx, ctdb);
2674 }
2675
2676 static int control_enable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2677                           int argc, const char **argv)
2678 {
2679         int ret;
2680
2681         if (argc != 0) {
2682                 usage("enable");
2683         }
2684
2685         ret = check_flags(mem_ctx, ctdb, "disabled",
2686                           NODE_FLAGS_PERMANENTLY_DISABLED, false);
2687         if (ret == 0) {
2688                 return 0;
2689         }
2690
2691         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2692                                  ctdb->cmd_pnn, TIMEOUT(),
2693                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
2694         if (ret != 0) {
2695                 fprintf(stderr, "Failed to reset DISABLED flag on node %u\n",
2696                         ctdb->cmd_pnn);
2697                 return ret;
2698         }
2699
2700         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, false);
2701         return ipreallocate(mem_ctx, ctdb);
2702 }
2703
2704 static int control_stop(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2705                         int argc, const char **argv)
2706 {
2707         int ret;
2708
2709         if (argc != 0) {
2710                 usage("stop");
2711         }
2712
2713         ret = check_flags(mem_ctx, ctdb, "stopped",
2714                           NODE_FLAGS_STOPPED, true);
2715         if (ret == 0) {
2716                 return 0;
2717         }
2718
2719         ret = ctdb_ctrl_stop_node(mem_ctx, ctdb->ev, ctdb->client,
2720                                   ctdb->cmd_pnn, TIMEOUT());
2721         if (ret != 0) {
2722                 fprintf(stderr, "Failed to stop node %u\n", ctdb->cmd_pnn);
2723                 return ret;
2724         }
2725
2726         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, true);
2727         return ipreallocate(mem_ctx, ctdb);
2728 }
2729
2730 static int control_continue(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2731                             int argc, const char **argv)
2732 {
2733         int ret;
2734
2735         if (argc != 0) {
2736                 usage("continue");
2737         }
2738
2739         ret = check_flags(mem_ctx, ctdb, "stopped",
2740                           NODE_FLAGS_STOPPED, false);
2741         if (ret == 0) {
2742                 return 0;
2743         }
2744
2745         ret = ctdb_ctrl_continue_node(mem_ctx, ctdb->ev, ctdb->client,
2746                                       ctdb->cmd_pnn, TIMEOUT());
2747         if (ret != 0) {
2748                 fprintf(stderr, "Failed to continue stopped node %u\n",
2749                         ctdb->cmd_pnn);
2750                 return ret;
2751         }
2752
2753         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, false);
2754         return ipreallocate(mem_ctx, ctdb);
2755 }
2756
2757 static int control_ban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2758                        int argc, const char **argv)
2759 {
2760         struct ctdb_ban_state ban_state;
2761         int ret;
2762
2763         if (argc != 1) {
2764                 usage("ban");
2765         }
2766
2767         ret = check_flags(mem_ctx, ctdb, "banned",
2768                           NODE_FLAGS_BANNED, true);
2769         if (ret == 0) {
2770                 return 0;
2771         }
2772
2773         ban_state.pnn = ctdb->cmd_pnn;
2774         ban_state.time = strtoul(argv[0], NULL, 0);
2775
2776         if (ban_state.time == 0) {
2777                 fprintf(stderr, "Ban time cannot be zero\n");
2778                 return EINVAL;
2779         }
2780
2781         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2782                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2783         if (ret != 0) {
2784                 fprintf(stderr, "Failed to ban node %u\n", ctdb->cmd_pnn);
2785                 return ret;
2786         }
2787
2788         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, true);
2789         return ipreallocate(mem_ctx, ctdb);
2790
2791 }
2792
2793 static int control_unban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2794                          int argc, const char **argv)
2795 {
2796         struct ctdb_ban_state ban_state;
2797         int ret;
2798
2799         if (argc != 0) {
2800                 usage("unban");
2801         }
2802
2803         ret = check_flags(mem_ctx, ctdb, "banned",
2804                           NODE_FLAGS_BANNED, false);
2805         if (ret == 0) {
2806                 return 0;
2807         }
2808
2809         ban_state.pnn = ctdb->cmd_pnn;
2810         ban_state.time = 0;
2811
2812         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2813                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2814         if (ret != 0) {
2815                 fprintf(stderr, "Failed to unban node %u\n", ctdb->cmd_pnn);
2816                 return ret;
2817         }
2818
2819         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, false);
2820         return ipreallocate(mem_ctx, ctdb);
2821
2822 }
2823
2824 static int control_shutdown(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2825                             int argc, const char **argv)
2826 {
2827         int ret;
2828
2829         if (argc != 0) {
2830                 usage("shutdown");
2831         }
2832
2833         ret = ctdb_ctrl_shutdown(mem_ctx, ctdb->ev, ctdb->client,
2834                                  ctdb->cmd_pnn, TIMEOUT());
2835         if (ret != 0) {
2836                 fprintf(stderr, "Unable to shutdown node %u\n", ctdb->cmd_pnn);
2837                 return ret;
2838         }
2839
2840         return 0;
2841 }
2842
2843 static int get_generation(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2844                           uint32_t *generation)
2845 {
2846         uint32_t recmaster;
2847         int recmode;
2848         struct ctdb_vnn_map *vnnmap;
2849         int ret;
2850
2851 again:
2852         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2853                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
2854         if (ret != 0) {
2855                 fprintf(stderr, "Failed to find recovery master\n");
2856                 return ret;
2857         }
2858
2859         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2860                                     recmaster, TIMEOUT(), &recmode);
2861         if (ret != 0) {
2862                 fprintf(stderr, "Failed to get recovery mode from node %u\n",
2863                         recmaster);
2864                 return ret;
2865         }
2866
2867         if (recmode == CTDB_RECOVERY_ACTIVE) {
2868                 sleep(1);
2869                 goto again;
2870         }
2871
2872         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
2873                                   recmaster, TIMEOUT(), &vnnmap);
2874         if (ret != 0) {
2875                 fprintf(stderr, "Failed to get generation from node %u\n",
2876                         recmaster);
2877                 return ret;
2878         }
2879
2880         if (vnnmap->generation == INVALID_GENERATION) {
2881                 talloc_free(vnnmap);
2882                 sleep(1);
2883                 goto again;
2884         }
2885
2886         *generation = vnnmap->generation;
2887         talloc_free(vnnmap);
2888         return 0;
2889 }
2890
2891
2892 static int control_recover(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2893                            int argc, const char **argv)
2894 {
2895         uint32_t generation, next_generation;
2896         int ret;
2897
2898         if (argc != 0) {
2899                 usage("recover");
2900         }
2901
2902         ret = get_generation(mem_ctx, ctdb, &generation);
2903         if (ret != 0) {
2904                 return ret;
2905         }
2906
2907         ret = ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
2908                                     ctdb->cmd_pnn, TIMEOUT(),
2909                                     CTDB_RECOVERY_ACTIVE);
2910         if (ret != 0) {
2911                 fprintf(stderr, "Failed to set recovery mode active\n");
2912                 return ret;
2913         }
2914
2915         while (1) {
2916                 ret = get_generation(mem_ctx, ctdb, &next_generation);
2917                 if (ret != 0) {
2918                         fprintf(stderr,
2919                                 "Failed to confirm end of recovery\n");
2920                         return ret;
2921                 }
2922
2923                 if (next_generation != generation) {
2924                         break;
2925                 }
2926
2927                 sleep (1);
2928         }
2929
2930         return 0;
2931 }
2932
2933 static int control_ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2934                                 int argc, const char **argv)
2935 {
2936         if (argc != 0) {
2937                 usage("ipreallocate");
2938         }
2939
2940         return ipreallocate(mem_ctx, ctdb);
2941 }
2942
2943 static int control_isnotrecmaster(TALLOC_CTX *mem_ctx,
2944                                   struct ctdb_context *ctdb,
2945                                   int argc, const char **argv)
2946 {
2947         uint32_t recmaster;
2948         int ret;
2949
2950         if (argc != 0) {
2951                 usage("isnotrecmaster");
2952         }
2953
2954         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2955                                       ctdb->pnn, TIMEOUT(), &recmaster);
2956         if (ret != 0) {
2957                 fprintf(stderr, "Failed to get recmaster\n");
2958                 return ret;
2959         }
2960
2961         if (recmaster != ctdb->pnn) {
2962                 printf("this node is not the recmaster\n");
2963                 return 1;
2964         }
2965
2966         printf("this node is the recmaster\n");
2967         return 0;
2968 }
2969
2970 static int control_gratarp(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2971                            int argc, const char **argv)
2972 {
2973         struct ctdb_addr_info addr_info;
2974         int ret;
2975
2976         if (argc != 2) {
2977                 usage("gratarp");
2978         }
2979
2980         if (! parse_ip(argv[0], NULL, 0, &addr_info.addr)) {
2981                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
2982                 return 1;
2983         }
2984         addr_info.iface = argv[1];
2985
2986         ret = ctdb_ctrl_send_gratuitous_arp(mem_ctx, ctdb->ev, ctdb->client,
2987                                             ctdb->cmd_pnn, TIMEOUT(),
2988                                             &addr_info);
2989         if (ret != 0) {
2990                 fprintf(stderr, "Unable to send gratuitous arp from node %u\n",
2991                         ctdb->cmd_pnn);
2992                 return ret;
2993         }
2994
2995         return 0;
2996 }
2997
2998 static int control_tickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2999                            int argc, const char **argv)
3000 {
3001         ctdb_sock_addr src, dst;
3002         int ret;
3003
3004         if (argc != 0 && argc != 2) {
3005                 usage("tickle");
3006         }
3007
3008         if (argc == 0) {
3009                 struct ctdb_connection *clist;
3010                 int count;
3011                 int i, num_failed;
3012
3013                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3014                 if (ret != 0) {
3015                         return ret;
3016                 }
3017
3018                 num_failed = 0;
3019                 for (i=0; i<count; i++) {
3020                         ret = ctdb_sys_send_tcp(&clist[i].src,
3021                                                 &clist[i].dst,
3022                                                 0, 0, 0);
3023                         if (ret != 0) {
3024                                 num_failed += 1;
3025                         }
3026                 }
3027
3028                 TALLOC_FREE(clist);
3029
3030                 if (num_failed > 0) {
3031                         fprintf(stderr, "Failed to send %d tickles\n",
3032                                 num_failed);
3033                         return 1;
3034                 }
3035
3036                 return 0;
3037         }
3038
3039
3040         if (! parse_ip_port(argv[0], &src)) {
3041                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3042                 return 1;
3043         }
3044
3045         if (! parse_ip_port(argv[1], &dst)) {
3046                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3047                 return 1;
3048         }
3049
3050         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
3051         if (ret != 0) {
3052                 fprintf(stderr, "Failed to send tickle ack\n");
3053                 return ret;
3054         }
3055
3056         return 0;
3057 }
3058
3059 static int control_gettickles(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3060                               int argc, const char **argv)
3061 {
3062         ctdb_sock_addr addr;
3063         struct ctdb_tickle_list *tickles;
3064         unsigned port = 0;
3065         int ret, i;
3066
3067         if (argc < 1 || argc > 2) {
3068                 usage("gettickles");
3069         }
3070
3071         if (argc == 2) {
3072                 port = strtoul(argv[1], NULL, 10);
3073         }
3074
3075         if (! parse_ip(argv[0], NULL, port, &addr)) {
3076                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3077                 return 1;
3078         }
3079
3080         ret = ctdb_ctrl_get_tcp_tickle_list(mem_ctx, ctdb->ev, ctdb->client,
3081                                             ctdb->cmd_pnn, TIMEOUT(), &addr,
3082                                             &tickles);
3083         if (ret != 0) {
3084                 fprintf(stderr, "Failed to get list of connections\n");
3085                 return ret;
3086         }
3087
3088         if (options.machinereadable) {
3089                 printf("%s%s%s%s%s%s%s%s%s\n",
3090                        options.sep,
3091                        "Source IP", options.sep,
3092                        "Port", options.sep,
3093                        "Destiation IP", options.sep,
3094                        "Port", options.sep);
3095                 for (i=0; i<tickles->num; i++) {
3096                         printf("%s%s%s%u%s%s%s%u%s\n", options.sep,
3097                                ctdb_sock_addr_to_string(
3098                                        mem_ctx, &tickles->conn[i].src),
3099                                options.sep,
3100                                ntohs(tickles->conn[i].src.ip.sin_port),
3101                                options.sep,
3102                                ctdb_sock_addr_to_string(
3103                                        mem_ctx, &tickles->conn[i].dst),
3104                                options.sep,
3105                                ntohs(tickles->conn[i].dst.ip.sin_port),
3106                                options.sep);
3107                 }
3108         } else {
3109                 printf("Connections for IP: %s\n",
3110                        ctdb_sock_addr_to_string(mem_ctx, &tickles->addr));
3111                 printf("Num connections: %u\n", tickles->num);
3112                 for (i=0; i<tickles->num; i++) {
3113                         printf("SRC: %s:%u   DST: %s:%u\n",
3114                                ctdb_sock_addr_to_string(
3115                                        mem_ctx, &tickles->conn[i].src),
3116                                ntohs(tickles->conn[i].src.ip.sin_port),
3117                                ctdb_sock_addr_to_string(
3118                                        mem_ctx, &tickles->conn[i].dst),
3119                                ntohs(tickles->conn[i].dst.ip.sin_port));
3120                 }
3121         }
3122
3123         talloc_free(tickles);
3124         return 0;
3125 }
3126
/* Fills in the control request to be sent for one connection */
typedef void (*clist_request_func)(struct ctdb_req_control *request,
                                   struct ctdb_connection *conn);

/* Evaluates one control reply; a non-zero return counts as a failure */
typedef int (*clist_reply_func)(struct ctdb_reply_control *reply);

/* Bookkeeping for a batch of per-connection controls */
struct process_clist_state {
        struct ctdb_connection *clist;  /* connections being processed */
        int count;                      /* number of entries in clist */
        int num_failed;                 /* replies that failed so far */
        int num_total;                  /* replies handled so far */
        clist_reply_func reply_func;    /* applied to every reply */
};

static void process_clist_done(struct tevent_req *subreq);
3140
3141 static struct tevent_req *process_clist_send(
3142                                         TALLOC_CTX *mem_ctx,
3143                                         struct ctdb_context *ctdb,
3144                                         struct ctdb_connection *clist,
3145                                         int count,
3146                                         clist_request_func request_func,
3147                                         clist_reply_func reply_func)
3148 {
3149         struct tevent_req *req, *subreq;
3150         struct process_clist_state *state;
3151         struct ctdb_req_control request;
3152         int i;
3153
3154         req = tevent_req_create(mem_ctx, &state, struct process_clist_state);
3155         if (req == NULL) {
3156                 return NULL;
3157         }
3158
3159         state->clist = clist;
3160         state->count = count;
3161         state->reply_func = reply_func;
3162
3163         for (i=0; i<count; i++) {
3164                 request_func(&request, &clist[i]);
3165                 subreq = ctdb_client_control_send(state, ctdb->ev,
3166                                                   ctdb->client, ctdb->cmd_pnn,
3167                                                   TIMEOUT(), &request);
3168                 if (tevent_req_nomem(subreq, req)) {
3169                         return tevent_req_post(req, ctdb->ev);
3170                 }
3171                 tevent_req_set_callback(subreq, process_clist_done, req);
3172         }
3173
3174         return req;
3175 }
3176
3177 static void process_clist_done(struct tevent_req *subreq)
3178 {
3179         struct tevent_req *req = tevent_req_callback_data(
3180                 subreq, struct tevent_req);
3181         struct process_clist_state *state = tevent_req_data(
3182                 req, struct process_clist_state);
3183         struct ctdb_reply_control *reply;
3184         int ret;
3185         bool status;
3186
3187         status = ctdb_client_control_recv(subreq, NULL, state, &reply);
3188         TALLOC_FREE(subreq);
3189         if (! status) {
3190                 state->num_failed += 1;
3191                 goto done;
3192         }
3193
3194         ret = state->reply_func(reply);
3195         if (ret != 0) {
3196                 state->num_failed += 1;
3197                 goto done;
3198         }
3199
3200 done:
3201         state->num_total += 1;
3202         if (state->num_total == state->count) {
3203                 tevent_req_done(req);
3204         }
3205 }
3206
3207 static int process_clist_recv(struct tevent_req *req)
3208 {
3209         struct process_clist_state *state = tevent_req_data(
3210                 req, struct process_clist_state);
3211
3212         return state->num_failed;
3213 }
3214
3215 static int control_addtickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3216                              int argc, const char **argv)
3217 {
3218         struct ctdb_connection conn;
3219         int ret;
3220
3221         if (argc != 0 && argc != 2) {
3222                 usage("addtickle");
3223         }
3224
3225         if (argc == 0) {
3226                 struct ctdb_connection *clist;
3227                 struct tevent_req *req;
3228                 int count;
3229
3230                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3231                 if (ret != 0) {
3232                         return ret;
3233                 }
3234                 if (count == 0) {
3235                         return 0;
3236                 }
3237
3238                 req = process_clist_send(mem_ctx, ctdb, clist, count,
3239                                  ctdb_req_control_tcp_add_delayed_update,
3240                                  ctdb_reply_control_tcp_add_delayed_update);
3241                 if (req == NULL) {
3242                         talloc_free(clist);
3243                         return ENOMEM;
3244                 }
3245
3246                 tevent_req_poll(req, ctdb->ev);
3247                 talloc_free(clist);
3248
3249                 ret = process_clist_recv(req);
3250                 if (ret != 0) {
3251                         fprintf(stderr, "Failed to add %d tickles\n", ret);
3252                         return 1;
3253                 }
3254
3255                 return 0;
3256         }
3257
3258         if (! parse_ip_port(argv[0], &conn.src)) {
3259                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3260                 return 1;
3261         }
3262         if (! parse_ip_port(argv[1], &conn.dst)) {
3263                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3264                 return 1;
3265         }
3266
3267         ret = ctdb_ctrl_tcp_add_delayed_update(mem_ctx, ctdb->ev,
3268                                                ctdb->client, ctdb->cmd_pnn,
3269                                                TIMEOUT(), &conn);
3270         if (ret != 0) {
3271                 fprintf(stderr, "Failed to register connection\n");
3272                 return ret;
3273         }
3274
3275         return 0;
3276 }
3277
3278 static int control_deltickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3279                              int argc, const char **argv)
3280 {
3281         struct ctdb_connection conn;
3282         int ret;
3283
3284         if (argc != 0 && argc != 2) {
3285                 usage("deltickle");
3286         }
3287
3288         if (argc == 0) {
3289                 struct ctdb_connection *clist;
3290                 struct tevent_req *req;
3291                 int count;
3292
3293                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3294                 if (ret != 0) {
3295                         return ret;
3296                 }
3297                 if (count == 0) {
3298                         return 0;
3299                 }
3300
3301                 req = process_clist_send(mem_ctx, ctdb, clist, count,
3302                                          ctdb_req_control_tcp_remove,
3303                                          ctdb_reply_control_tcp_remove);
3304                 if (req == NULL) {
3305                         talloc_free(clist);
3306                         return ENOMEM;
3307                 }
3308
3309                 tevent_req_poll(req, ctdb->ev);
3310                 talloc_free(clist);
3311
3312                 ret = process_clist_recv(req);
3313                 if (ret != 0) {
3314                         fprintf(stderr, "Failed to remove %d tickles\n", ret);
3315                         return 1;
3316                 }
3317
3318                 return 0;
3319         }
3320
3321         if (! parse_ip_port(argv[0], &conn.src)) {
3322                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3323                 return 1;
3324         }
3325         if (! parse_ip_port(argv[1], &conn.dst)) {
3326                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3327                 return 1;
3328         }
3329
3330         ret = ctdb_ctrl_tcp_remove(mem_ctx, ctdb->ev, ctdb->client,
3331                                    ctdb->cmd_pnn, TIMEOUT(), &conn);
3332         if (ret != 0) {
3333                 fprintf(stderr, "Failed to unregister connection\n");
3334                 return ret;
3335         }
3336
3337         return 0;
3338 }
3339
3340 static int control_check_srvids(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3341                                 int argc, const char **argv)
3342 {
3343         uint64_t *srvid;
3344         uint8_t *result;
3345         int ret, i;
3346
3347         if (argc == 0) {
3348                 usage("check_srvids");
3349         }
3350
3351         srvid = talloc_array(mem_ctx, uint64_t, argc);
3352         if (srvid == NULL) {
3353                 fprintf(stderr, "Memory allocation error\n");
3354                 return 1;
3355         }
3356
3357         for (i=0; i<argc; i++) {
3358                 srvid[i] = strtoull(argv[i], NULL, 0);
3359         }
3360
3361         ret = ctdb_ctrl_check_srvids(mem_ctx, ctdb->ev, ctdb->client,
3362                                      ctdb->cmd_pnn, TIMEOUT(), srvid, argc,
3363                                      &result);
3364         if (ret != 0) {
3365                 fprintf(stderr, "Failed to check srvids on node %u\n",
3366                         ctdb->cmd_pnn);
3367                 return ret;
3368         }
3369
3370         for (i=0; i<argc; i++) {
3371                 printf("SRVID 0x%" PRIx64 " %s\n", srvid[i],
3372                        (result[i] ? "exists" : "does not exist"));
3373         }
3374
3375         return 0;
3376 }
3377
3378 static int control_listnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3379                              int argc, const char **argv)
3380 {
3381         struct ctdb_node_map *nodemap;
3382         int i;
3383
3384         if (argc != 0) {
3385                 usage("listnodes");
3386         }
3387
3388         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
3389         if (nodemap == NULL) {
3390                 return 1;
3391         }
3392
3393         for (i=0; i<nodemap->num; i++) {
3394                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
3395                         continue;
3396                 }
3397
3398                 if (options.machinereadable) {
3399                         printf("%s%u%s%s%s\n", options.sep,
3400                                nodemap->node[i].pnn, options.sep,
3401                                ctdb_sock_addr_to_string(
3402                                         mem_ctx, &nodemap->node[i].addr),
3403                                options.sep);
3404                 } else {
3405                         printf("%s\n",
3406                                ctdb_sock_addr_to_string(
3407                                         mem_ctx, &nodemap->node[i].addr));
3408                 }
3409         }
3410
3411         return 0;
3412 }
3413
3414 static bool nodemap_identical(struct ctdb_node_map *nodemap1,
3415                               struct ctdb_node_map *nodemap2)
3416 {
3417         int i;
3418
3419         if (nodemap1->num != nodemap2->num) {
3420                 return false;
3421         }
3422
3423         for (i=0; i<nodemap1->num; i++) {
3424                 struct ctdb_node_and_flags *n1, *n2;
3425
3426                 n1 = &nodemap1->node[i];
3427                 n2 = &nodemap2->node[i];
3428
3429                 if ((n1->pnn != n2->pnn) ||
3430                     (n1->flags != n2->flags) ||
3431                     ! ctdb_sock_addr_same_ip(&n1->addr, &n2->addr)) {
3432                         return false;
3433                 }
3434         }
3435
3436         return true;
3437 }
3438
3439 static int check_node_file_changes(TALLOC_CTX *mem_ctx,
3440                                    struct ctdb_node_map *nm,
3441                                    struct ctdb_node_map *fnm,
3442                                    bool *reload)
3443 {
3444         int i;
3445         bool check_failed = false;
3446
3447         *reload = false;
3448
3449         for (i=0; i<nm->num; i++) {
3450                 if (i >= fnm->num) {
3451                         fprintf(stderr,
3452                                 "Node %u (%s) missing from nodes file\n",
3453                                 nm->node[i].pnn,
3454                                 ctdb_sock_addr_to_string(
3455                                         mem_ctx, &nm->node[i].addr));
3456                         check_failed = true;
3457                         continue;
3458                 }
3459                 if (nm->node[i].flags & NODE_FLAGS_DELETED &&
3460                     fnm->node[i].flags & NODE_FLAGS_DELETED) {
3461                         /* Node remains deleted */
3462                         continue;
3463                 }
3464
3465                 if (! (nm->node[i].flags & NODE_FLAGS_DELETED) &&
3466                     ! (fnm->node[i].flags & NODE_FLAGS_DELETED)) {
3467                         /* Node not newly nor previously deleted */
3468                         if (! ctdb_same_ip(&nm->node[i].addr,
3469                                            &fnm->node[i].addr)) {
3470                                 fprintf(stderr,
3471                                         "Node %u has changed IP address"
3472                                         " (was %s, now %s)\n",
3473                                         nm->node[i].pnn,
3474                                         ctdb_sock_addr_to_string(
3475                                                 mem_ctx, &nm->node[i].addr),
3476                                         ctdb_sock_addr_to_string(
3477                                                 mem_ctx, &fnm->node[i].addr));
3478                                 check_failed = true;
3479                         } else {
3480                                 if (nm->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3481                                         fprintf(stderr,
3482                                                 "WARNING: Node %u is disconnected."
3483                                                 " You MUST fix this node manually!\n",
3484                                                 nm->node[i].pnn);
3485                                 }
3486                         }
3487                         continue;
3488                 }
3489
3490                 if (fnm->node[i].flags & NODE_FLAGS_DELETED) {
3491                         /* Node is being deleted */
3492                         printf("Node %u is DELETED\n", nm->node[i].pnn);
3493                         *reload = true;
3494                         if (! (nm->node[i].flags & NODE_FLAGS_DISCONNECTED)) {
3495                                 fprintf(stderr,
3496                                         "ERROR: Node %u is still connected\n",
3497                                         nm->node[i].pnn);
3498                                 check_failed = true;
3499                         }
3500                         continue;
3501                 }
3502
3503                 if (nm->node[i].flags & NODE_FLAGS_DELETED) {
3504                         /* Node was previously deleted */
3505                         printf("Node %u is UNDELETED\n", nm->node[i].pnn);
3506                         *reload = true;
3507                 }
3508         }
3509
3510         if (check_failed) {
3511                 fprintf(stderr,
3512                         "ERROR: Nodes will not be reloaded due to previous error\n");
3513                 return 1;
3514         }
3515
3516         /* Leftover nodes in file are NEW */
3517         for (; i < fnm->num; i++) {
3518                 printf("Node %u is NEW\n", fnm->node[i].pnn);
3519                 *reload = true;
3520         }
3521
3522         return 0;
3523 }
3524
/*
 * State shared between disable_recoveries() and its message handler
 * while waiting for nodes to answer a disable-recoveries message.
 */
3525 struct disable_recoveries_state {
3526         uint32_t *pnn_list;     /* PNNs the message was sent to */
3527         int node_count;         /* number of entries in pnn_list and reply */
3528         bool *reply;            /* reply[i] is true once pnn_list[i] has answered */
3529         int status;             /* negative value from an error reply, otherwise 0 */
3530         bool done;              /* set on an error reply or once every node has answered */
3531 };
3532
3533 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
3534                                        void *private_data)
3535 {
3536         struct disable_recoveries_state *state =
3537                 (struct disable_recoveries_state *)private_data;
3538         int ret, i;
3539
3540         if (data.dsize != sizeof(int)) {
3541                 /* Ignore packet */
3542                 return;
3543         }
3544
3545         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
3546         ret = *(int *)data.dptr;
3547         if (ret < 0) {
3548                 state->status = ret;
3549                 state->done = true;
3550                 return;
3551         }
3552         for (i=0; i<state->node_count; i++) {
3553                 if (state->pnn_list[i] == ret) {
3554                         state->reply[i] = true;
3555                         break;
3556                 }
3557         }
3558
3559         state->done = true;
3560         for (i=0; i<state->node_count; i++) {
3561                 if (! state->reply[i]) {
3562                         state->done = false;
3563                         break;
3564                 }
3565         }
3566 }
3567
3568 static int disable_recoveries(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3569                               uint32_t timeout, uint32_t *pnn_list, int count)
3570 {
3571         struct ctdb_disable_message disable = { 0 };
3572         struct disable_recoveries_state state;
3573         int ret, i;
3574
3575         disable.pnn = ctdb->pnn;
3576         disable.srvid = next_srvid(ctdb);
3577         disable.timeout = timeout;
3578
3579         state.pnn_list = pnn_list;
3580         state.node_count = count;
3581         state.done = false;
3582         state.status = 0;
3583         state.reply = talloc_zero_array(mem_ctx, bool, count);
3584         if (state.reply == NULL) {
3585                 return ENOMEM;
3586         }
3587
3588         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
3589                                               disable.srvid,
3590                                               disable_recoveries_handler,
3591                                               &state);
3592         if (ret != 0) {
3593                 return ret;
3594         }
3595
3596         for (i=0; i<count; i++) {
3597                 ret = ctdb_message_disable_recoveries(mem_ctx, ctdb->ev,
3598                                                       ctdb->client,
3599                                                       pnn_list[i],
3600                                                       &disable);
3601                 if (ret != 0) {
3602                         goto fail;
3603                 }
3604         }
3605
3606         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
3607         if (ret == ETIME) {
3608                 fprintf(stderr, "Timed out waiting to disable recoveries\n");
3609         } else {
3610                 ret = (state.status >= 0 ? 0 : 1);
3611         }
3612
3613 fail:
3614         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
3615                                            disable.srvid, &state);
3616         return ret;
3617 }
3618
3619 static int control_reloadnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3620                                int argc, const char **argv)
3621 {
3622         struct ctdb_node_map *nodemap = NULL;
3623         struct ctdb_node_map *file_nodemap;
3624         struct ctdb_node_map *remote_nodemap;
3625         struct ctdb_req_control request;
3626         struct ctdb_reply_control **reply;
3627         bool reload;
3628         int ret, i;
3629         uint32_t *pnn_list;
3630         int count;
3631
3632         nodemap = get_nodemap(ctdb, false);
3633         if (nodemap == NULL) {
3634                 return 1;
3635         }
3636
3637         file_nodemap = read_nodes_file(mem_ctx, ctdb->pnn);
3638         if (file_nodemap == NULL) {
3639                 return 1;
3640         }
3641
3642         for (i=0; i<nodemap->num; i++) {
3643                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3644                         continue;
3645                 }
3646
3647                 ret = ctdb_ctrl_get_nodes_file(mem_ctx, ctdb->ev, ctdb->client,
3648                                                nodemap->node[i].pnn, TIMEOUT(),
3649                                                &remote_nodemap);
3650                 if (ret != 0) {
3651                         fprintf(stderr,
3652                                 "ERROR: Failed to get nodes file from node %u\n",
3653                                 nodemap->node[i].pnn);
3654                         return ret;
3655                 }
3656
3657                 if (! nodemap_identical(file_nodemap, remote_nodemap)) {
3658                         fprintf(stderr,
3659                                 "ERROR: Nodes file on node %u differs"
3660                                  " from current node (%u)\n",
3661                                  nodemap->node[i].pnn, ctdb->pnn);
3662                         return 1;
3663                 }
3664         }
3665
3666         ret = check_node_file_changes(mem_ctx, nodemap, file_nodemap, &reload);
3667         if (ret != 0) {
3668                 return ret;
3669         }
3670
3671         if (! reload) {
3672                 fprintf(stderr, "No change in nodes file,"
3673                                 " skipping unnecessary reload\n");
3674                 return 0;
3675         }
3676
3677         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
3678                                         mem_ctx, &pnn_list);
3679         if (count <= 0) {
3680                 fprintf(stderr, "Memory allocation error\n");
3681                 return 1;
3682         }
3683
3684         ret = disable_recoveries(mem_ctx, ctdb, 2*options.timelimit,
3685                                  pnn_list, count);
3686         if (ret != 0) {
3687                 fprintf(stderr, "Failed to disable recoveries\n");
3688                 return ret;
3689         }
3690
3691         ctdb_req_control_reload_nodes_file(&request);
3692         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3693                                         pnn_list, count, TIMEOUT(),
3694                                         &request, NULL, &reply);
3695         if (ret != 0) {
3696                 bool failed = false;
3697
3698                 for (i=0; i<count; i++) {
3699                         ret = ctdb_reply_control_reload_nodes_file(reply[i]);
3700                         if (ret != 0) {
3701                                 fprintf(stderr,
3702                                         "Node %u failed to reload nodes\n",
3703                                         pnn_list[i]);
3704                                 failed = true;
3705                         }
3706                 }
3707                 if (failed) {
3708                         fprintf(stderr,
3709                                 "You MUST fix failed nodes manually!\n");
3710                 }
3711         }
3712
3713         ret = disable_recoveries(mem_ctx, ctdb, 0, pnn_list, count);
3714         if (ret != 0) {
3715                 fprintf(stderr, "Failed to enable recoveries\n");
3716                 return ret;
3717         }
3718
3719         return 0;
3720 }
3721
3722 static int moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3723                   ctdb_sock_addr *addr, uint32_t pnn)
3724 {
3725         struct ctdb_public_ip_list *pubip_list;
3726         struct ctdb_public_ip pubip;
3727         struct ctdb_node_map *nodemap;
3728         struct ctdb_req_control request;
3729         uint32_t *pnn_list;
3730         int ret, i, count;
3731
3732         ret = ctdb_message_disable_ip_check(mem_ctx, ctdb->ev, ctdb->client,
3733                                             CTDB_BROADCAST_CONNECTED,
3734                                             2*options.timelimit);
3735         if (ret != 0) {
3736                 fprintf(stderr, "Failed to disable IP check\n");
3737                 return ret;
3738         }
3739
3740         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3741                                        pnn, TIMEOUT(), &pubip_list);
3742         if (ret != 0) {
3743                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3744                         pnn);
3745                 return ret;
3746         }
3747
3748         for (i=0; i<pubip_list->num; i++) {
3749                 if (ctdb_same_ip(addr, &pubip_list->ip[i].addr)) {
3750                         break;
3751                 }
3752         }
3753
3754         if (i == pubip_list->num) {
3755                 fprintf(stderr, "Node %u CANNOT host IP address %s\n",
3756                         pnn, ctdb_sock_addr_to_string(mem_ctx, addr));
3757                 return 1;
3758         }
3759
3760         nodemap = get_nodemap(ctdb, false);
3761         if (nodemap == NULL) {
3762                 return 1;
3763         }
3764
3765         count = list_of_active_nodes(nodemap, pnn, mem_ctx, &pnn_list);
3766         if (count <= 0) {
3767                 fprintf(stderr, "Memory allocation error\n");
3768                 return 1;
3769         }
3770
3771         pubip.pnn = pnn;
3772         pubip.addr = *addr;
3773         ctdb_req_control_release_ip(&request, &pubip);
3774
3775         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3776                                         pnn_list, count, TIMEOUT(),
3777                                         &request, NULL, NULL);
3778         if (ret != 0) {
3779                 fprintf(stderr, "Failed to release IP on nodes\n");
3780                 return ret;
3781         }
3782
3783         ret = ctdb_ctrl_takeover_ip(mem_ctx, ctdb->ev, ctdb->client,
3784                                     pnn, TIMEOUT(), &pubip);
3785         if (ret != 0) {
3786                 fprintf(stderr, "Failed to takeover IP on node %u\n", pnn);
3787                 return ret;
3788         }
3789
3790         return 0;
3791 }
3792
3793 static int control_moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3794                           int argc, const char **argv)
3795 {
3796         ctdb_sock_addr addr;
3797         uint32_t pnn;
3798         int ret, retries = 0;
3799
3800         if (argc != 2) {
3801                 usage("moveip");
3802         }
3803
3804         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3805                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3806                 return 1;
3807         }
3808
3809         pnn = strtoul(argv[1], NULL, 10);
3810         if (pnn == CTDB_UNKNOWN_PNN) {
3811                 fprintf(stderr, "Invalid PNN %s\n", argv[1]);
3812                 return 1;
3813         }
3814
3815         while (retries < 5) {
3816                 ret = moveip(mem_ctx, ctdb, &addr, pnn);
3817                 if (ret == 0) {
3818                         break;
3819                 }
3820
3821                 sleep(3);
3822                 retries++;
3823         }
3824
3825         if (ret != 0) {
3826                 fprintf(stderr, "Failed to move IP %s to node %u\n",
3827                         argv[0], pnn);
3828                 return ret;
3829         }
3830
3831         return 0;
3832 }
3833
3834 static int rebalancenode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3835                          uint32_t pnn)
3836 {
3837         int ret;
3838
3839         ret = ctdb_message_rebalance_node(mem_ctx, ctdb->ev, ctdb->client,
3840                                           CTDB_BROADCAST_CONNECTED, pnn);
3841         if (ret != 0) {
3842                 fprintf(stderr,
3843                         "Failed to ask recovery master to distribute IPs\n");
3844                 return ret;
3845         }
3846
3847         return 0;
3848 }
3849
3850 static int control_addip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3851                          int argc, const char **argv)
3852 {
3853         ctdb_sock_addr addr;
3854         struct ctdb_public_ip_list *pubip_list;
3855         struct ctdb_addr_info addr_info;
3856         unsigned int mask;
3857         int ret, i, retries = 0;
3858
3859         if (argc != 2) {
3860                 usage("addip");
3861         }
3862
3863         if (! parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
3864                 fprintf(stderr, "Invalid IP/Mask %s\n", argv[0]);
3865                 return 1;
3866         }
3867
3868         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3869                                        ctdb->cmd_pnn, TIMEOUT(), &pubip_list);
3870         if (ret != 0) {
3871                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3872                         ctdb->cmd_pnn);
3873                 return 1;
3874         }
3875
3876         for (i=0; i<pubip_list->num; i++) {
3877                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3878                         fprintf(stderr, "Node already knows about IP %s\n",
3879                                 ctdb_sock_addr_to_string(mem_ctx, &addr));
3880                         return 0;
3881                 }
3882         }
3883
3884         addr_info.addr = addr;
3885         addr_info.mask = mask;
3886         addr_info.iface = argv[1];
3887
3888         while (retries < 5) {
3889                 ret = ctdb_ctrl_add_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3890                                               ctdb->cmd_pnn, TIMEOUT(),
3891                                               &addr_info);
3892                 if (ret == 0) {
3893                         break;
3894                 }
3895
3896                 sleep(3);
3897                 retries++;
3898         }
3899
3900         if (ret != 0) {
3901                 fprintf(stderr, "Failed to add public IP to node %u."
3902                                 " Giving up\n", ctdb->cmd_pnn);
3903                 return ret;
3904         }
3905
3906         ret = rebalancenode(mem_ctx, ctdb, ctdb->cmd_pnn);
3907         if (ret != 0) {
3908                 return ret;
3909         }
3910
3911         return 0;
3912 }
3913
3914 static int control_delip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3915                          int argc, const char **argv)
3916 {
3917         ctdb_sock_addr addr;
3918         struct ctdb_public_ip_list *pubip_list;
3919         struct ctdb_addr_info addr_info;
3920         int ret, i;
3921
3922         if (argc != 1) {
3923                 usage("delip");
3924         }
3925
3926         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3927                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3928                 return 1;
3929         }
3930
3931         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3932                                        ctdb->cmd_pnn, TIMEOUT(), &pubip_list);
3933         if (ret != 0) {
3934                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3935                         ctdb->cmd_pnn);
3936                 return 1;
3937         }
3938
3939         for (i=0; i<pubip_list->num; i++) {
3940                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3941                         break;
3942                 }
3943         }
3944
3945         if (i == pubip_list->num) {
3946                 fprintf(stderr, "Node does not know about IP address %s\n",
3947                         ctdb_sock_addr_to_string(mem_ctx, &addr));
3948                 return 0;
3949         }
3950
3951         addr_info.addr = addr;
3952         addr_info.mask = 0;
3953         addr_info.iface = NULL;
3954
3955         ret = ctdb_ctrl_del_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3956                                       ctdb->cmd_pnn, TIMEOUT(), &addr_info);
3957         if (ret != 0) {
3958                 fprintf(stderr, "Failed to delete public IP from node %u\n",
3959                         ctdb->cmd_pnn);
3960                 return ret;
3961         }
3962
3963         return 0;
3964 }
3965
3966 static int control_eventscript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3967                                int argc, const char **argv)
3968 {
3969         int ret;
3970
3971         if (argc != 1) {
3972                 usage("eventscript");
3973         }
3974
3975         if (strcmp(argv[0], "monitor") != 0) {
3976                 fprintf(stderr, "Only monitor event can be run\n");
3977                 return 1;
3978         }
3979
3980         ret = ctdb_ctrl_run_eventscripts(mem_ctx, ctdb->ev, ctdb->client,
3981                                          ctdb->cmd_pnn, TIMEOUT(), argv[0]);
3982         if (ret != 0) {
3983                 fprintf(stderr, "Failed to run monitor event on node %u\n",
3984                         ctdb->cmd_pnn);
3985                 return ret;
3986         }
3987
3988         return 0;
3989 }
3990
/*
 * Backup file format version: stored in db_header.version by backupdb
 * and required to match by restoredb.
 */
3991 #define DB_VERSION      3
/* Size of db_header.name, including the terminating NUL */
3992 #define MAX_DB_NAME     64
/*
 * backup_handler() writes the pending record buffer to the backup file
 * once ctdb_rec_buffer_len() reports at least this value.
 */
3993 #define MAX_REC_BUFFER_SIZE     (100*1000)
3994
/*
 * Header at the start of a database backup file.
 *
 * The structure is written to and read from the file as raw bytes, so
 * its layout is part of the file format and must not be changed.
 */
3995 struct db_header {
3996         unsigned long version;          /* DB_VERSION at the time of the backup */
3997         time_t timestamp;               /* time(NULL) when the backup was finished */
3998         unsigned long flags;            /* database flags; passed to ctdb_attach() on restore */
3999         unsigned long nbuf;             /* number of record buffers following the header */
4000         unsigned long nrec;             /* total number of records in those buffers */
4001         char name[MAX_DB_NAME];         /* database name, NUL terminated */
4002 };
4003
/* State handed to backup_handler() while a database is traversed */
4004 struct backup_state {
4005         TALLOC_CTX *mem_ctx;            /* talloc context new record buffers are created on */
4006         struct ctdb_rec_buffer *recbuf; /* records not yet written; NULL when none are pending */
4007         uint32_t db_id;                 /* passed to ctdb_rec_buffer_init() for new buffers */
4008         int fd;                         /* backup file descriptor the buffers are written to */
4009         unsigned int nbuf, nrec;        /* buffers / records written to fd so far */
4010 };
4011
4012 static int backup_handler(uint32_t reqid, struct ctdb_ltdb_header *header,
4013                           TDB_DATA key, TDB_DATA data, void *private_data)
4014 {
4015         struct backup_state *state = (struct backup_state *)private_data;
4016         size_t len;
4017         int ret;
4018
4019         if (state->recbuf == NULL) {
4020                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
4021                                                      state->db_id);
4022                 if (state->recbuf == NULL) {
4023                         return ENOMEM;
4024                 }
4025         }
4026
4027         ret = ctdb_rec_buffer_add(state->recbuf, state->recbuf, reqid,
4028                                   header, key, data);
4029         if (ret != 0) {
4030                 return ret;
4031         }
4032
4033         len = ctdb_rec_buffer_len(state->recbuf);
4034         if (len < MAX_REC_BUFFER_SIZE) {
4035                 return 0;
4036         }
4037
4038         ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
4039         if (ret != 0) {
4040                 fprintf(stderr, "Failed to write records to backup file\n");
4041                 return ret;
4042         }
4043
4044         state->nbuf += 1;
4045         state->nrec += state->recbuf->count;
4046         TALLOC_FREE(state->recbuf);
4047
4048         return 0;
4049 }
4050
4051 static int control_backupdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4052                             int argc, const char **argv)
4053 {
4054         const char *db_name;
4055         struct ctdb_db_context *db;
4056         uint32_t db_id;
4057         uint8_t db_flags;
4058         struct backup_state state;
4059         struct db_header db_hdr;
4060         int fd, ret;
4061
4062         if (argc != 2) {
4063                 usage("backupdb");
4064         }
4065
4066         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4067                 return 1;
4068         }
4069
4070         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4071                           db_flags, &db);
4072         if (ret != 0) {
4073                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4074                 return ret;
4075         }
4076
4077         fd = open(argv[1], O_RDWR|O_CREAT, 0600);
4078         if (fd == -1) {
4079                 ret = errno;
4080                 fprintf(stderr, "Failed to open file %s for writing\n",
4081                         argv[1]);
4082                 return ret;
4083         }
4084
4085         /* Write empty header first */
4086         ZERO_STRUCT(db_hdr);
4087         ret = write(fd, &db_hdr, sizeof(struct db_header));
4088         if (ret == -1) {
4089                 ret = errno;
4090                 close(fd);
4091                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4092                 return ret;
4093         }
4094
4095         state.mem_ctx = mem_ctx;
4096         state.recbuf = NULL;
4097         state.fd = fd;
4098         state.nbuf = 0;
4099         state.nrec = 0;
4100
4101         ret = ctdb_db_traverse(db, true, false, backup_handler, &state);
4102         if (ret != 0) {
4103                 fprintf(stderr, "Failed to collect records from DB %s\n",
4104                         db_name);
4105                 close(fd);
4106                 return ret;
4107         }
4108
4109         if (state.recbuf != NULL) {
4110                 ret = ctdb_rec_buffer_write(state.recbuf, state.fd);
4111                 if (ret != 0) {
4112                         fprintf(stderr,
4113                                 "Failed to write records to backup file\n");
4114                         close(fd);
4115                         return ret;
4116                 }
4117
4118                 state.nbuf += 1;
4119                 state.nrec += state.recbuf->count;
4120                 TALLOC_FREE(state.recbuf);
4121         }
4122
4123         db_hdr.version = DB_VERSION;
4124         db_hdr.timestamp = time(NULL);
4125         db_hdr.flags = db_flags;
4126         db_hdr.nbuf = state.nbuf;
4127         db_hdr.nrec = state.nrec;
4128         strncpy(db_hdr.name, db_name, MAX_DB_NAME-1);
4129
4130         lseek(fd, 0, SEEK_SET);
4131         ret = write(fd, &db_hdr, sizeof(struct db_header));
4132         if (ret == -1) {
4133                 ret = errno;
4134                 close(fd);
4135                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4136                 return ret;
4137         }
4138
4139         close(fd);
4140         printf("Database backed up to %s\n", argv[1]);
4141         return 0;
4142 }
4143
4144 static int control_restoredb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4145                              int argc, const char **argv)
4146 {
4147         const char *db_name = NULL;
4148         struct ctdb_db_context *db;
4149         struct db_header db_hdr;
4150         struct ctdb_node_map *nodemap;
4151         struct ctdb_req_control request;
4152         struct ctdb_reply_control **reply;
4153         struct ctdb_transdb wipedb;
4154         struct ctdb_pulldb_ext pulldb;
4155         struct ctdb_rec_buffer *recbuf;
4156         uint32_t generation;
4157         uint32_t *pnn_list;
4158         char timebuf[128];
4159         int fd, i;
4160         int count, ret;
4161
4162         if (argc < 1 || argc > 2) {
4163                 usage("restoredb");
4164         }
4165
4166         fd = open(argv[0], O_RDONLY, 0600);
4167         if (fd == -1) {
4168                 ret = errno;
4169                 fprintf(stderr, "Failed to open file %s for reading\n",
4170                         argv[0]);
4171                 return ret;
4172         }
4173
4174         if (argc == 2) {
4175                 db_name = argv[1];
4176         }
4177
4178         ret = read(fd, &db_hdr, sizeof(struct db_header));
4179         if (ret == -1) {
4180                 ret = errno;
4181                 close(fd);
4182                 fprintf(stderr, "Failed to read db header from file %s\n",
4183                         argv[0]);
4184                 return ret;
4185         }
4186         db_hdr.name[sizeof(db_hdr.name)-1] = '\0';
4187
4188         if (db_hdr.version != DB_VERSION) {
4189                 fprintf(stderr,
4190                         "Wrong version of backup file, expected %u, got %lu\n",
4191                         DB_VERSION, db_hdr.version);
4192                 close(fd);
4193                 return EINVAL;
4194         }
4195
4196         if (db_name == NULL) {
4197                 db_name = db_hdr.name;
4198         }
4199
4200         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4201                  localtime(&db_hdr.timestamp));
4202         printf("Restoring database %s from backup @ %s\n", db_name, timebuf);
4203
4204         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4205                           db_hdr.flags, &db);
4206         if (ret != 0) {
4207                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4208                 close(fd);
4209                 return ret;
4210         }
4211
4212         nodemap = get_nodemap(ctdb, false);
4213         if (nodemap == NULL) {
4214                 fprintf(stderr, "Failed to get nodemap\n");
4215                 close(fd);
4216                 return ENOMEM;
4217         }
4218
4219         ret = get_generation(mem_ctx, ctdb, &generation);
4220         if (ret != 0) {
4221                 fprintf(stderr, "Failed to get current generation\n");
4222                 close(fd);
4223                 return ret;
4224         }
4225
4226         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4227                                      &pnn_list);
4228         if (count <= 0) {
4229                 close(fd);
4230                 return ENOMEM;
4231         }
4232
4233         wipedb.db_id = ctdb_db_id(db);
4234         wipedb.tid = generation;
4235
4236         ctdb_req_control_db_freeze(&request, wipedb.db_id);
4237         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4238                                         ctdb->client, pnn_list, count,
4239                                         TIMEOUT(), &request, NULL, NULL);
4240         if (ret != 0) {
4241                 goto failed;
4242         }
4243
4244
4245         ctdb_req_control_db_transaction_start(&request, &wipedb);
4246         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4247                                         pnn_list, count, TIMEOUT(),
4248                                         &request, NULL, NULL);
4249         if (ret != 0) {
4250                 goto failed;
4251         }
4252
4253         ctdb_req_control_wipe_database(&request, &wipedb);
4254         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4255                                         pnn_list, count, TIMEOUT(),
4256                                         &request, NULL, NULL);
4257         if (ret != 0) {
4258                 goto failed;
4259         }
4260
4261         pulldb.db_id = ctdb_db_id(db);
4262         pulldb.lmaster = 0;
4263         pulldb.srvid = SRVID_CTDB_PUSHDB;
4264
4265         ctdb_req_control_db_push_start(&request, &pulldb);
4266         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4267                                         pnn_list, count, TIMEOUT(),
4268                                         &request, NULL, NULL);
4269         if (ret != 0) {
4270                 goto failed;
4271         }
4272
4273         for (i=0; i<db_hdr.nbuf; i++) {
4274                 struct ctdb_req_message message;
4275                 TDB_DATA data;
4276
4277                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4278                 if (ret != 0) {
4279                         goto failed;
4280                 }
4281
4282                 data.dsize = ctdb_rec_buffer_len(recbuf);
4283                 data.dptr = talloc_size(mem_ctx, data.dsize);
4284                 if (data.dptr == NULL) {
4285                         goto failed;
4286                 }
4287
4288                 ctdb_rec_buffer_push(recbuf, data.dptr);
4289
4290                 message.srvid = pulldb.srvid;
4291                 message.data.data = data;
4292
4293                 ret = ctdb_client_message_multi(mem_ctx, ctdb->ev,
4294                                                 ctdb->client,
4295                                                 pnn_list, count,
4296                                                 &message, NULL);
4297                 if (ret != 0) {
4298                         goto failed;
4299                 }
4300
4301                 talloc_free(recbuf);
4302                 talloc_free(data.dptr);
4303         }
4304
4305         ctdb_req_control_db_push_confirm(&request, pulldb.db_id);
4306         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4307                                         pnn_list, count, TIMEOUT(),
4308                                         &request, NULL, &reply);
4309         if (ret != 0) {
4310                 goto failed;
4311         }
4312
4313         for (i=0; i<count; i++) {
4314                 uint32_t num_records;
4315
4316                 ret = ctdb_reply_control_db_push_confirm(reply[i],
4317                                                          &num_records);
4318                 if (ret != 0) {
4319                         fprintf(stderr, "Invalid response from node %u\n",
4320                                 pnn_list[i]);
4321                         goto failed;
4322                 }
4323
4324                 if (num_records != db_hdr.nrec) {
4325                         fprintf(stderr, "Node %u received %u of %lu records\n",
4326                                 pnn_list[i], num_records, db_hdr.nrec);
4327                         goto failed;
4328                 }
4329         }
4330
4331         ctdb_req_control_db_set_healthy(&request, wipedb.db_id);
4332         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4333                                         pnn_list, count, TIMEOUT(),
4334                                         &request, NULL, NULL);
4335         if (ret != 0) {
4336                 goto failed;
4337         }
4338
4339         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4340         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4341                                         pnn_list, count, TIMEOUT(),
4342                                         &request, NULL, NULL);
4343         if (ret != 0) {
4344                 goto failed;
4345         }
4346
4347         ctdb_req_control_db_thaw(&request, wipedb.db_id);
4348         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4349                                         ctdb->client, pnn_list, count,
4350                                         TIMEOUT(), &request, NULL, NULL);
4351         if (ret != 0) {
4352                 goto failed;
4353         }
4354
4355         printf("Database %s restored\n", db_name);
4356         return 0;
4357
4358
4359 failed:
4360         close(fd);
4361         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4362                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4363         return ret;
4364 }
4365
4366 struct dumpdbbackup_state {
4367         ctdb_rec_parser_func_t parser;
4368         struct dump_record_state sub_state;
4369 };
4370
4371 static int dumpdbbackup_handler(uint32_t reqid,
4372                                 struct ctdb_ltdb_header *header,
4373                                 TDB_DATA key, TDB_DATA data,
4374                                 void *private_data)
4375 {
4376         struct dumpdbbackup_state *state =
4377                 (struct dumpdbbackup_state *)private_data;
4378         struct ctdb_ltdb_header hdr;
4379         int ret;
4380
4381         ret = ctdb_ltdb_header_extract(&data, &hdr);
4382         if (ret != 0) {
4383                 return ret;
4384         }
4385
4386         return state->parser(reqid, &hdr, key, data, &state->sub_state);
4387 }
4388
4389 static int control_dumpdbbackup(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4390                                 int argc, const char **argv)
4391 {
4392         struct db_header db_hdr;
4393         char timebuf[128];
4394         struct dumpdbbackup_state state;
4395         int fd, ret, i;
4396
4397         if (argc != 1) {
4398                 usage("dumpbackup");
4399         }
4400
4401         fd = open(argv[0], O_RDONLY, 0600);
4402         if (fd == -1) {
4403                 ret = errno;
4404                 fprintf(stderr, "Failed to open file %s for reading\n",
4405                         argv[0]);
4406                 return ret;
4407         }
4408
4409         ret = read(fd, &db_hdr, sizeof(struct db_header));
4410         if (ret == -1) {
4411                 ret = errno;
4412                 close(fd);
4413                 fprintf(stderr, "Failed to read db header from file %s\n",
4414                         argv[0]);
4415                 return ret;
4416         }
4417         db_hdr.name[sizeof(db_hdr.name)-1] = '\0';
4418
4419         if (db_hdr.version != DB_VERSION) {
4420                 fprintf(stderr,
4421                         "Wrong version of backup file, expected %u, got %lu\n",
4422                         DB_VERSION, db_hdr.version);
4423                 close(fd);
4424                 return EINVAL;
4425         }
4426
4427         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4428                  localtime(&db_hdr.timestamp));
4429         printf("Dumping database %s from backup @ %s\n",
4430                db_hdr.name, timebuf);
4431
4432         state.parser = dump_record;
4433         state.sub_state.count = 0;
4434
4435         for (i=0; i<db_hdr.nbuf; i++) {
4436                 struct ctdb_rec_buffer *recbuf;
4437
4438                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4439                 if (ret != 0) {
4440                         fprintf(stderr, "Failed to read records\n");
4441                         close(fd);
4442                         return ret;
4443                 }
4444
4445                 ret = ctdb_rec_buffer_traverse(recbuf, dumpdbbackup_handler,
4446                                                &state);
4447                 if (ret != 0) {
4448                         fprintf(stderr, "Failed to dump records\n");
4449                         close(fd);
4450                         return ret;
4451                 }
4452         }
4453
4454         close(fd);
4455         printf("Dumped %u record(s)\n", state.sub_state.count);
4456         return 0;
4457 }
4458
4459 static int control_wipedb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4460                           int argc, const char **argv)
4461 {
4462         const char *db_name;
4463         struct ctdb_db_context *db;
4464         uint32_t db_id;
4465         uint8_t db_flags;
4466         struct ctdb_node_map *nodemap;
4467         struct ctdb_req_control request;
4468         struct ctdb_transdb wipedb;
4469         uint32_t generation;
4470         uint32_t *pnn_list;
4471         int count, ret;
4472
4473         if (argc != 1) {
4474                 usage("wipedb");
4475         }
4476
4477         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4478                 return 1;
4479         }
4480
4481         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4482                           db_flags, &db);
4483         if (ret != 0) {
4484                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4485                 return ret;
4486         }
4487
4488         nodemap = get_nodemap(ctdb, false);
4489         if (nodemap == NULL) {
4490                 fprintf(stderr, "Failed to get nodemap\n");
4491                 return ENOMEM;
4492         }
4493
4494         ret = get_generation(mem_ctx, ctdb, &generation);
4495         if (ret != 0) {
4496                 fprintf(stderr, "Failed to get current generation\n");
4497                 return ret;
4498         }
4499
4500         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4501                                      &pnn_list);
4502         if (count <= 0) {
4503                 return ENOMEM;
4504         }
4505
4506         ctdb_req_control_db_freeze(&request, db_id);
4507         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4508                                         ctdb->client, pnn_list, count,
4509                                         TIMEOUT(), &request, NULL, NULL);
4510         if (ret != 0) {
4511                 goto failed;
4512         }
4513
4514         wipedb.db_id = db_id;
4515         wipedb.tid = generation;
4516
4517         ctdb_req_control_db_transaction_start(&request, &wipedb);
4518         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4519                                         pnn_list, count, TIMEOUT(),
4520                                         &request, NULL, NULL);
4521         if (ret != 0) {
4522                 goto failed;
4523         }
4524
4525         ctdb_req_control_wipe_database(&request, &wipedb);
4526         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4527                                         pnn_list, count, TIMEOUT(),
4528                                         &request, NULL, NULL);
4529         if (ret != 0) {
4530                 goto failed;
4531         }
4532
4533         ctdb_req_control_db_set_healthy(&request, db_id);
4534         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4535                                         pnn_list, count, TIMEOUT(),
4536                                         &request, NULL, NULL);
4537         if (ret != 0) {
4538                 goto failed;
4539         }
4540
4541         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4542         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4543                                         pnn_list, count, TIMEOUT(),
4544                                         &request, NULL, NULL);
4545         if (ret != 0) {
4546                 goto failed;
4547         }
4548
4549         ctdb_req_control_db_thaw(&request, db_id);
4550         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4551                                         ctdb->client, pnn_list, count,
4552                                         TIMEOUT(), &request, NULL, NULL);
4553         if (ret != 0) {
4554                 goto failed;
4555         }
4556
4557         printf("Database %s wiped\n", db_name);
4558         return 0;
4559
4560
4561 failed:
4562         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4563                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4564         return ret;
4565 }
4566
4567 static int control_recmaster(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4568                              int argc, const char **argv)
4569 {
4570         uint32_t recmaster;
4571         int ret;
4572
4573         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
4574                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
4575         if (ret != 0) {
4576                 return ret;
4577         }
4578
4579         printf("%u\n", recmaster);
4580         return 0;
4581 }
4582
4583 static void print_scriptstatus_one(struct ctdb_script_list *slist,
4584                                    const char *event_str)
4585 {
4586         int i;
4587         int num_run = 0;
4588
4589         if (slist == NULL) {
4590                 if (! options.machinereadable) {
4591                         printf("%s cycle never run\n", event_str);
4592                 }
4593                 return;
4594         }
4595
4596         for (i=0; i<slist->num_scripts; i++) {
4597                 if (slist->script[i].status != -ENOEXEC) {
4598                         num_run++;
4599                 }
4600         }
4601
4602         if (! options.machinereadable) {
4603                 printf("%d scripts were executed last %s cycle\n",
4604                        num_run, event_str);
4605         }
4606
4607         for (i=0; i<slist->num_scripts; i++) {
4608                 const char *status = NULL;
4609
4610                 switch (slist->script[i].status) {
4611                 case -ETIME:
4612                         status = "TIMEDOUT";
4613                         break;
4614                 case -ENOEXEC:
4615                         status = "DISABLED";
4616                         break;
4617                 case 0:
4618                         status = "OK";
4619                         break;
4620                 default:
4621                         if (slist->script[i].status > 0) {
4622                                 status = "ERROR";
4623                         }
4624                         break;
4625                 }
4626
4627                 if (options.machinereadable) {
4628                         printf("%s%s%s%s%s%i%s%s%s%lu.%06lu%s%lu.%06lu%s%s%s\n",
4629                                options.sep,
4630                                event_str, options.sep,
4631                                slist->script[i].name, options.sep,
4632                                slist->script[i].status, options.sep,
4633                                status, options.sep,
4634                                (unsigned long)slist->script[i].start.tv_sec,
4635                                slist->script[i].start.tv_usec, options.sep,
4636                                (unsigned long)slist->script[i].finished.tv_sec,
4637                                slist->script[i].finished.tv_usec, options.sep,
4638                                slist->script[i].output, options.sep);
4639                         continue;
4640                 }
4641
4642                 if (status) {
4643                         printf("%-20s Status:%s    ",
4644                                slist->script[i].name, status);
4645                 } else {
4646                         /* Some other error, eg from stat. */
4647                         printf("%-20s Status:CANNOT RUN (%s)",
4648                                slist->script[i].name,
4649                                strerror(-slist->script[i].status));
4650                 }
4651
4652                 if (slist->script[i].status >= 0) {
4653                         printf("Duration:%.3lf ",
4654                                timeval_delta(&slist->script[i].finished,
4655                                              &slist->script[i].start));
4656                 }
4657                 if (slist->script[i].status != -ENOEXEC) {
4658                         printf("%s", ctime(&slist->script[i].start.tv_sec));
4659                         if (slist->script[i].status != 0) {
4660                                 printf("   OUTPUT:%s\n",
4661                                        slist->script[i].output);
4662                         }
4663                 } else {
4664                         printf("\n");
4665                 }
4666         }
4667 }
4668
4669 static void print_scriptstatus(struct ctdb_script_list **slist,
4670                                int count, const char **event_str)
4671 {
4672         int i;
4673
4674         if (options.machinereadable) {
4675                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
4676                        options.sep,
4677                        "Type", options.sep,
4678                        "Name", options.sep,
4679                        "Code", options.sep,
4680                        "Status", options.sep,
4681                        "Start", options.sep,
4682                        "End", options.sep,
4683                        "Error Output", options.sep);
4684         }
4685
4686         for (i=0; i<count; i++) {
4687                 print_scriptstatus_one(slist[i], event_str[i]);
4688         }
4689 }
4690
4691 static int control_scriptstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4692                                 int argc, const char **argv)
4693 {
4694         struct ctdb_script_list **slist;
4695         const char *event_str;
4696         enum ctdb_event event;
4697         const char *all_events[] = {
4698                 "init", "setup", "startup", "monitor",
4699                 "takeip", "releaseip", "updateip", "ipreallocated" };
4700         bool valid;
4701         int ret, i, j;
4702         int count, start, end, num;
4703
4704         if (argc > 1) {
4705                 usage("scriptstatus");
4706         }
4707
4708         if (argc == 0) {
4709                 event_str = "monitor";
4710         } else {
4711                 event_str = argv[0];
4712         }
4713
4714         valid = false;
4715         end = 0;
4716
4717         for (i=0; i<ARRAY_SIZE(all_events); i++) {
4718                 if (strcmp(event_str, all_events[i]) == 0) {
4719                         valid = true;
4720                         count = 1;
4721                         start = i;
4722                         end = i+1;
4723                         break;
4724                 }
4725         }
4726
4727         if (strcmp(event_str, "all") == 0) {
4728                 valid = true;
4729                 count = ARRAY_SIZE(all_events);
4730                 start = 0;
4731                 end = count-1;
4732         }
4733
4734         if (! valid) {
4735                 fprintf(stderr, "Unknown event name %s\n", argv[0]);
4736                 usage("scriptstatus");
4737         }
4738
4739         slist = talloc_array(mem_ctx, struct ctdb_script_list *, count);
4740         if (slist == NULL) {
4741                 fprintf(stderr, "Memory allocation error\n");
4742                 return 1;
4743         }
4744
4745         num = 0;
4746         for (i=start; i<end; i++) {
4747                 event = ctdb_event_from_string(all_events[i]);
4748
4749                 ret = ctdb_ctrl_get_event_script_status(mem_ctx, ctdb->ev,
4750                                                         ctdb->client,
4751                                                         ctdb->cmd_pnn,
4752                                                         TIMEOUT(), event,
4753                                                         &slist[num]);
4754                 if (ret != 0) {
4755                         fprintf(stderr,
4756                                 "failed to get script status for %s event\n",
4757                                 all_events[i]);
4758                         return 1;
4759                 }
4760
4761                 if (slist[num] == NULL) {
4762                         num++;
4763                         continue;
4764                 }
4765
4766                 /* The ETIME status is ignored for certain events.
4767                  * In that case the status is 0, but endtime is not set.
4768                  */
4769                 for (j=0; j<slist[num]->num_scripts; j++) {
4770                         if (slist[num]->script[j].status == 0 &&
4771                             timeval_is_zero(&slist[num]->script[j].finished)) {
4772                                 slist[num]->script[j].status = -ETIME;
4773                         }
4774                 }
4775
4776                 num++;
4777         }
4778
4779         print_scriptstatus(slist, count, &all_events[start]);
4780         return 0;
4781 }
4782
4783 static int control_enablescript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4784                                 int argc, const char **argv)
4785 {
4786         int ret;
4787
4788         if (argc != 1) {
4789                 usage("enablescript");
4790         }
4791
4792         ret = ctdb_ctrl_enable_script(mem_ctx, ctdb->ev, ctdb->client,
4793                                       ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4794         if (ret != 0) {
4795                 fprintf(stderr, "Failed to enable script %s\n", argv[0]);
4796                 return ret;
4797         }
4798
4799         return 0;
4800 }
4801
4802 static int control_disablescript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4803                                  int argc, const char **argv)
4804 {
4805         int ret;
4806
4807         if (argc != 1) {
4808                 usage("disablescript");
4809         }
4810
4811         ret = ctdb_ctrl_disable_script(mem_ctx, ctdb->ev, ctdb->client,
4812                                        ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4813         if (ret != 0) {
4814                 fprintf(stderr, "Failed to disable script %s\n", argv[0]);
4815                 return ret;
4816         }
4817
4818         return 0;
4819 }
4820
4821 static int control_natgw(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4822                          int argc, const char **argv)
4823 {
4824         char *t, *natgw_helper = NULL;
4825
4826         if (argc != 1) {
4827                 usage("natgw");
4828         }
4829
4830         t = getenv("CTDB_NATGW_HELPER");
4831         if (t != NULL) {
4832                 natgw_helper = talloc_strdup(mem_ctx, t);
4833         } else {
4834                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4835                                                CTDB_HELPER_BINDIR);
4836         }
4837
4838         if (natgw_helper == NULL) {
4839                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4840                 return 1;
4841         }
4842
4843         return run_helper("NAT gateway helper", natgw_helper, argv[0]);
4844 }
4845
4846 static int control_natgwlist(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4847                              int argc, const char **argv)
4848 {
4849         char *t, *natgw_helper = NULL;
4850
4851         if (argc != 0) {
4852                 usage("natgwlist");
4853         }
4854
4855         t = getenv("CTDB_NATGW_HELPER");
4856         if (t != NULL) {
4857                 natgw_helper = talloc_strdup(mem_ctx, t);
4858         } else {
4859                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4860                                                CTDB_HELPER_BINDIR);
4861         }
4862
4863         if (natgw_helper == NULL) {
4864                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4865                 return 1;
4866         }
4867
4868         return run_helper("NAT gateway helper", natgw_helper, "natgwlist");
4869 }
4870
4871 /*
4872  * Find the PNN of the current node
4873  * discover the pnn by loading the nodes file and try to bind
4874  * to all addresses one at a time until the ip address is found.
4875  */
4876 static bool find_node_xpnn(TALLOC_CTX *mem_ctx, uint32_t *pnn)
4877 {
4878         struct ctdb_node_map *nodemap;
4879         int i;
4880
4881         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
4882         if (nodemap == NULL) {
4883                 return false;
4884         }
4885
4886         for (i=0; i<nodemap->num; i++) {
4887                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
4888                         continue;
4889                 }
4890                 if (ctdb_sys_have_ip(&nodemap->node[i].addr)) {
4891                         if (pnn != NULL) {
4892                                 *pnn = nodemap->node[i].pnn;
4893                         }
4894                         talloc_free(nodemap);
4895                         return true;
4896                 }
4897         }
4898
4899         fprintf(stderr, "Failed to detect PNN of the current node.\n");
4900         talloc_free(nodemap);
4901         return false;
4902 }
4903
4904 static int control_getreclock(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4905                               int argc, const char **argv)
4906 {
4907         const char *reclock;
4908         int ret;
4909
4910         if (argc != 0) {
4911                 usage("getreclock");
4912         }
4913
4914         ret = ctdb_ctrl_get_reclock_file(mem_ctx, ctdb->ev, ctdb->client,
4915                                          ctdb->cmd_pnn, TIMEOUT(), &reclock);
4916         if (ret != 0) {
4917                 return ret;
4918         }
4919
4920         if (reclock != NULL) {
4921                 printf("%s\n", reclock);
4922         }
4923
4924         return 0;
4925 }
4926
4927 static int control_setlmasterrole(TALLOC_CTX *mem_ctx,
4928                                   struct ctdb_context *ctdb,
4929                                   int argc, const char **argv)
4930 {
4931         uint32_t lmasterrole = 0;
4932         int ret;
4933
4934         if (argc != 1) {
4935                 usage("setlmasterrole");
4936         }
4937
4938         if (strcmp(argv[0], "on") == 0) {
4939                 lmasterrole = 1;
4940         } else if (strcmp(argv[0], "off") == 0) {
4941                 lmasterrole = 0;
4942         } else {
4943                 usage("setlmasterrole");
4944         }
4945
4946         ret = ctdb_ctrl_set_lmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4947                                         ctdb->cmd_pnn, TIMEOUT(), lmasterrole);
4948         if (ret != 0) {
4949                 return ret;
4950         }
4951
4952         return 0;
4953 }
4954
4955 static int control_setrecmasterrole(TALLOC_CTX *mem_ctx,
4956                                     struct ctdb_context *ctdb,
4957                                     int argc, const char **argv)
4958 {
4959         uint32_t recmasterrole = 0;
4960         int ret;
4961
4962         if (argc != 1) {
4963                 usage("setrecmasterrole");
4964         }
4965
4966         if (strcmp(argv[0], "on") == 0) {
4967                 recmasterrole = 1;
4968         } else if (strcmp(argv[0], "off") == 0) {
4969                 recmasterrole = 0;
4970         } else {
4971                 usage("setrecmasterrole");
4972         }
4973
4974         ret = ctdb_ctrl_set_recmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4975                                           ctdb->cmd_pnn, TIMEOUT(),
4976                                           recmasterrole);
4977         if (ret != 0) {
4978                 return ret;
4979         }
4980
4981         return 0;
4982 }
4983
4984 static int control_setdbreadonly(TALLOC_CTX *mem_ctx,
4985                                  struct ctdb_context *ctdb,
4986                                  int argc, const char **argv)
4987 {
4988         uint32_t db_id;
4989         uint8_t db_flags;
4990         int ret;
4991
4992         if (argc != 1) {
4993                 usage("setdbreadonly");
4994         }
4995
4996         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
4997                 return 1;
4998         }
4999
5000         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5001                 fprintf(stderr, "Cannot set READONLY on persistent DB\n");
5002                 return 1;
5003         }
5004
5005         ret = ctdb_ctrl_set_db_readonly(mem_ctx, ctdb->ev, ctdb->client,
5006                                         ctdb->cmd_pnn, TIMEOUT(), db_id);
5007         if (ret != 0) {
5008                 return ret;
5009         }
5010
5011         return 0;
5012 }
5013
5014 static int control_setdbsticky(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5015                                int argc, const char **argv)
5016 {
5017         uint32_t db_id;
5018         uint8_t db_flags;
5019         int ret;
5020
5021         if (argc != 1) {
5022                 usage("setdbsticky");
5023         }
5024
5025         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
5026                 return 1;
5027         }
5028
5029         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5030                 fprintf(stderr, "Cannot set STICKY on persistent DB\n");
5031                 return 1;
5032         }
5033
5034         ret = ctdb_ctrl_set_db_sticky(mem_ctx, ctdb->ev, ctdb->client,
5035                                       ctdb->cmd_pnn, TIMEOUT(), db_id);
5036         if (ret != 0) {
5037                 return ret;
5038         }
5039
5040         return 0;
5041 }
5042
5043 static int control_pfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5044                           int argc, const char **argv)
5045 {
5046         const char *db_name;
5047         struct ctdb_db_context *db;
5048         struct ctdb_transaction_handle *h;
5049         uint8_t db_flags;
5050         TDB_DATA key, data;
5051         int ret;
5052
5053         if (argc < 2 || argc > 3) {
5054                 usage("pfetch");
5055         }
5056
5057         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5058                 return 1;
5059         }
5060
5061         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5062                 fprintf(stderr, "DB %s is not a persistent database\n",
5063                         db_name);
5064                 return 1;
5065         }
5066
5067         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5068                           db_flags, &db);
5069         if (ret != 0) {
5070                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5071                 return ret;
5072         }
5073
5074         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5075         if (ret != 0) {
5076                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5077                 return ret;
5078         }
5079
5080         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5081                                      TIMEOUT(), db, true, &h);
5082         if (ret != 0) {
5083                 fprintf(stderr, "Failed to start transaction on db %s\n",
5084                         db_name);
5085                 return ret;
5086         }
5087
5088         ret = ctdb_transaction_fetch_record(h, key, mem_ctx, &data);
5089         if (ret != 0) {
5090                 fprintf(stderr, "Failed to read record for key %s\n",
5091                         argv[1]);
5092                 ctdb_transaction_cancel(h);
5093                 return ret;
5094         }
5095
5096         printf("%.*s\n", (int)data.dsize, data.dptr);
5097
5098         ctdb_transaction_cancel(h);
5099         return 0;
5100 }
5101
5102 static int control_pstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5103                           int argc, const char **argv)
5104 {
5105         const char *db_name;
5106         struct ctdb_db_context *db;
5107         struct ctdb_transaction_handle *h;
5108         uint8_t db_flags;
5109         TDB_DATA key, data;
5110         int ret;
5111
5112         if (argc != 3) {
5113                 usage("pstore");
5114         }
5115
5116         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5117                 return 1;
5118         }
5119
5120         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5121                 fprintf(stderr, "DB %s is not a persistent database\n",
5122                         db_name);
5123                 return 1;
5124         }
5125
5126         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5127                           db_flags, &db);
5128         if (ret != 0) {
5129                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5130                 return ret;
5131         }
5132
5133         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5134         if (ret != 0) {
5135                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5136                 return ret;
5137         }
5138
5139         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5140         if (ret != 0) {
5141                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5142                 return ret;
5143         }
5144
5145         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5146                                      TIMEOUT(), db, false, &h);
5147         if (ret != 0) {
5148                 fprintf(stderr, "Failed to start transaction on db %s\n",
5149                         db_name);
5150                 return ret;
5151         }
5152
5153         ret = ctdb_transaction_store_record(h, key, data);
5154         if (ret != 0) {
5155                 fprintf(stderr, "Failed to store record for key %s\n",
5156                         argv[1]);
5157                 ctdb_transaction_cancel(h);
5158                 return ret;
5159         }
5160
5161         ret = ctdb_transaction_commit(h);
5162         if (ret != 0) {
5163                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5164                         db_name);
5165                 ctdb_transaction_cancel(h);
5166                 return ret;
5167         }
5168
5169         return 0;
5170 }
5171
5172 static int control_pdelete(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5173                            int argc, const char **argv)
5174 {
5175         const char *db_name;
5176         struct ctdb_db_context *db;
5177         struct ctdb_transaction_handle *h;
5178         uint8_t db_flags;
5179         TDB_DATA key;
5180         int ret;
5181
5182         if (argc != 2) {
5183                 usage("pdelete");
5184         }
5185
5186         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5187                 return 1;
5188         }
5189
5190         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5191                 fprintf(stderr, "DB %s is not a persistent database\n",
5192                         db_name);
5193                 return 1;
5194         }
5195
5196         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5197                           db_flags, &db);
5198         if (ret != 0) {
5199                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5200                 return ret;
5201         }
5202
5203         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5204         if (ret != 0) {
5205                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5206                 return ret;
5207         }
5208
5209         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5210                                      TIMEOUT(), db, false, &h);
5211         if (ret != 0) {
5212                 fprintf(stderr, "Failed to start transaction on db %s\n",
5213                         db_name);
5214                 return ret;
5215         }
5216
5217         ret = ctdb_transaction_delete_record(h, key);
5218         if (ret != 0) {
5219                 fprintf(stderr, "Failed to delete record for key %s\n",
5220                         argv[1]);
5221                 ctdb_transaction_cancel(h);
5222                 return ret;
5223         }
5224
5225         ret = ctdb_transaction_commit(h);
5226         if (ret != 0) {
5227                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5228                         db_name);
5229                 ctdb_transaction_cancel(h);
5230                 return ret;
5231         }
5232
5233         return 0;
5234 }
5235
5236 static int ptrans_parse_string(TALLOC_CTX *mem_ctx, const char **ptr, TDB_DATA *data)
5237 {
5238         const char *t;
5239         size_t n;
5240         int ret;
5241
5242         *data = tdb_null;
5243
5244         /* Skip whitespace */
5245         n = strspn(*ptr, " \t");
5246         t = *ptr + n;
5247
5248         if (t[0] == '"') {
5249                 /* Quoted ASCII string - no wide characters! */
5250                 t++;
5251                 n = strcspn(t, "\"");
5252                 if (t[n] == '"') {
5253                         if (n > 0) {
5254                                 ret = str_to_data(t, n, mem_ctx, data);
5255                                 if (ret != 0) {
5256                                         return ret;
5257                                 }
5258                         }
5259                         *ptr = t + n + 1;
5260                 } else {
5261                         fprintf(stderr, "Unmatched \" in input %s\n", *ptr);
5262                         return 1;
5263                 }
5264         } else {
5265                 fprintf(stderr, "Unsupported input format in %s\n", *ptr);
5266                 return 1;
5267         }
5268
5269         return 0;
5270 }
5271
5272 #define MAX_LINE_SIZE   1024
5273
5274 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
5275                                  TDB_DATA *key, TDB_DATA *value)
5276 {
5277         char line [MAX_LINE_SIZE]; /* FIXME: make this more flexible? */
5278         const char *ptr;
5279         int ret;
5280
5281         ptr = fgets(line, MAX_LINE_SIZE, file);
5282         if (ptr == NULL) {
5283                 return false;
5284         }
5285
5286         /* Get key */
5287         ret = ptrans_parse_string(mem_ctx, &ptr, key);
5288         if (ret != 0 || ptr == NULL || key->dptr == NULL) {
5289                 /* Line Ignored but not EOF */
5290                 *key = tdb_null;
5291                 return true;
5292         }
5293
5294         /* Get value */
5295         ret = ptrans_parse_string(mem_ctx, &ptr, value);
5296         if (ret != 0) {
5297                 /* Line Ignored but not EOF */
5298                 talloc_free(key->dptr);
5299                 *key = tdb_null;
5300                 return true;
5301         }
5302
5303         return true;
5304 }
5305
5306 static int control_ptrans(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5307                           int argc, const char **argv)
5308 {
5309         const char *db_name;
5310         struct ctdb_db_context *db;
5311         struct ctdb_transaction_handle *h;
5312         uint8_t db_flags;
5313         FILE *file;
5314         TDB_DATA key, value;
5315         int ret;
5316
5317         if (argc < 1 || argc > 2) {
5318                 usage("ptrans");
5319         }
5320
5321         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5322                 return 1;
5323         }
5324
5325         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5326                 fprintf(stderr, "DB %s is not a persistent database\n",
5327                         db_name);
5328                 return 1;
5329         }
5330
5331         if (argc == 2) {
5332                 file = fopen(argv[1], "r");
5333                 if (file == NULL) {
5334                         fprintf(stderr, "Failed to open file %s\n", argv[1]);
5335                         return 1;
5336                 }
5337         } else {
5338                 file = stdin;
5339         }
5340
5341         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5342                           db_flags, &db);
5343         if (ret != 0) {
5344                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5345                 goto done;
5346         }
5347
5348         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5349                                      TIMEOUT(), db, false, &h);
5350         if (ret != 0) {
5351                 fprintf(stderr, "Failed to start transaction on db %s\n",
5352                         db_name);
5353                 goto done;
5354         }
5355
5356         while (ptrans_get_key_value(mem_ctx, file, &key, &value)) {
5357                 if (key.dsize != 0) {
5358                         ret = ctdb_transaction_store_record(h, key, value);
5359                         if (ret != 0) {
5360                                 fprintf(stderr, "Failed to store record\n");
5361                                 ctdb_transaction_cancel(h);
5362                                 goto done;
5363                         }
5364                         talloc_free(key.dptr);
5365                         talloc_free(value.dptr);
5366                 }
5367         }
5368
5369         ret = ctdb_transaction_commit(h);
5370         if (ret != 0) {
5371                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5372                         db_name);
5373                 ctdb_transaction_cancel(h);
5374         }
5375
5376 done:
5377         if (file != stdin) {
5378                 fclose(file);
5379         }
5380         return ret;
5381 }
5382
5383 static int control_tfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5384                           int argc, const char **argv)
5385 {
5386         struct tdb_context *tdb;
5387         TDB_DATA key, data;
5388         struct ctdb_ltdb_header header;
5389         int ret;
5390
5391         if (argc < 2 || argc > 3) {
5392                 usage("tfetch");
5393         }
5394
5395         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5396         if (tdb == NULL) {
5397                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5398                 return 1;
5399         }
5400
5401         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5402         if (ret != 0) {
5403                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5404                 tdb_close(tdb);
5405                 return ret;
5406         }
5407
5408         data = tdb_fetch(tdb, key);
5409         if (data.dptr == NULL) {
5410                 fprintf(stderr, "No record for key %s\n", argv[1]);
5411                 tdb_close(tdb);
5412                 return 1;
5413         }
5414
5415         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
5416                 fprintf(stderr, "Invalid record for key %s\n", argv[1]);
5417                 tdb_close(tdb);
5418                 return 1;
5419         }
5420
5421         tdb_close(tdb);
5422
5423         if (argc == 3) {
5424                 int fd;
5425                 ssize_t nwritten;
5426
5427                 fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
5428                 if (fd == -1) {
5429                         fprintf(stderr, "Failed to open output file %s\n",
5430                                 argv[2]);
5431                         goto fail;
5432                 }
5433
5434                 nwritten = sys_write(fd, data.dptr, data.dsize);
5435                 if (nwritten != data.dsize) {
5436                         fprintf(stderr, "Failed to write record to file\n");
5437                         close(fd);
5438                         goto fail;
5439                 }
5440
5441                 close(fd);
5442         }
5443
5444 fail:
5445         ret = ctdb_ltdb_header_extract(&data, &header);
5446         if (ret != 0) {
5447                 fprintf(stderr, "Failed to parse header from data\n");
5448                 return 1;
5449         }
5450
5451         dump_ltdb_header(&header);
5452         dump_tdb_data("data", data);
5453
5454         return 0;
5455 }
5456
5457 static int control_tstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5458                           int argc, const char **argv)
5459 {
5460         struct tdb_context *tdb;
5461         TDB_DATA key, data[2], value;
5462         struct ctdb_ltdb_header header;
5463         uint8_t header_buf[sizeof(struct ctdb_ltdb_header)];
5464         int ret;
5465
5466         if (argc < 3 || argc > 5) {
5467                 usage("tstore");
5468         }
5469
5470         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5471         if (tdb == NULL) {
5472                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5473                 return 1;
5474         }
5475
5476         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5477         if (ret != 0) {
5478                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5479                 tdb_close(tdb);
5480                 return ret;
5481         }
5482
5483         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &value);
5484         if (ret != 0) {
5485                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5486                 tdb_close(tdb);
5487                 return ret;
5488         }
5489
5490         ZERO_STRUCT(header);
5491
5492         if (argc > 3) {
5493                 header.rsn = (uint64_t)strtoull(argv[3], NULL, 0);
5494         }
5495         if (argc > 4) {
5496                 header.dmaster = (uint32_t)atol(argv[4]);
5497         }
5498         if (argc > 5) {
5499                 header.flags = (uint32_t)atol(argv[5]);
5500         }
5501
5502         ctdb_ltdb_header_push(&header, header_buf);
5503
5504         data[0].dsize = ctdb_ltdb_header_len(&header);
5505         data[0].dptr = header_buf;
5506
5507         data[1].dsize = value.dsize;
5508         data[1].dptr = value.dptr;
5509
5510         ret = tdb_storev(tdb, key, data, 2, TDB_REPLACE);
5511         if (ret != 0) {
5512                 fprintf(stderr, "Failed to write record %s to file %s\n",
5513                         argv[1], argv[0]);
5514         }
5515
5516         tdb_close(tdb);
5517
5518         return ret;
5519 }
5520
5521 static int control_readkey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5522                            int argc, const char **argv)
5523 {
5524         const char *db_name;
5525         struct ctdb_db_context *db;
5526         struct ctdb_record_handle *h;
5527         uint8_t db_flags;
5528         TDB_DATA key, data;
5529         bool readonly = false;
5530         int ret;
5531
5532         if (argc < 2 || argc > 3) {
5533                 usage("readkey");
5534         }
5535
5536         if (argc == 3) {
5537                 if (strcmp(argv[2], "readonly") == 0) {
5538                         readonly = true;
5539                 } else {
5540                         usage("readkey");
5541                 }
5542         }
5543
5544         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5545                 return 1;
5546         }
5547
5548         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5549                 fprintf(stderr, "DB %s is not a volatile database\n",
5550                         db_name);
5551                 return 1;
5552         }
5553
5554         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5555                           db_flags, &db);
5556         if (ret != 0) {
5557                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5558                 return ret;
5559         }
5560
5561         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5562         if (ret != 0) {
5563                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5564                 return ret;
5565         }
5566
5567         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5568                               db, key, readonly, &h, NULL, &data);
5569         if (ret != 0) {
5570                 fprintf(stderr, "Failed to read record for key %s\n",
5571                         argv[1]);
5572         } else {
5573                 printf("Data: size:%zu ptr:[%.*s]\n", data.dsize,
5574                        (int)data.dsize, data.dptr);
5575         }
5576
5577         talloc_free(h);
5578         return ret;
5579 }
5580
5581 static int control_writekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5582                             int argc, const char **argv)
5583 {
5584         const char *db_name;
5585         struct ctdb_db_context *db;
5586         struct ctdb_record_handle *h;
5587         uint8_t db_flags;
5588         TDB_DATA key, data;
5589         int ret;
5590
5591         if (argc != 3) {
5592                 usage("writekey");
5593         }
5594
5595         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5596                 return 1;
5597         }
5598
5599         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5600                 fprintf(stderr, "DB %s is not a volatile database\n",
5601                         db_name);
5602                 return 1;
5603         }
5604
5605         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5606                           db_flags, &db);
5607         if (ret != 0) {
5608                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5609                 return ret;
5610         }
5611
5612         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5613         if (ret != 0) {
5614                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5615                 return ret;
5616         }
5617
5618         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5619         if (ret != 0) {
5620                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5621                 return ret;
5622         }
5623
5624         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5625                               db, key, false, &h, NULL, NULL);
5626         if (ret != 0) {
5627                 fprintf(stderr, "Failed to lock record for key %s\n", argv[0]);
5628                 return ret;
5629         }
5630
5631         ret = ctdb_store_record(h, data);
5632         if (ret != 0) {
5633                 fprintf(stderr, "Failed to store record for key %s\n",
5634                         argv[1]);
5635         }
5636
5637         talloc_free(h);
5638         return ret;
5639 }
5640
5641 static int control_deletekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5642                              int argc, const char **argv)
5643 {
5644         const char *db_name;
5645         struct ctdb_db_context *db;
5646         struct ctdb_record_handle *h;
5647         uint8_t db_flags;
5648         TDB_DATA key, data;
5649         int ret;
5650
5651         if (argc != 2) {
5652                 usage("deletekey");
5653         }
5654
5655         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5656                 return 1;
5657         }
5658
5659         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5660                 fprintf(stderr, "DB %s is not a volatile database\n",
5661                         db_name);
5662                 return 1;
5663         }
5664
5665         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5666                           db_flags, &db);
5667         if (ret != 0) {
5668                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5669                 return ret;
5670         }
5671
5672         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5673         if (ret != 0) {
5674                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5675                 return ret;
5676         }
5677
5678         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5679                               db, key, false, &h, NULL, &data);
5680         if (ret != 0) {
5681                 fprintf(stderr, "Failed to fetch record for key %s\n",
5682                         argv[1]);
5683                 return ret;
5684         }
5685
5686         ret = ctdb_delete_record(h);
5687         if (ret != 0) {
5688                 fprintf(stderr, "Failed to delete record for key %s\n",
5689                         argv[1]);
5690         }
5691
5692         talloc_free(h);
5693         return ret;
5694 }
5695
5696 static int control_checktcpport(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5697                                 int argc, const char **argv)
5698 {
5699         struct sockaddr_in sin;
5700         unsigned int port;
5701         int s, v;
5702         int ret;
5703
5704         if (argc != 1) {
5705                 usage("chktcpport");
5706         }
5707
5708         port = atoi(argv[0]);
5709
5710         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
5711         if (s == -1) {
5712                 fprintf(stderr, "Failed to open local socket\n");
5713                 return errno;
5714         }
5715
5716         v = fcntl(s, F_GETFL, 0);
5717         if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK)) {
5718                 fprintf(stderr, "Unable to set socket non-blocking\n");
5719                 close(s);
5720                 return errno;
5721         }
5722
5723         bzero(&sin, sizeof(sin));
5724         sin.sin_family = AF_INET;
5725         sin.sin_port = htons(port);
5726         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
5727         close(s);
5728         if (ret == -1) {
5729                 fprintf(stderr, "Failed to bind to TCP port %u\n", port);
5730                 return errno;
5731         }
5732
5733         return 0;
5734 }
5735
5736 static int control_getdbseqnum(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5737                                int argc, const char **argv)
5738 {
5739         uint32_t db_id;
5740         const char *db_name;
5741         uint64_t seqnum;
5742         int ret;
5743
5744         if (argc != 1) {
5745                 usage("getdbseqnum");
5746         }
5747
5748         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5749                 return 1;
5750         }
5751
5752         ret = ctdb_ctrl_get_db_seqnum(mem_ctx, ctdb->ev, ctdb->client,
5753                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
5754                                       &seqnum);
5755         if (ret != 0) {
5756                 fprintf(stderr, "Failed to get sequence number for DB %s\n",
5757                         db_name);
5758                 return ret;
5759         }
5760
5761         printf("0x%"PRIx64"\n", seqnum);
5762         return 0;
5763 }
5764
5765 static int control_nodestatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5766                               int argc, const char **argv)
5767 {
5768         const char *nodestring = NULL;
5769         struct ctdb_node_map *nodemap;
5770         int ret, i;
5771
5772         if (argc > 1) {
5773                 usage("nodestatus");
5774         }
5775
5776         if (argc == 1) {
5777                 nodestring = argv[0];
5778         }
5779
5780         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap)) {
5781                 return 1;
5782         }
5783
5784         nodemap = get_nodemap(ctdb, false);
5785         if (nodemap == NULL) {
5786                 return 1;
5787         }
5788
5789         if (options.machinereadable) {
5790                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5791         } else {
5792                 print_nodemap(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5793         }
5794
5795         ret = 0;
5796         for (i=0; i<nodemap->num; i++) {
5797                 ret |= nodemap->node[i].flags;
5798         }
5799
5800         return ret;
5801 }
5802
5803 const struct {
5804         const char *name;
5805         uint32_t offset;
5806 } db_stats_fields[] = {
5807 #define DBSTATISTICS_FIELD(n) { #n, offsetof(struct ctdb_db_statistics, n) }
5808         DBSTATISTICS_FIELD(db_ro_delegations),
5809         DBSTATISTICS_FIELD(db_ro_revokes),
5810         DBSTATISTICS_FIELD(locks.num_calls),
5811         DBSTATISTICS_FIELD(locks.num_current),
5812         DBSTATISTICS_FIELD(locks.num_pending),
5813         DBSTATISTICS_FIELD(locks.num_failed),
5814         DBSTATISTICS_FIELD(db_ro_delegations),
5815 };
5816
5817 static void print_dbstatistics(const char *db_name,
5818                                struct ctdb_db_statistics *s)
5819 {
5820         int i;
5821         const char *prefix = NULL;
5822         int preflen = 0;
5823
5824         printf("DB Statistics %s\n", db_name);
5825
5826         for (i=0; i<ARRAY_SIZE(db_stats_fields); i++) {
5827                 if (strchr(db_stats_fields[i].name, '.') != NULL) {
5828                         preflen = strcspn(db_stats_fields[i].name, ".") + 1;
5829                         if (! prefix ||
5830                             strncmp(prefix, db_stats_fields[i].name, preflen) != 0) {
5831                                 prefix = db_stats_fields[i].name;
5832                                 printf(" %*.*s\n", preflen-1, preflen-1,
5833                                        db_stats_fields[i].name);
5834                         }
5835                 } else {
5836                         preflen = 0;
5837                 }
5838                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
5839                        db_stats_fields[i].name+preflen, preflen ? 0 : 4, "",
5840                        *(uint32_t *)(db_stats_fields[i].offset+(uint8_t *)s));
5841         }
5842
5843         printf(" hop_count_buckets:");
5844         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5845                 printf(" %d", s->hop_count_bucket[i]);
5846         }
5847         printf("\n");
5848
5849         printf(" lock_buckets:");
5850         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5851                 printf(" %d", s->locks.buckets[i]);
5852         }
5853         printf("\n");
5854
5855         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5856                "locks_latency      MIN/AVG/MAX",
5857                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
5858                s->locks.latency.max, s->locks.latency.num);
5859
5860         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5861                "vacuum_latency     MIN/AVG/MAX",
5862                s->vacuum.latency.min, LATENCY_AVG(s->vacuum.latency),
5863                s->vacuum.latency.max, s->vacuum.latency.num);
5864
5865         printf(" Num Hot Keys:     %d\n", s->num_hot_keys);
5866         for (i=0; i<s->num_hot_keys; i++) {
5867                 int j;
5868                 printf("     Count:%d Key:", s->hot_keys[i].count);
5869                 for (j=0; j<s->hot_keys[i].key.dsize; j++) {
5870                         printf("%02x", s->hot_keys[i].key.dptr[j] & 0xff);
5871                 }
5872                 printf("\n");
5873         }
5874 }
5875
5876 static int control_dbstatistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5877                                 int argc, const char **argv)
5878 {
5879         uint32_t db_id;
5880         const char *db_name;
5881         struct ctdb_db_statistics *dbstats;
5882         int ret;
5883
5884         if (argc != 1) {
5885                 usage("dbstatistics");
5886         }
5887
5888         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5889                 return 1;
5890         }
5891
5892         ret = ctdb_ctrl_get_db_statistics(mem_ctx, ctdb->ev, ctdb->client,
5893                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
5894                                           &dbstats);
5895         if (ret != 0) {
5896                 fprintf(stderr, "Failed to get statistics for DB %s\n",
5897                         db_name);
5898                 return ret;
5899         }
5900
5901         print_dbstatistics(db_name, dbstats);
5902         return 0;
5903 }
5904
5905 struct disable_takeover_runs_state {
5906         uint32_t *pnn_list;
5907         int node_count;
5908         bool *reply;
5909         int status;
5910         bool done;
5911 };
5912
5913 static void disable_takeover_run_handler(uint64_t srvid, TDB_DATA data,
5914                                          void *private_data)
5915 {
5916         struct disable_takeover_runs_state *state =
5917                 (struct disable_takeover_runs_state *)private_data;
5918         int ret, i;
5919
5920         if (data.dsize != sizeof(int)) {
5921                 /* Ignore packet */
5922                 return;
5923         }
5924
5925         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
5926         ret = *(int *)data.dptr;
5927         if (ret < 0) {
5928                 state->status = ret;
5929                 state->done = true;
5930                 return;
5931         }
5932         for (i=0; i<state->node_count; i++) {
5933                 if (state->pnn_list[i] == ret) {
5934                         state->reply[i] = true;
5935                         break;
5936                 }
5937         }
5938
5939         state->done = true;
5940         for (i=0; i<state->node_count; i++) {
5941                 if (! state->reply[i]) {
5942                         state->done = false;
5943                         break;
5944                 }
5945         }
5946 }
5947
5948 static int disable_takeover_runs(TALLOC_CTX *mem_ctx,
5949                                  struct ctdb_context *ctdb, uint32_t timeout,
5950                                  uint32_t *pnn_list, int count)
5951 {
5952         struct ctdb_disable_message disable = { 0 };
5953         struct disable_takeover_runs_state state;
5954         int ret, i;
5955
5956         disable.pnn = ctdb->pnn;
5957         disable.srvid = next_srvid(ctdb);
5958         disable.timeout = timeout;
5959
5960         state.pnn_list = pnn_list;
5961         state.node_count = count;
5962         state.done = false;
5963         state.status = 0;
5964         state.reply = talloc_zero_array(mem_ctx, bool, count);
5965         if (state.reply == NULL) {
5966                 return ENOMEM;
5967         }
5968
5969         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
5970                                               disable.srvid,
5971                                               disable_takeover_run_handler,
5972                                               &state);
5973         if (ret != 0) {
5974                 return ret;
5975         }
5976
5977         for (i=0; i<count; i++) {
5978                 ret = ctdb_message_disable_takeover_runs(mem_ctx, ctdb->ev,
5979                                                          ctdb->client,
5980                                                          pnn_list[i],
5981                                                          &disable);
5982                 if (ret != 0) {
5983                         goto fail;
5984                 }
5985         }
5986
5987         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
5988         if (ret == ETIME) {
5989                 fprintf(stderr, "Timed out waiting to disable takeover runs\n");
5990         } else {
5991                 ret = (state.status >= 0 ? 0 : 1);
5992         }
5993
5994 fail:
5995         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
5996                                            disable.srvid, &state);
5997         return ret;
5998 }
5999
6000 static int control_reloadips(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
6001                              int argc, const char **argv)
6002 {
6003         const char *nodestring = NULL;
6004         struct ctdb_node_map *nodemap, *nodemap2;
6005         struct ctdb_req_control request;
6006         uint32_t *pnn_list, *pnn_list2;
6007         int ret, count, count2;
6008
6009         if (argc > 1) {
6010                 usage("reloadips");
6011         }
6012
6013         if (argc == 1) {
6014                 nodestring = argv[0];
6015         }
6016
6017         nodemap = get_nodemap(ctdb, false);
6018         if (nodemap == NULL) {
6019                 return 1;
6020         }
6021
6022         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap2)) {
6023                 return 1;
6024         }
6025
6026         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
6027                                         mem_ctx, &pnn_list);
6028         if (count <= 0) {
6029                 fprintf(stderr, "Memory allocation error\n");
6030                 return 1;
6031         }
6032
6033         count2 = list_of_active_nodes(nodemap2, CTDB_UNKNOWN_PNN,
6034                                       mem_ctx, &pnn_list2);
6035         if (count2 <= 0) {
6036                 fprintf(stderr, "Memory allocation error\n");
6037                 return 1;
6038         }
6039
6040         /* Disable takeover runs on all connected nodes.  A reply
6041          * indicating success is needed from each node so all nodes
6042          * will need to be active.
6043          *
6044          * A check could be added to not allow reloading of IPs when
6045          * there are disconnected nodes.  However, this should
6046          * probably be left up to the administrator.
6047          */
6048         ret = disable_takeover_runs(mem_ctx, ctdb, 2*options.timelimit,
6049                                     pnn_list, count);
6050         if (ret != 0) {
6051                 fprintf(stderr, "Failed to disable takeover runs\n");
6052                 return ret;
6053         }
6054
6055         /* Now tell all the desired nodes to reload their public IPs.
6056          * Keep trying this until it succeeds.  This assumes all
6057          * failures are transient, which might not be true...
6058          */
6059         ctdb_req_control_reload_public_ips(&request);
6060         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
6061                                         pnn_list2, count2, TIMEOUT(),
6062                                         &request, NULL, NULL);
6063         if (ret != 0) {
6064                 fprintf(stderr, "Failed to reload IPs on some nodes.\n");
6065         }
6066
6067         /* It isn't strictly necessary to wait until takeover runs are
6068          * re-enabled but doing so can't hurt.
6069          */
6070         ret = disable_takeover_runs(mem_ctx, ctdb, 0, pnn_list, count);
6071         if (ret != 0) {
6072                 fprintf(stderr, "Failed to enable takeover runs\n");
6073                 return ret;
6074         }
6075
6076         return ipreallocate(mem_ctx, ctdb);
6077 }
6078
6079 static int control_ipiface(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
6080                            int argc, const char **argv)
6081 {
6082         ctdb_sock_addr addr;
6083         char *iface;
6084
6085         if (argc != 1) {
6086                 usage("ipiface");
6087         }
6088
6089         if (! parse_ip(argv[0], NULL, 0, &addr)) {
6090                 fprintf(stderr, "Failed to Parse IP %s\n", argv[0]);
6091                 return 1;
6092         }
6093
6094         iface = ctdb_sys_find_ifname(&addr);
6095         if (iface == NULL) {
6096                 fprintf(stderr, "Failed to find interface for IP %s\n",
6097                         argv[0]);
6098                 return 1;
6099         }
6100         free(iface);
6101
6102         return 0;
6103 }
6104
6105
6106 static const struct ctdb_cmd {
6107         const char *name;
6108         int (*fn)(TALLOC_CTX *, struct ctdb_context *, int, const char **);
6109         bool without_daemon; /* can be run without daemon running ? */
6110         bool remote; /* can be run on remote nodes */
6111         const char *msg;
6112         const char *args;
6113 } ctdb_commands[] = {
6114         { "version", control_version, true, false,
6115                 "show version of ctdb", NULL },
6116         { "status", control_status, false, true,
6117                 "show node status", NULL },
6118         { "uptime", control_uptime, false, true,
6119                 "show node uptime", NULL },
6120         { "ping", control_ping, false, true,
6121                 "ping all nodes", NULL },
6122         { "runstate", control_runstate, false, true,
6123                 "get/check runstate of a node",
6124                 "[setup|first_recovery|startup|running]" },
6125         { "getvar", control_getvar, false, true,
6126                 "get a tunable variable", "<name>" },
6127         { "setvar", control_setvar, false, true,
6128                 "set a tunable variable", "<name> <value>" },
6129         { "listvars", control_listvars, false, true,
6130                 "list tunable variables", NULL },
6131         { "statistics", control_statistics, false, true,
6132                 "show ctdb statistics", NULL },
6133         { "statisticsreset", control_statistics_reset, false, true,
6134                 "reset ctdb statistics", NULL },
6135         { "stats", control_stats, false, true,
6136                 "show rolling statistics", "[count]" },
6137         { "ip", control_ip, false, true,
6138                 "show public ips", "[all]" },
6139         { "ipinfo", control_ipinfo, false, true,
6140                 "show public ip details", "<ip>" },
6141         { "ifaces", control_ifaces, false, true,
6142                 "show interfaces", NULL },
6143         { "setifacelink", control_setifacelink, false, true,
6144                 "set interface link status", "<iface> up|down" },
6145         { "process-exists", control_process_exists, false, true,
6146                 "check if a process exists on a node",  "<pid>" },
6147         { "getdbmap", control_getdbmap, false, true,
6148                 "show attached databases", NULL },
6149         { "getdbstatus", control_getdbstatus, false, true,
6150                 "show database status", "<dbname|dbid>" },
6151         { "catdb", control_catdb, false, false,
6152                 "dump cluster-wide ctdb database", "<dbname|dbid>" },
6153         { "cattdb", control_cattdb, false, false,
6154                 "dump local ctdb database", "<dbname|dbid>" },
6155         { "getmonmode", control_getmonmode, false, true,
6156                 "show monitoring mode", NULL },
6157         { "getcapabilities", control_getcapabilities, false, true,
6158                 "show node capabilities", NULL },
6159         { "pnn", control_pnn, false, false,
6160                 "show the pnn of the currnet node", NULL },
6161         { "lvs", control_lvs, false, false,
6162                 "show lvs configuration", "master|list|status" },
6163         { "disablemonitor", control_disable_monitor, false, true,
6164                 "disable monitoring", NULL },
6165         { "enablemonitor", control_enable_monitor, false, true,
6166                 "enable monitoring", NULL },
6167         { "setdebug", control_setdebug, false, true,
6168                 "set debug level", "ERROR|WARNING|NOTICE|INFO|DEBUG" },
6169         { "getdebug", control_getdebug, false, true,
6170                 "get debug level", NULL },
6171         { "attach", control_attach, false, false,
6172                 "attach a database", "<dbname> [persistent]" },
6173         { "detach", control_detach, false, false,
6174                 "detach database(s)", "<dbname|dbid> ..." },
6175         { "dumpmemory", control_dumpmemory, false, true,
6176                 "dump ctdbd memory map", NULL },
6177         { "rddumpmemory", control_rddumpmemory, false, true,
6178                 "dump recoverd memory map", NULL },
6179         { "getpid", control_getpid, false, true,
6180                 "get ctdbd process ID", NULL },
6181         { "disable", control_disable, false, true,
6182                 "disable a node", NULL },
6183         { "enable", control_enable, false, true,
6184                 "enable a node", NULL },
6185         { "stop", control_stop, false, true,
6186                 "stop a node", NULL },
6187         { "continue", control_continue, false, true,
6188                 "continue a stopped node", NULL },
6189         { "ban", control_ban, false, true,
6190                 "ban a node", "<bantime>"},
6191         { "unban", control_unban, false, true,
6192                 "unban a node", NULL },
6193         { "shutdown", control_shutdown, false, true,
6194                 "shutdown ctdb daemon", NULL },
6195         { "recover", control_recover, false, true,
6196                 "force recovery", NULL },
6197         { "sync", control_ipreallocate, false, true,
6198                 "run ip reallocation (deprecated)", NULL },
6199         { "ipreallocate", control_ipreallocate, false, true,
6200                 "run ip reallocation", NULL },
6201         { "isnotrecmaster", control_isnotrecmaster, false, false,
6202                 "check if local node is the recmaster", NULL },
6203         { "gratarp", control_gratarp, false, true,
6204                 "send a gratuitous arp", "<ip> <interface>" },
6205         { "tickle", control_tickle, true, false,
6206                 "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
6207         { "gettickles", control_gettickles, false, true,
6208                 "get the list of tickles", "<ip> [<port>]" },
6209         { "addtickle", control_addtickle, false, true,
6210                 "add a tickle", "<ip>:<port> <ip>:<port>" },
6211         { "deltickle", control_deltickle, false, true,
6212                 "delete a tickle", "<ip>:<port> <ip>:<port>" },
6213         { "check_srvids", control_check_srvids, false, true,
6214                 "check if srvid is registered", "<id> [<id> ...]" },
6215         { "listnodes", control_listnodes, true, true,
6216                 "list nodes in the cluster", NULL },
6217         { "reloadnodes", control_reloadnodes, false, false,
6218                 "reload the nodes file all nodes", NULL },
6219         { "moveip", control_moveip, false, false,
6220                 "move an ip address to another node", "<ip> <node>" },
6221         { "addip", control_addip, false, true,
6222                 "add an ip address to a node", "<ip/mask> <iface>" },
6223         { "delip", control_delip, false, true,
6224                 "delete an ip address from a node", "<ip>" },
6225         { "eventscript", control_eventscript, false, true,
6226                 "run an event", "monitor" },
6227         { "backupdb", control_backupdb, false, false,
6228                 "backup a database into a file", "<dbname|dbid> <file>" },
6229         { "restoredb", control_restoredb, false, false,
6230                 "restore a database from a file", "<file> [dbname]" },
6231         { "dumpdbbackup", control_dumpdbbackup, true, false,
6232                 "dump database from a backup file", "<file>" },
6233         { "wipedb", control_wipedb, false, false,
6234                 "wipe the contents of a database.", "<dbname|dbid>"},
6235         { "recmaster", control_recmaster, false, true,
6236                 "show the pnn for the recovery master", NULL },
6237         { "scriptstatus", control_scriptstatus, false, true,
6238                 "show event script status",
6239                 "[init|setup|startup|monitor|takeip|releaseip|ipreallocated]" },
6240         { "enablescript", control_enablescript, false, true,
6241                 "enable an eventscript", "<script>"},
6242         { "disablescript", control_disablescript, false, true,
6243                 "disable an eventscript", "<script>"},
6244         { "natgw", control_natgw, false, false,
6245                 "show natgw configuration", "master|list|status" },
6246         { "natgwlist", control_natgwlist, false, false,
6247                 "show the nodes belonging to this natgw configuration", NULL },
6248         { "getreclock", control_getreclock, false, true,
6249                 "get recovery lock file", NULL },
6250         { "setlmasterrole", control_setlmasterrole, false, true,
6251                 "set LMASTER role", "on|off" },
6252         { "setrecmasterrole", control_setrecmasterrole, false, true,
6253                 "set RECMASTER role", "on|off"},
6254         { "setdbreadonly", control_setdbreadonly, false, true,
6255                 "enable readonly records", "<dbname|dbid>" },
6256         { "setdbsticky", control_setdbsticky, false, true,
6257                 "enable sticky records", "<dbname|dbid>"},
6258         { "pfetch", control_pfetch, false, false,
6259                 "fetch record from persistent database", "<dbname|dbid> <key> [<file>]" },
6260         { "pstore", control_pstore, false, false,
6261                 "write record to persistent database", "<dbname|dbid> <key> <value>" },
6262         { "pdelete", control_pdelete, false, false,
6263                 "delete record from persistent database", "<dbname|dbid> <key>" },
6264         { "ptrans", control_ptrans, false, false,
6265                 "update a persistent database (from file or stdin)", "<dbname|dbid> [<file>]" },
6266         { "tfetch", control_tfetch, false, true,
6267                 "fetch a record", "<tdb-file> <key> [<file>]" },
6268         { "tstore", control_tstore, false, true,
6269                 "store a record", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6270         { "readkey", control_readkey, false, false,
6271                 "read value of a database key", "<dbname|dbid> <key> [readonly]" },
6272         { "writekey", control_writekey, false, false,
6273                 "write value for a database key", "<dbname|dbid> <key> <value>" },
6274         { "deletekey", control_deletekey, false, false,
6275                 "delete a database key", "<dbname|dbid> <key>" },
6276         { "checktcpport", control_checktcpport, true, false,
6277                 "check if a service is bound to a specific tcp port or not", "<port>" },
6278         { "getdbseqnum", control_getdbseqnum, false, false,
6279                 "get database sequence number", "<dbname|dbid>" },
6280         { "nodestatus", control_nodestatus, false, true,
6281                 "show and return node status", "[all|<pnn-list>]" },
6282         { "dbstatistics", control_dbstatistics, false, true,
6283                 "show database statistics", "<dbname|dbid>" },
6284         { "reloadips", control_reloadips, false, false,
6285                 "reload the public addresses file", "[all|<pnn-list>]" },
6286         { "ipiface", control_ipiface, true, false,
6287                 "Find the interface an ip address is hosted on", "<ip>" },
6288 };
6289
6290 static const struct ctdb_cmd *match_command(const char *command)
6291 {
6292         const struct ctdb_cmd *cmd;
6293         int i;
6294
6295         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6296                 cmd = &ctdb_commands[i];
6297                 if (strlen(command) == strlen(cmd->name) &&
6298                     strncmp(command, cmd->name, strlen(command)) == 0) {
6299                         return cmd;
6300                 }
6301         }
6302
6303         return NULL;
6304 }
6305
6306
6307 /**
6308  * Show usage message
6309  */
6310 static void usage_full(void)
6311 {
6312         int i;
6313
6314         poptPrintHelp(pc, stdout, 0);
6315         printf("\nCommands:\n");
6316         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6317                 printf("  %-15s %-27s  %s\n",
6318                        ctdb_commands[i].name,
6319                        ctdb_commands[i].args ? ctdb_commands[i].args : "",
6320                        ctdb_commands[i].msg);
6321         }
6322 }
6323
6324 static void usage(const char *command)
6325 {
6326         const struct ctdb_cmd *cmd;
6327
6328         if (command == NULL) {
6329                 usage_full();
6330                 exit(1);
6331         }
6332
6333         cmd = match_command(command);
6334         if (cmd == NULL) {
6335                 usage_full();
6336         } else {
6337                 poptPrintUsage(pc, stdout, 0);
6338                 printf("\nCommands:\n");
6339                 printf("  %-15s %-27s  %s\n",
6340                        cmd->name, cmd->args ? cmd->args : "", cmd->msg);
6341         }
6342
6343         exit(1);
6344 }
6345
6346 struct poptOption cmdline_options[] = {
6347         POPT_AUTOHELP
6348         { "socket", 's', POPT_ARG_STRING, &options.socket, 0,
6349                 "CTDB socket path", "filename" },
6350         { "debug", 'd', POPT_ARG_STRING, &options.debuglevelstr, 0,
6351                 "debug level"},
6352         { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0,
6353                 "timelimit (in seconds)" },
6354         { "node", 'n', POPT_ARG_INT, &options.pnn, 0,
6355                 "node specification - integer" },
6356         { NULL, 'Y', POPT_ARG_NONE, &options.machinereadable, 0,
6357                 "enable machine readable output", NULL },
6358         { "separator", 'x', POPT_ARG_STRING, &options.sep, 0,
6359                 "specify separator for machine readable output", "CHAR" },
6360         { NULL, 'X', POPT_ARG_NONE, &options.machineparsable, 0,
6361                 "enable machine parsable output with separator |", NULL },
6362         { "verbose", 'v', POPT_ARG_NONE, &options.verbose, 0,
6363                 "enable verbose output", NULL },
6364         { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0,
6365                 "die if runtime exceeds this limit (in seconds)" },
6366         POPT_TABLEEND
6367 };
6368
6369 static int process_command(const struct ctdb_cmd *cmd, int argc,
6370                            const char **argv)
6371 {
6372         TALLOC_CTX *tmp_ctx;
6373         struct ctdb_context *ctdb;
6374         int ret;
6375         bool status;
6376         uint64_t srvid_offset;
6377
6378         tmp_ctx = talloc_new(NULL);
6379         if (tmp_ctx == NULL) {
6380                 fprintf(stderr, "Memory allocation error\n");
6381                 goto fail;
6382         }
6383
6384         if (cmd->without_daemon) {
6385                 if (options.pnn != -1) {
6386                         fprintf(stderr,
6387                                 "Cannot specify node for command %s\n",
6388                                 cmd->name);
6389                         goto fail;
6390                 }
6391
6392                 ret = cmd->fn(tmp_ctx, NULL, argc-1, argv+1);
6393                 talloc_free(tmp_ctx);
6394                 return ret;
6395         }
6396
6397         ctdb = talloc_zero(tmp_ctx, struct ctdb_context);
6398         if (ctdb == NULL) {
6399                 fprintf(stderr, "Memory allocation error\n");
6400                 goto fail;
6401         }
6402
6403         ctdb->ev = tevent_context_init(ctdb);
6404         if (ctdb->ev == NULL) {
6405                 fprintf(stderr, "Failed to initialize tevent\n");
6406                 goto fail;
6407         }
6408
6409         ret = ctdb_client_init(ctdb, ctdb->ev, options.socket, &ctdb->client);
6410         if (ret != 0) {
6411                 fprintf(stderr, "Failed to connect to CTDB daemon (%s)\n",
6412                         options.socket);
6413
6414                 if (!find_node_xpnn(ctdb, NULL)) {
6415                         fprintf(stderr, "Is this node part of CTDB cluster?\n");
6416                 }
6417                 goto fail;
6418         }
6419
6420         ctdb->pnn = ctdb_client_pnn(ctdb->client);
6421         srvid_offset = getpid() & 0xFFFF;
6422         ctdb->srvid = SRVID_CTDB_TOOL | (srvid_offset << 16);
6423
6424         if (options.pnn != -1) {
6425                 status = verify_pnn(ctdb, options.pnn);
6426                 if (! status) {
6427                         goto fail;
6428                 }
6429
6430                 ctdb->cmd_pnn = options.pnn;
6431         } else {
6432                 ctdb->cmd_pnn = ctdb->pnn;
6433         }
6434
6435         if (! cmd->remote && ctdb->pnn != ctdb->cmd_pnn) {
6436                 fprintf(stderr, "Node cannot be specified for command %s\n",
6437                         cmd->name);
6438                 goto fail;
6439         }
6440
6441         ret = cmd->fn(tmp_ctx, ctdb, argc-1, argv+1);
6442         talloc_free(tmp_ctx);
6443         return ret;
6444
6445 fail:
6446         talloc_free(tmp_ctx);
6447         return 1;
6448 }
6449
/*
 * SIGTERM handler installed by alarm_handler(): report that the maximum
 * runtime was exceeded.
 *
 * The message is emitted with write() rather than stdio because stdio
 * functions are not async-signal-safe.
 */
static void signal_handler(int sig)
{
        static const char msg[] = "Maximum runtime exceeded - exiting\n";
        ssize_t nwritten;

        (void)sig;

        /* Nothing useful can be done if the write fails */
        nwritten = write(STDERR_FILENO, msg, sizeof(msg) - 1);
        (void)nwritten;
}
6454
6455 static void alarm_handler(int sig)
6456 {
6457         /* Kill any child processes */
6458         signal(SIGTERM, signal_handler);
6459         kill(0, SIGTERM);
6460
6461         _exit(1);
6462 }
6463
6464 int main(int argc, const char *argv[])
6465 {
6466         int opt;
6467         const char **extra_argv;
6468         int extra_argc;
6469         const struct ctdb_cmd *cmd;
6470         const char *ctdb_socket;
6471         int loglevel;
6472         int ret;
6473
6474         setlinebuf(stdout);
6475
6476         /* Set default options */
6477         options.socket = CTDB_SOCKET;
6478         options.debuglevelstr = NULL;
6479         options.timelimit = 10;
6480         options.sep = "|";
6481         options.maxruntime = 0;
6482         options.pnn = -1;
6483
6484         ctdb_socket = getenv("CTDB_SOCKET");
6485         if (ctdb_socket != NULL) {
6486                 options.socket = ctdb_socket;
6487         }
6488
6489         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
6490                             POPT_CONTEXT_KEEP_FIRST);
6491         while ((opt = poptGetNextOpt(pc)) != -1) {
6492                 fprintf(stderr, "Invalid option %s: %s\n",
6493                         poptBadOption(pc, 0), poptStrerror(opt));
6494                 exit(1);
6495         }
6496
6497         if (options.maxruntime == 0) {
6498                 const char *ctdb_timeout;
6499
6500                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6501                 if (ctdb_timeout != NULL) {
6502                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6503                 } else {
6504                         options.maxruntime = 120;
6505                 }
6506         }
6507         if (options.maxruntime <= 120) {
6508                 /* default timeout is 120 seconds */
6509                 options.maxruntime = 120;
6510         }
6511
6512         if (options.machineparsable) {
6513                 options.machinereadable = 1;
6514         }
6515
6516         /* setup the remaining options for the commands */
6517         extra_argc = 0;
6518         extra_argv = poptGetArgs(pc);
6519         if (extra_argv) {
6520                 extra_argv++;
6521                 while (extra_argv[extra_argc]) extra_argc++;
6522         }
6523
6524         if (extra_argc < 1) {
6525                 usage(NULL);
6526         }
6527
6528         cmd = match_command(extra_argv[0]);
6529         if (cmd == NULL) {
6530                 fprintf(stderr, "Unknown command '%s'\n", extra_argv[0]);
6531                 exit(1);
6532         }
6533
6534         /* Enable logging */
6535         setup_logging("ctdb", DEBUG_STDERR);
6536         if (debug_level_parse(options.debuglevelstr, &loglevel)) {
6537                 DEBUGLEVEL = loglevel;
6538         } else {
6539                 DEBUGLEVEL = DEBUG_ERR;
6540         }
6541
6542         signal(SIGALRM, alarm_handler);
6543         alarm(options.maxruntime);
6544
6545         ret = process_command(cmd, extra_argc, extra_argv);
6546         if (ret == -1) {
6547                 ret = 1;
6548         }
6549
6550         (void)poptFreeContext(pc);
6551
6552         return ret;
6553 }