ctdb-tools: Free record if it does not contain valid data
[amitay/samba.git] / ctdb / tools / ctdb.c
1 /*
2    CTDB control tool
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/wait.h"
25
26 #include <ctype.h>
27 #include <popt.h>
28 #include <talloc.h>
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "ctdb_version.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35
36 #include "common/db_hash.h"
37 #include "common/logging.h"
38 #include "protocol/protocol.h"
39 #include "protocol/protocol_api.h"
40 #include "common/system.h"
41 #include "client/client.h"
42
43 #define TIMEOUT()       timeval_current_ofs(options.timelimit, 0)
44
45 #define SRVID_CTDB_TOOL    (CTDB_SRVID_TOOL_RANGE | 0x0001000000000000LL)
46 #define SRVID_CTDB_PUSHDB  (CTDB_SRVID_TOOL_RANGE | 0x0002000000000000LL)
47
48 static struct {
49         const char *socket;
50         const char *debuglevelstr;
51         int timelimit;
52         int pnn;
53         int machinereadable;
54         const char *sep;
55         int machineparsable;
56         int verbose;
57         int maxruntime;
58         int printemptyrecords;
59         int printdatasize;
60         int printlmaster;
61         int printhash;
62         int printrecordflags;
63 } options;
64
65 static poptContext pc;
66
67 struct ctdb_context {
68         struct tevent_context *ev;
69         struct ctdb_client_context *client;
70         struct ctdb_node_map *nodemap;
71         uint32_t pnn, cmd_pnn;
72         uint64_t srvid;
73 };
74
75 static void usage(const char *command);
76
77 /*
78  * Utility Functions
79  */
80
81 static double timeval_delta(struct timeval *tv2, struct timeval *tv)
82 {
83         return (tv2->tv_sec - tv->tv_sec) +
84                (tv2->tv_usec - tv->tv_usec) * 1.0e-6;
85 }
86
87 static struct ctdb_node_and_flags *get_node_by_pnn(
88                                         struct ctdb_node_map *nodemap,
89                                         uint32_t pnn)
90 {
91         int i;
92
93         for (i=0; i<nodemap->num; i++) {
94                 if (nodemap->node[i].pnn == pnn) {
95                         return &nodemap->node[i];
96                 }
97         }
98         return NULL;
99 }
100
101 static const char *pretty_print_flags(TALLOC_CTX *mem_ctx, uint32_t flags)
102 {
103         static const struct {
104                 uint32_t flag;
105                 const char *name;
106         } flag_names[] = {
107                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
108                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
109                 { NODE_FLAGS_BANNED,                "BANNED" },
110                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
111                 { NODE_FLAGS_DELETED,               "DELETED" },
112                 { NODE_FLAGS_STOPPED,               "STOPPED" },
113                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
114         };
115         char *flags_str = NULL;
116         int i;
117
118         for (i=0; i<ARRAY_SIZE(flag_names); i++) {
119                 if (flags & flag_names[i].flag) {
120                         if (flags_str == NULL) {
121                                 flags_str = talloc_asprintf(mem_ctx,
122                                                 "%s", flag_names[i].name);
123                         } else {
124                                 flags_str = talloc_asprintf_append(flags_str,
125                                                 "|%s", flag_names[i].name);
126                         }
127                         if (flags_str == NULL) {
128                                 return "OUT-OF-MEMORY";
129                         }
130                 }
131         }
132         if (flags_str == NULL) {
133                 return "OK";
134         }
135
136         return flags_str;
137 }
138
139 static uint64_t next_srvid(struct ctdb_context *ctdb)
140 {
141         ctdb->srvid += 1;
142         return ctdb->srvid;
143 }
144
145 /*
146  * Get consistent nodemap information.
147  *
148  * If nodemap is already cached, use that. If not get it.
149  * If the current node is BANNED, then get nodemap from "better" node.
150  */
151 static struct ctdb_node_map *get_nodemap(struct ctdb_context *ctdb, bool force)
152 {
153         TALLOC_CTX *tmp_ctx;
154         struct ctdb_node_map *nodemap;
155         struct ctdb_node_and_flags *node;
156         uint32_t current_node;
157         int ret;
158
159         if (force) {
160                 TALLOC_FREE(ctdb->nodemap);
161         }
162
163         if (ctdb->nodemap != NULL) {
164                 return ctdb->nodemap;
165         }
166
167         tmp_ctx = talloc_new(ctdb);
168         if (tmp_ctx == NULL) {
169                 return false;
170         }
171
172         current_node = ctdb->pnn;
173 again:
174         ret = ctdb_ctrl_get_nodemap(tmp_ctx, ctdb->ev, ctdb->client,
175                                     current_node, TIMEOUT(), &nodemap);
176         if (ret != 0) {
177                 fprintf(stderr, "Failed to get nodemap from node %u\n",
178                         current_node);
179                 goto failed;
180         }
181
182         node = get_node_by_pnn(nodemap, current_node);
183         if (node->flags & NODE_FLAGS_BANNED) {
184                 /* Pick next node */
185                 do {
186                         current_node = (current_node + 1) % nodemap->num;
187                         node = get_node_by_pnn(nodemap, current_node);
188                         if (! (node->flags &
189                               (NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED))) {
190                                 break;
191                         }
192                 } while (current_node != ctdb->pnn);
193
194                 if (current_node == ctdb->pnn) {
195                         /* Tried all nodes in the cluster */
196                         fprintf(stderr, "Warning: All nodes are banned.\n");
197                         goto failed;
198                 }
199
200                 goto again;
201         }
202
203         ctdb->nodemap = talloc_steal(ctdb, nodemap);
204         return nodemap;
205
206 failed:
207         talloc_free(tmp_ctx);
208         return NULL;
209 }
210
211 static bool verify_pnn(struct ctdb_context *ctdb, int pnn)
212 {
213         struct ctdb_node_map *nodemap;
214         bool found;
215         int i;
216
217         if (pnn == -1) {
218                 return false;
219         }
220
221         nodemap = get_nodemap(ctdb, false);
222         if (nodemap == NULL) {
223                 return false;
224         }
225
226         found = false;
227         for (i=0; i<nodemap->num; i++) {
228                 if (nodemap->node[i].pnn == pnn) {
229                         found = true;
230                         break;
231                 }
232         }
233         if (! found) {
234                 fprintf(stderr, "Node %u does not exist\n", pnn);
235                 return false;
236         }
237
238         if (nodemap->node[i].flags &
239             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
240                 fprintf(stderr, "Node %u has status %s\n", pnn,
241                         pretty_print_flags(ctdb, nodemap->node[i].flags));
242                 return false;
243         }
244
245         return true;
246 }
247
248 static struct ctdb_node_map *talloc_nodemap(TALLOC_CTX *mem_ctx,
249                                             struct ctdb_node_map *nodemap)
250 {
251         struct ctdb_node_map *nodemap2;
252
253         nodemap2 = talloc_zero(mem_ctx, struct ctdb_node_map);
254         if (nodemap2 == NULL) {
255                 return NULL;
256         }
257
258         nodemap2->node = talloc_array(nodemap2, struct ctdb_node_and_flags,
259                                       nodemap->num);
260         if (nodemap2->node == NULL) {
261                 talloc_free(nodemap2);
262                 return NULL;
263         }
264
265         return nodemap2;
266 }
267
268 /*
269  * Get the number and the list of matching nodes
270  *
271  *   nodestring :=  NULL | all | pnn,[pnn,...]
272  *
273  * If nodestring is NULL, use the current node.
274  */
275 static bool parse_nodestring(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
276                              const char *nodestring,
277                              struct ctdb_node_map **out)
278 {
279         struct ctdb_node_map *nodemap, *nodemap2;
280         struct ctdb_node_and_flags *node;
281         int i;
282
283         nodemap = get_nodemap(ctdb, false);
284         if (nodemap == NULL) {
285                 return false;
286         }
287
288         nodemap2 = talloc_nodemap(mem_ctx, nodemap);
289         if (nodemap2 == NULL) {
290                 return false;
291         }
292
293         if (nodestring == NULL) {
294                 for (i=0; i<nodemap->num; i++) {
295                         if (nodemap->node[i].pnn == ctdb->cmd_pnn) {
296                                 nodemap2->node[0] = nodemap->node[i];
297                                 break;
298                         }
299                 }
300                 nodemap2->num = 1;
301
302                 goto done;
303         }
304
305         if (strcmp(nodestring, "all") == 0) {
306                 for (i=0; i<nodemap->num; i++) {
307                         nodemap2->node[i] = nodemap->node[i];
308                 }
309                 nodemap2->num = nodemap->num;
310
311                 goto done;
312         } else {
313                 char *ns, *tok;
314
315                 ns = talloc_strdup(mem_ctx, nodestring);
316                 if (ns == NULL) {
317                         return false;
318                 }
319
320                 tok = strtok(ns, ",");
321                 while (tok != NULL) {
322                         uint32_t pnn;
323                         char *endptr;
324
325                         pnn = (uint32_t)strtoul(tok, &endptr, 0);
326                         if (pnn == 0 && tok == endptr) {
327                                 fprintf(stderr, "Invalid node %s\n", tok);
328                                         return false;
329                         }
330
331                         node = get_node_by_pnn(nodemap, pnn);
332                         if (node == NULL) {
333                                 fprintf(stderr, "Node %u does not exist\n",
334                                         pnn);
335                                 return false;
336                         }
337
338                         nodemap2->node[nodemap2->num] = *node;
339                         nodemap2->num += 1;
340
341                         tok = strtok(NULL, ",");
342                 }
343         }
344
345 done:
346         *out = nodemap2;
347         return true;
348 }
349
350 /* Compare IP address */
351 static bool ctdb_same_ip(ctdb_sock_addr *ip1, ctdb_sock_addr *ip2)
352 {
353         bool ret = false;
354
355         if (ip1->sa.sa_family != ip2->sa.sa_family) {
356                 return false;
357         }
358
359         switch (ip1->sa.sa_family) {
360         case AF_INET:
361                 ret = (memcmp(&ip1->ip.sin_addr, &ip2->ip.sin_addr,
362                               sizeof(struct in_addr)) == 0);
363                 break;
364
365         case AF_INET6:
366                 ret = (memcmp(&ip1->ip6.sin6_addr, &ip2->ip6.sin6_addr,
367                               sizeof(struct in6_addr)) == 0);
368                 break;
369         }
370
371         return ret;
372 }
373
374 /* Append a node to a node map with given address and flags */
375 static bool node_map_add(struct ctdb_node_map *nodemap,
376                          const char *nstr, uint32_t flags)
377 {
378         ctdb_sock_addr addr;
379         uint32_t num;
380         struct ctdb_node_and_flags *n;
381
382         if (! parse_ip(nstr, NULL, 0, &addr)) {
383                 fprintf(stderr, "Invalid IP address %s\n", nstr);
384                 return false;
385         }
386
387         num = nodemap->num;
388         nodemap->node = talloc_realloc(nodemap, nodemap->node,
389                                        struct ctdb_node_and_flags, num+1);
390         if (nodemap->node == NULL) {
391                 return false;
392         }
393
394         n = &nodemap->node[num];
395         n->addr = addr;
396         n->pnn = num;
397         n->flags = flags;
398
399         nodemap->num = num+1;
400         return true;
401 }
402
403 /* Read a nodes file into a node map */
404 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
405                                                   const char *nlist)
406 {
407         char **lines;
408         int nlines;
409         int i;
410         struct ctdb_node_map *nodemap;
411
412         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
413         if (nodemap == NULL) {
414                 return NULL;
415         }
416
417         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
418         if (lines == NULL) {
419                 return NULL;
420         }
421
422         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
423                 nlines--;
424         }
425
426         for (i=0; i<nlines; i++) {
427                 char *node;
428                 uint32_t flags;
429                 size_t len;
430
431                 node = lines[i];
432                 /* strip leading spaces */
433                 while((*node == ' ') || (*node == '\t')) {
434                         node++;
435                 }
436
437                 len = strlen(node);
438
439                 /* strip trailing spaces */
440                 while ((len > 1) &&
441                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
442                 {
443                         node[len-1] = '\0';
444                         len--;
445                 }
446
447                 if (len == 0) {
448                         continue;
449                 }
450                 if (*node == '#') {
451                         /* A "deleted" node is a node that is
452                            commented out in the nodes file.  This is
453                            used instead of removing a line, which
454                            would cause subsequent nodes to change
455                            their PNN. */
456                         flags = NODE_FLAGS_DELETED;
457                         node = discard_const("0.0.0.0");
458                 } else {
459                         flags = 0;
460                 }
461                 if (! node_map_add(nodemap, node, flags)) {
462                         talloc_free(lines);
463                         TALLOC_FREE(nodemap);
464                         return NULL;
465                 }
466         }
467
468         talloc_free(lines);
469         return nodemap;
470 }
471
472 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx, uint32_t pnn)
473 {
474         struct ctdb_node_map *nodemap;
475         char *nodepath;
476         const char *nodes_list = NULL;
477
478         if (pnn != CTDB_UNKNOWN_PNN) {
479                 nodepath = talloc_asprintf(mem_ctx, "CTDB_NODES_%u", pnn);
480                 if (nodepath != NULL) {
481                         nodes_list = getenv(nodepath);
482                 }
483         }
484         if (nodes_list == NULL) {
485                 nodes_list = getenv("CTDB_NODES");
486         }
487         if (nodes_list == NULL) {
488                 const char *basedir = getenv("CTDB_BASE");
489                 if (basedir == NULL) {
490                         basedir = CTDB_ETCDIR;
491                 }
492                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes", basedir);
493                 if (nodes_list == NULL) {
494                         fprintf(stderr, "Memory allocation error\n");
495                         return NULL;
496                 }
497         }
498
499         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
500         if (nodemap == NULL) {
501                 fprintf(stderr, "Failed to read nodes file \"%s\"\n",
502                         nodes_list);
503                 return NULL;
504         }
505
506         return nodemap;
507 }
508
509 static struct ctdb_dbid *db_find(TALLOC_CTX *mem_ctx,
510                                  struct ctdb_context *ctdb,
511                                  struct ctdb_dbid_map *dbmap,
512                                  const char *db_name)
513 {
514         struct ctdb_dbid *db = NULL;
515         const char *name;
516         int ret, i;
517
518         for (i=0; i<dbmap->num; i++) {
519                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
520                                            ctdb->pnn, TIMEOUT(),
521                                            dbmap->dbs[i].db_id, &name);
522                 if (ret != 0) {
523                         return false;
524                 }
525
526                 if (strcmp(db_name, name) == 0) {
527                         talloc_free(discard_const(name));
528                         db = &dbmap->dbs[i];
529                         break;
530                 }
531         }
532
533         return db;
534 }
535
536 static bool db_exists(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
537                       const char *db_arg, uint32_t *db_id,
538                       const char **db_name, uint8_t *db_flags)
539 {
540         struct ctdb_dbid_map *dbmap;
541         struct ctdb_dbid *db = NULL;
542         uint32_t id = 0;
543         const char *name = NULL;
544         int ret, i;
545
546         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
547                                   ctdb->pnn, TIMEOUT(), &dbmap);
548         if (ret != 0) {
549                 return false;
550         }
551
552         if (strncmp(db_arg, "0x", 2) == 0) {
553                 id = strtoul(db_arg, NULL, 0);
554                 for (i=0; i<dbmap->num; i++) {
555                         if (id == dbmap->dbs[i].db_id) {
556                                 db = &dbmap->dbs[i];
557                                 break;
558                         }
559                 }
560         } else {
561                 name = db_arg;
562                 db = db_find(mem_ctx, ctdb, dbmap, name);
563         }
564
565         if (db == NULL) {
566                 fprintf(stderr, "No database matching '%s' found\n", db_arg);
567                 return false;
568         }
569
570         if (name == NULL) {
571                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
572                                            ctdb->pnn, TIMEOUT(), id, &name);
573                 if (ret != 0) {
574                         return false;
575                 }
576         }
577
578         if (db_id != NULL) {
579                 *db_id = db->db_id;
580         }
581         if (db_name != NULL) {
582                 *db_name = talloc_strdup(mem_ctx, name);
583         }
584         if (db_flags != NULL) {
585                 *db_flags = db->flags;
586         }
587         return true;
588 }
589
590 static int h2i(char h)
591 {
592         if (h >= 'a' && h <= 'f') {
593                 return h - 'a' + 10;
594         }
595         if (h >= 'A' && h <= 'F') {
596                 return h - 'f' + 10;
597         }
598         return h - '0';
599 }
600
601 static int hex_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
602                        TDB_DATA *out)
603 {
604         int i;
605         TDB_DATA data;
606
607         if (len & 0x01) {
608                 fprintf(stderr, "Key (%s) contains odd number of hex digits\n",
609                         str);
610                 return EINVAL;
611         }
612
613         data.dsize = len / 2;
614         data.dptr = talloc_size(mem_ctx, data.dsize);
615         if (data.dptr == NULL) {
616                 return ENOMEM;
617         }
618
619         for (i=0; i<data.dsize; i++) {
620                 data.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
621         }
622
623         *out = data;
624         return 0;
625 }
626
627 static int str_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
628                        TDB_DATA *out)
629 {
630         TDB_DATA data;
631         int ret = 0;
632
633         if (strncmp(str, "0x", 2) == 0) {
634                 ret = hex_to_data(str+2, len-2, mem_ctx, &data);
635         } else {
636                 data.dptr = talloc_memdup(mem_ctx, str, len);
637                 if (data.dptr == NULL) {
638                         return ENOMEM;
639                 }
640                 data.dsize = len;
641         }
642
643         *out = data;
644         return ret;
645 }
646
647 static int run_helper(const char *command, const char *path, const char *arg1)
648 {
649         pid_t pid;
650         int save_errno, status, ret;
651
652         pid = fork();
653         if (pid < 0) {
654                 save_errno = errno;
655                 fprintf(stderr, "Failed to fork %s (%s) - %s\n",
656                         command, path, strerror(save_errno));
657                 return save_errno;
658         }
659
660         if (pid == 0) {
661                 ret = execl(path, path, arg1, NULL);
662                 if (ret == -1) {
663                         _exit(errno);
664                 }
665                 /* Should not happen */
666                 _exit(ENOEXEC);
667         }
668
669         ret = waitpid(pid, &status, 0);
670         if (ret == -1) {
671                 save_errno = errno;
672                 fprintf(stderr, "waitpid() failed for %s - %s\n",
673                         command, strerror(save_errno));
674                 return save_errno;
675         }
676
677         if (WIFEXITED(status)) {
678                 ret = WEXITSTATUS(status);
679                 return ret;
680         }
681
682         if (WIFSIGNALED(status)) {
683                 fprintf(stderr, "%s terminated with signal %d\n",
684                         command, WTERMSIG(status));
685                 return EINTR;
686         }
687
688         return 0;
689 }
690
691 /*
692  * Command Functions
693  */
694
695 static int control_version(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
696                            int argc, const char **argv)
697 {
698         printf("%s\n", CTDB_VERSION_STRING);
699         return 0;
700 }
701
702 static bool partially_online(TALLOC_CTX *mem_ctx,
703                              struct ctdb_context *ctdb,
704                              struct ctdb_node_and_flags *node)
705 {
706         struct ctdb_iface_list *iface_list;
707         int ret, i;
708         bool status = false;
709
710         if (node->flags != 0) {
711                 return false;
712         }
713
714         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
715                                    node->pnn, TIMEOUT(), &iface_list);
716         if (ret != 0) {
717                 return false;
718         }
719
720         status = false;
721         for (i=0; i < iface_list->num; i++) {
722                 if (iface_list->iface[i].link_state == 0) {
723                         status = true;
724                         break;
725                 }
726         }
727
728         return status;
729 }
730
731 static void print_nodemap_machine(TALLOC_CTX *mem_ctx,
732                                   struct ctdb_context *ctdb,
733                                   struct ctdb_node_map *nodemap,
734                                   uint32_t mypnn)
735 {
736         struct ctdb_node_and_flags *node;
737         int i;
738
739         printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
740                options.sep,
741                "Node", options.sep,
742                "IP", options.sep,
743                "Disconnected", options.sep,
744                "Banned", options.sep,
745                "Disabled", options.sep,
746                "Unhealthy", options.sep,
747                "Stopped", options.sep,
748                "Inactive", options.sep,
749                "PartiallyOnline", options.sep,
750                "ThisNode", options.sep);
751
752         for (i=0; i<nodemap->num; i++) {
753                 node = &nodemap->node[i];
754                 if (node->flags & NODE_FLAGS_DELETED) {
755                         continue;
756                 }
757
758                 printf("%s%u%s%s%s%d%s%d%s%d%s%d%s%d%s%d%s%d%s%c%s\n",
759                        options.sep,
760                        node->pnn, options.sep,
761                        ctdb_sock_addr_to_string(mem_ctx, &node->addr),
762                        options.sep,
763                        !! (node->flags & NODE_FLAGS_DISCONNECTED), options.sep,
764                        !! (node->flags & NODE_FLAGS_BANNED), options.sep,
765                        !! (node->flags & NODE_FLAGS_PERMANENTLY_DISABLED),
766                        options.sep,
767                        !! (node->flags & NODE_FLAGS_UNHEALTHY), options.sep,
768                        !! (node->flags & NODE_FLAGS_STOPPED), options.sep,
769                        !! (node->flags & NODE_FLAGS_INACTIVE), options.sep,
770                        partially_online(mem_ctx, ctdb, node), options.sep,
771                        (node->pnn == mypnn)?'Y':'N', options.sep);
772         }
773
774 }
775
776 static void print_nodemap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
777                           struct ctdb_node_map *nodemap, uint32_t mypnn)
778 {
779         struct ctdb_node_and_flags *node;
780         int num_deleted_nodes = 0;
781         int i;
782
783         for (i=0; i<nodemap->num; i++) {
784                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
785                         num_deleted_nodes++;
786                 }
787         }
788
789         if (num_deleted_nodes == 0) {
790                 printf("Number of nodes:%d\n", nodemap->num);
791         } else {
792                 printf("Number of nodes:%d (including %d deleted nodes)\n",
793                        nodemap->num, num_deleted_nodes);
794         }
795
796         for (i=0; i<nodemap->num; i++) {
797                 node = &nodemap->node[i];
798                 if (node->flags & NODE_FLAGS_DELETED) {
799                         continue;
800                 }
801
802                 printf("pnn:%u %-16s %s%s\n",
803                        node->pnn,
804                        ctdb_sock_addr_to_string(mem_ctx, &node->addr),
805                        partially_online(mem_ctx, ctdb, node) ?
806                                 "PARTIALLYONLINE" :
807                                 pretty_print_flags(mem_ctx, node->flags),
808                        node->pnn == mypnn ? " (THIS NODE)" : "");
809         }
810 }
811
812 static void print_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
813                          struct ctdb_node_map *nodemap, uint32_t mypnn,
814                          struct ctdb_vnn_map *vnnmap, int recmode,
815                          uint32_t recmaster)
816 {
817         int i;
818
819         print_nodemap(mem_ctx, ctdb, nodemap, mypnn);
820
821         if (vnnmap->generation == INVALID_GENERATION) {
822                 printf("Generation:INVALID\n");
823         } else {
824                 printf("Generation:%u\n", vnnmap->generation);
825         }
826         printf("Size:%d\n", vnnmap->size);
827         for (i=0; i<vnnmap->size; i++) {
828                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
829         }
830
831         printf("Recovery mode:%s (%d)\n",
832                recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "RECOVERY",
833                recmode);
834         printf("Recovery master:%d\n", recmaster);
835 }
836
837 static int control_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
838                           int argc, const char **argv)
839 {
840         struct ctdb_node_map *nodemap;
841         struct ctdb_vnn_map *vnnmap;
842         int recmode;
843         uint32_t recmaster;
844         int ret;
845
846         if (argc != 0) {
847                 usage("status");
848         }
849
850         nodemap = get_nodemap(ctdb, false);
851         if (nodemap == NULL) {
852                 return 1;
853         }
854
855         if (options.machinereadable == 1) {
856                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
857                 return 0;
858         }
859
860         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
861                                   ctdb->cmd_pnn, TIMEOUT(), &vnnmap);
862         if (ret != 0) {
863                 return ret;
864         }
865
866         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
867                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
868         if (ret != 0) {
869                 return ret;
870         }
871
872         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
873                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
874         if (ret != 0) {
875                 return ret;
876         }
877
878         print_status(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, vnnmap,
879                      recmode, recmaster);
880         return 0;
881 }
882
883 static int control_uptime(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
884                           int argc, const char **argv)
885 {
886         struct ctdb_uptime *uptime;
887         int ret, tmp, days, hours, minutes, seconds;
888
889         ret = ctdb_ctrl_uptime(mem_ctx, ctdb->ev, ctdb->client,
890                                ctdb->cmd_pnn, TIMEOUT(), &uptime);
891         if (ret != 0) {
892                 return ret;
893         }
894
895         printf("Current time of node %-4u     :                %s",
896                ctdb->cmd_pnn, ctime(&uptime->current_time.tv_sec));
897
898         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
899         seconds = tmp % 60; tmp /= 60;
900         minutes = tmp % 60; tmp /= 60;
901         hours = tmp % 24; tmp /= 24;
902         days = tmp;
903
904         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s",
905                days, hours, minutes, seconds,
906                ctime(&uptime->ctdbd_start_time.tv_sec));
907
908         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
909         seconds = tmp % 60; tmp /= 60;
910         minutes = tmp % 60; tmp /= 60;
911         hours = tmp % 24; tmp /= 24;
912         days = tmp;
913
914         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s",
915                days, hours, minutes, seconds,
916                ctime(&uptime->last_recovery_finished.tv_sec));
917
918         printf("Duration of last recovery/failover: %lf seconds\n",
919                timeval_delta(&uptime->last_recovery_finished,
920                              &uptime->last_recovery_started));
921
922         return 0;
923 }
924
925 static int control_ping(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
926                         int argc, const char **argv)
927 {
928         struct timeval tv;
929         int ret, num_clients;
930
931         tv = timeval_current();
932         ret = ctdb_ctrl_ping(mem_ctx, ctdb->ev, ctdb->client,
933                              ctdb->cmd_pnn, TIMEOUT(), &num_clients);
934         if (ret != 0) {
935                 return ret;
936         }
937
938         printf("response from %u time=%.6f sec  (%d clients)\n",
939                ctdb->cmd_pnn, timeval_elapsed(&tv), num_clients);
940         return 0;
941 }
942
943 const char *runstate_to_string(enum ctdb_runstate runstate);
944 enum ctdb_runstate runstate_from_string(const char *runstate_str);
945
946 static int control_runstate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
947                             int argc, const char **argv)
948 {
949         enum ctdb_runstate runstate;
950         bool found;
951         int ret, i;
952
953         ret = ctdb_ctrl_get_runstate(mem_ctx, ctdb->ev, ctdb->client,
954                                      ctdb->cmd_pnn, TIMEOUT(), &runstate);
955         if (ret != 0) {
956                 return ret;
957         }
958
959         found = true;
960         for (i=0; i<argc; i++) {
961                 enum ctdb_runstate t;
962
963                 found = false;
964                 t = ctdb_runstate_from_string(argv[i]);
965                 if (t == CTDB_RUNSTATE_UNKNOWN) {
966                         printf("Invalid run state (%s)\n", argv[i]);
967                         return 1;
968                 }
969
970                 if (t == runstate) {
971                         found = true;
972                         break;
973                 }
974         }
975
976         if (! found) {
977                 printf("CTDB not in required run state (got %s)\n",
978                        ctdb_runstate_to_string(runstate));
979                 return 1;
980         }
981
982         printf("%s\n", ctdb_runstate_to_string(runstate));
983         return 0;
984 }
985
986 static int control_getvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
987                           int argc, const char **argv)
988 {
989         struct ctdb_var_list *tun_var_list;
990         uint32_t value;
991         int ret, i;
992         bool found;
993
994         if (argc != 1) {
995                 usage("getvar");
996         }
997
998         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
999                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1000         if (ret != 0) {
1001                 fprintf(stderr,
1002                         "Failed to get list of variables from node %u\n",
1003                         ctdb->cmd_pnn);
1004                 return ret;
1005         }
1006
1007         found = false;
1008         for (i=0; i<tun_var_list->count; i++) {
1009                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1010                         found = true;
1011                         break;
1012                 }
1013         }
1014
1015         if (! found) {
1016                 printf("No such tunable %s\n", argv[0]);
1017                 return 1;
1018         }
1019
1020         ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
1021                                     ctdb->cmd_pnn, TIMEOUT(), argv[0], &value);
1022         if (ret != 0) {
1023                 return ret;
1024         }
1025
1026         printf("%-26s = %u\n", argv[0], value);
1027         return 0;
1028 }
1029
1030 static int control_setvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1031                           int argc, const char **argv)
1032 {
1033         struct ctdb_var_list *tun_var_list;
1034         struct ctdb_tunable tunable;
1035         int ret, i;
1036         bool found;
1037
1038         if (argc != 2) {
1039                 usage("setvar");
1040         }
1041
1042         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1043                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1044         if (ret != 0) {
1045                 fprintf(stderr,
1046                         "Failed to get list of variables from node %u\n",
1047                         ctdb->cmd_pnn);
1048                 return ret;
1049         }
1050
1051         found = false;
1052         for (i=0; i<tun_var_list->count; i++) {
1053                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1054                         found = true;
1055                         break;
1056                 }
1057         }
1058
1059         if (! found) {
1060                 printf("No such tunable %s\n", argv[0]);
1061                 return 1;
1062         }
1063
1064         tunable.name = argv[0];
1065         tunable.value = strtoul(argv[1], NULL, 0);
1066
1067         ret = ctdb_ctrl_set_tunable(mem_ctx, ctdb->ev, ctdb->client,
1068                                     ctdb->cmd_pnn, TIMEOUT(), &tunable);
1069         if (ret != 0) {
1070                 if (ret == 1) {
1071                         fprintf(stderr,
1072                                 "Setting obsolete tunable variable '%s'\n",
1073                                tunable.name);
1074                         return 0;
1075                 }
1076         }
1077
1078         return ret;
1079 }
1080
1081 static int control_listvars(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1082                             int argc, const char **argv)
1083 {
1084         struct ctdb_var_list *tun_var_list;
1085         int ret, i;
1086
1087         if (argc != 0) {
1088                 usage("listvars");
1089         }
1090
1091         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1092                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1093         if (ret != 0) {
1094                 return ret;
1095         }
1096
1097         for (i=0; i<tun_var_list->count; i++) {
1098                 control_getvar(mem_ctx, ctdb, 1, &tun_var_list->var[i]);
1099         }
1100
1101         return 0;
1102 }
1103
1104 const struct {
1105         const char *name;
1106         uint32_t offset;
1107 } stats_fields[] = {
1108 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
1109         STATISTICS_FIELD(num_clients),
1110         STATISTICS_FIELD(frozen),
1111         STATISTICS_FIELD(recovering),
1112         STATISTICS_FIELD(num_recoveries),
1113         STATISTICS_FIELD(client_packets_sent),
1114         STATISTICS_FIELD(client_packets_recv),
1115         STATISTICS_FIELD(node_packets_sent),
1116         STATISTICS_FIELD(node_packets_recv),
1117         STATISTICS_FIELD(keepalive_packets_sent),
1118         STATISTICS_FIELD(keepalive_packets_recv),
1119         STATISTICS_FIELD(node.req_call),
1120         STATISTICS_FIELD(node.reply_call),
1121         STATISTICS_FIELD(node.req_dmaster),
1122         STATISTICS_FIELD(node.reply_dmaster),
1123         STATISTICS_FIELD(node.reply_error),
1124         STATISTICS_FIELD(node.req_message),
1125         STATISTICS_FIELD(node.req_control),
1126         STATISTICS_FIELD(node.reply_control),
1127         STATISTICS_FIELD(client.req_call),
1128         STATISTICS_FIELD(client.req_message),
1129         STATISTICS_FIELD(client.req_control),
1130         STATISTICS_FIELD(timeouts.call),
1131         STATISTICS_FIELD(timeouts.control),
1132         STATISTICS_FIELD(timeouts.traverse),
1133         STATISTICS_FIELD(locks.num_calls),
1134         STATISTICS_FIELD(locks.num_current),
1135         STATISTICS_FIELD(locks.num_pending),
1136         STATISTICS_FIELD(locks.num_failed),
1137         STATISTICS_FIELD(total_calls),
1138         STATISTICS_FIELD(pending_calls),
1139         STATISTICS_FIELD(childwrite_calls),
1140         STATISTICS_FIELD(pending_childwrite_calls),
1141         STATISTICS_FIELD(memory_used),
1142         STATISTICS_FIELD(max_hop_count),
1143         STATISTICS_FIELD(total_ro_delegations),
1144         STATISTICS_FIELD(total_ro_revokes),
1145 };
1146
1147 #define LATENCY_AVG(v)  ((v).num ? (v).total / (v).num : 0.0 )
1148
1149 static void print_statistics_machine(struct ctdb_statistics *s,
1150                                      bool show_header)
1151 {
1152         int i;
1153
1154         if (show_header) {
1155                 printf("CTDB version%s", options.sep);
1156                 printf("Current time of statistics%s", options.sep);
1157                 printf("Statistics collected since%s", options.sep);
1158                 for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1159                         printf("%s%s", stats_fields[i].name, options.sep);
1160                 }
1161                 printf("num_reclock_ctdbd_latency%s", options.sep);
1162                 printf("min_reclock_ctdbd_latency%s", options.sep);
1163                 printf("avg_reclock_ctdbd_latency%s", options.sep);
1164                 printf("max_reclock_ctdbd_latency%s", options.sep);
1165
1166                 printf("num_reclock_recd_latency%s", options.sep);
1167                 printf("min_reclock_recd_latency%s", options.sep);
1168                 printf("avg_reclock_recd_latency%s", options.sep);
1169                 printf("max_reclock_recd_latency%s", options.sep);
1170
1171                 printf("num_call_latency%s", options.sep);
1172                 printf("min_call_latency%s", options.sep);
1173                 printf("avg_call_latency%s", options.sep);
1174                 printf("max_call_latency%s", options.sep);
1175
1176                 printf("num_lockwait_latency%s", options.sep);
1177                 printf("min_lockwait_latency%s", options.sep);
1178                 printf("avg_lockwait_latency%s", options.sep);
1179                 printf("max_lockwait_latency%s", options.sep);
1180
1181                 printf("num_childwrite_latency%s", options.sep);
1182                 printf("min_childwrite_latency%s", options.sep);
1183                 printf("avg_childwrite_latency%s", options.sep);
1184                 printf("max_childwrite_latency%s", options.sep);
1185                 printf("\n");
1186         }
1187
1188         printf("%u%s", CTDB_PROTOCOL, options.sep);
1189         printf("%u%s", (uint32_t)s->statistics_current_time.tv_sec, options.sep);
1190         printf("%u%s", (uint32_t)s->statistics_start_time.tv_sec, options.sep);
1191         for (i=0;i<ARRAY_SIZE(stats_fields);i++) {
1192                 printf("%u%s",
1193                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s),
1194                        options.sep);
1195         }
1196         printf("%u%s", s->reclock.ctdbd.num, options.sep);
1197         printf("%.6f%s", s->reclock.ctdbd.min, options.sep);
1198         printf("%.6f%s", LATENCY_AVG(s->reclock.ctdbd), options.sep);
1199         printf("%.6f%s", s->reclock.ctdbd.max, options.sep);
1200
1201         printf("%u%s", s->reclock.recd.num, options.sep);
1202         printf("%.6f%s", s->reclock.recd.min, options.sep);
1203         printf("%.6f%s", LATENCY_AVG(s->reclock.recd), options.sep);
1204         printf("%.6f%s", s->reclock.recd.max, options.sep);
1205
1206         printf("%d%s", s->call_latency.num, options.sep);
1207         printf("%.6f%s", s->call_latency.min, options.sep);
1208         printf("%.6f%s", LATENCY_AVG(s->call_latency), options.sep);
1209         printf("%.6f%s", s->call_latency.max, options.sep);
1210
1211         printf("%d%s", s->childwrite_latency.num, options.sep);
1212         printf("%.6f%s", s->childwrite_latency.min, options.sep);
1213         printf("%.6f%s", LATENCY_AVG(s->childwrite_latency), options.sep);
1214         printf("%.6f%s", s->childwrite_latency.max, options.sep);
1215         printf("\n");
1216 }
1217
1218 static void print_statistics(struct ctdb_statistics *s)
1219 {
1220         int tmp, days, hours, minutes, seconds;
1221         int i;
1222         const char *prefix = NULL;
1223         int preflen = 0;
1224
1225         tmp = s->statistics_current_time.tv_sec -
1226               s->statistics_start_time.tv_sec;
1227         seconds = tmp % 60; tmp /= 60;
1228         minutes = tmp % 60; tmp /= 60;
1229         hours   = tmp % 24; tmp /= 24;
1230         days    = tmp;
1231
1232         printf("CTDB version %u\n", CTDB_PROTOCOL);
1233         printf("Current time of statistics  :                %s",
1234                ctime(&s->statistics_current_time.tv_sec));
1235         printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s",
1236                days, hours, minutes, seconds,
1237                ctime(&s->statistics_start_time.tv_sec));
1238
1239         for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1240                 if (strchr(stats_fields[i].name, '.') != NULL) {
1241                         preflen = strcspn(stats_fields[i].name, ".") + 1;
1242                         if (! prefix ||
1243                             strncmp(prefix, stats_fields[i].name, preflen) != 0) {
1244                                 prefix = stats_fields[i].name;
1245                                 printf(" %*.*s\n", preflen-1, preflen-1,
1246                                        stats_fields[i].name);
1247                         }
1248                 } else {
1249                         preflen = 0;
1250                 }
1251                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
1252                        stats_fields[i].name+preflen, preflen ? 0 : 4, "",
1253                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s));
1254         }
1255
1256         printf(" hop_count_buckets:");
1257         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1258                 printf(" %d", s->hop_count_bucket[i]);
1259         }
1260         printf("\n");
1261         printf(" lock_buckets:");
1262         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1263                 printf(" %d", s->locks.buckets[i]);
1264         }
1265         printf("\n");
1266         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1267                "locks_latency      MIN/AVG/MAX",
1268                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
1269                s->locks.latency.max, s->locks.latency.num);
1270
1271         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1272                "reclock_ctdbd      MIN/AVG/MAX",
1273                s->reclock.ctdbd.min, LATENCY_AVG(s->reclock.ctdbd),
1274                s->reclock.ctdbd.max, s->reclock.ctdbd.num);
1275
1276         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1277                "reclock_recd       MIN/AVG/MAX",
1278                s->reclock.recd.min, LATENCY_AVG(s->reclock.recd),
1279                s->reclock.recd.max, s->reclock.recd.num);
1280
1281         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1282                "call_latency       MIN/AVG/MAX",
1283                s->call_latency.min, LATENCY_AVG(s->call_latency),
1284                s->call_latency.max, s->call_latency.num);
1285
1286         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1287                "childwrite_latency MIN/AVG/MAX",
1288                s->childwrite_latency.min,
1289                LATENCY_AVG(s->childwrite_latency),
1290                s->childwrite_latency.max, s->childwrite_latency.num);
1291 }
1292
1293 static int control_statistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1294                               int argc, const char **argv)
1295 {
1296         struct ctdb_statistics *stats;
1297         int ret;
1298
1299         if (argc != 0) {
1300                 usage("statistics");
1301         }
1302
1303         ret = ctdb_ctrl_statistics(mem_ctx, ctdb->ev, ctdb->client,
1304                                    ctdb->cmd_pnn, TIMEOUT(), &stats);
1305         if (ret != 0) {
1306                 return ret;
1307         }
1308
1309         if (options.machinereadable) {
1310                 print_statistics_machine(stats, true);
1311         } else {
1312                 print_statistics(stats);
1313         }
1314
1315         return 0;
1316 }
1317
1318 static int control_statistics_reset(TALLOC_CTX *mem_ctx,
1319                                     struct ctdb_context *ctdb,
1320                                     int argc, const char **argv)
1321 {
1322         int ret;
1323
1324         if (argc != 0) {
1325                 usage("statisticsreset");
1326         }
1327
1328         ret = ctdb_ctrl_statistics_reset(mem_ctx, ctdb->ev, ctdb->client,
1329                                          ctdb->cmd_pnn, TIMEOUT());
1330         if (ret != 0) {
1331                 return ret;
1332         }
1333
1334         return 0;
1335 }
1336
1337 static int control_stats(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1338                          int argc, const char **argv)
1339 {
1340         struct ctdb_statistics_list *slist;
1341         int ret, count = 0, i;
1342         bool show_header = true;
1343
1344         if (argc > 1) {
1345                 usage("stats");
1346         }
1347
1348         if (argc == 1) {
1349                 count = atoi(argv[0]);
1350         }
1351
1352         ret = ctdb_ctrl_get_stat_history(mem_ctx, ctdb->ev, ctdb->client,
1353                                          ctdb->cmd_pnn, TIMEOUT(), &slist);
1354         if (ret != 0) {
1355                 return ret;
1356         }
1357
1358         for (i=0; i<slist->num; i++) {
1359                 if (slist->stats[i].statistics_start_time.tv_sec == 0) {
1360                         continue;
1361                 }
1362                 if (options.machinereadable == 1) {
1363                         print_statistics_machine(&slist->stats[i],
1364                                                  show_header);
1365                         show_header = false;
1366                 } else {
1367                         print_statistics(&slist->stats[i]);
1368                 }
1369                 if (count > 0 && i == count) {
1370                         break;
1371                 }
1372         }
1373
1374         return 0;
1375 }
1376
1377 static void print_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1378                      struct ctdb_public_ip_list *ips,
1379                      struct ctdb_public_ip_info **ipinfo,
1380                      bool all_nodes)
1381 {
1382         int i, j;
1383         char *conf, *avail, *active;
1384
1385         if (options.machinereadable == 1) {
1386                 printf("%s%s%s%s%s", options.sep,
1387                        "Public IP", options.sep,
1388                        "Node", options.sep);
1389                 if (options.verbose == 1) {
1390                         printf("%s%s%s%s%s%s\n",
1391                                "ActiveInterfaces", options.sep,
1392                                "AvailableInterfaces", options.sep,
1393                                "ConfiguredInterfaces", options.sep);
1394                 } else {
1395                         printf("\n");
1396                 }
1397         } else {
1398                 if (all_nodes) {
1399                         printf("Public IPs on ALL nodes\n");
1400                 } else {
1401                         printf("Public IPs on node %u\n", ctdb->cmd_pnn);
1402                 }
1403         }
1404
1405         /* IPs are reverse sorted */
1406         for (i=ips->num-1; i>=0; i--) {
1407
1408                 if (options.machinereadable == 1) {
1409                         printf("%s%s%s%d%s", options.sep,
1410                                ctdb_sock_addr_to_string(
1411                                         mem_ctx, &ips->ip[i].addr),
1412                                options.sep,
1413                                (int)ips->ip[i].pnn, options.sep);
1414                 } else {
1415                         printf("%s", ctdb_sock_addr_to_string(
1416                                                 mem_ctx, &ips->ip[i].addr));
1417                 }
1418
1419                 if (options.verbose == 0) {
1420                         if (options.machinereadable == 1) {
1421                                 printf("\n");
1422                         } else {
1423                                 printf(" %d\n", (int)ips->ip[i].pnn);
1424                         }
1425                         continue;
1426                 }
1427
1428                 conf = NULL;
1429                 avail = NULL;
1430                 active = NULL;
1431
1432                 for (j=0; j<ipinfo[i]->ifaces->num; j++) {
1433                         struct ctdb_iface *iface;
1434
1435                         iface = &ipinfo[i]->ifaces->iface[j];
1436                         if (conf == NULL) {
1437                                 conf = talloc_strdup(mem_ctx, iface->name);
1438                         } else {
1439                                 conf = talloc_asprintf_append(
1440                                                 mem_ctx, ",%s", iface->name);
1441                         }
1442
1443                         if (ipinfo[i]->active_idx == j) {
1444                                 active = iface->name;
1445                         }
1446
1447                         if (iface->link_state == 0) {
1448                                 continue;
1449                         }
1450
1451                         if (avail == NULL) {
1452                                 avail = talloc_strdup(mem_ctx, iface->name);
1453                         } else {
1454                                 avail = talloc_asprintf_append(
1455                                                 mem_ctx, ",%s", iface->name);
1456                         }
1457                 }
1458
1459                 if (options.machinereadable == 1) {
1460                         printf("%s%s%s%s%s%s\n",
1461                                active ? active : "", options.sep,
1462                                avail ? avail : "", options.sep,
1463                                conf ? conf : "", options.sep);
1464                 } else {
1465                         printf(" node[%u] active[%s] available[%s] configured[%s]\n",
1466                                ips->ip[i].pnn, active ? active : "",
1467                                avail ? avail : "", conf ? conf : "");
1468                 }
1469         }
1470 }
1471
1472 static int collect_ips(uint8_t *keybuf, size_t keylen, uint8_t *databuf,
1473                        size_t datalen, void *private_data)
1474 {
1475         struct ctdb_public_ip_list *ips = talloc_get_type_abort(
1476                 private_data, struct ctdb_public_ip_list);
1477         struct ctdb_public_ip *ip;
1478
1479         ip = (struct ctdb_public_ip *)databuf;
1480         ips->ip[ips->num] = *ip;
1481         ips->num += 1;
1482
1483         return 0;
1484 }
1485
1486 static int get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1487                               struct ctdb_public_ip_list **out)
1488 {
1489         struct ctdb_node_map *nodemap;
1490         struct ctdb_public_ip_list *ips;
1491         struct db_hash_context *ipdb;
1492         uint32_t *pnn_list;
1493         int ret, count, i, j;
1494
1495         nodemap = get_nodemap(ctdb, false);
1496         if (nodemap == NULL) {
1497                 return 1;
1498         }
1499
1500         ret = db_hash_init(mem_ctx, "ips", 101, DB_HASH_COMPLEX, &ipdb);
1501         if (ret != 0) {
1502                 goto failed;
1503         }
1504
1505         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
1506                                      &pnn_list);
1507         if (count <= 0) {
1508                 goto failed;
1509         }
1510
1511         for (i=0; i<count; i++) {
1512                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1513                                                pnn_list[i], TIMEOUT(), &ips);
1514                 if (ret != 0) {
1515                         goto failed;
1516                 }
1517
1518                 for (j=0; j<ips->num; j++) {
1519                         struct ctdb_public_ip ip;
1520
1521                         ip.pnn = ips->ip[j].pnn;
1522                         ip.addr = ips->ip[j].addr;
1523
1524                         ret = db_hash_add(ipdb, (uint8_t *)&ip.addr,
1525                                           sizeof(ip.addr),
1526                                           (uint8_t *)&ip, sizeof(ip));
1527                         if (ret != 0) {
1528                                 goto failed;
1529                         }
1530                 }
1531
1532                 TALLOC_FREE(ips);
1533         }
1534
1535         talloc_free(pnn_list);
1536
1537         ret = db_hash_traverse(ipdb, NULL, NULL, &count);
1538         if (ret != 0) {
1539                 goto failed;
1540         }
1541
1542         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);
1543         if (ips == NULL) {
1544                 goto failed;
1545         }
1546
1547         ips->ip = talloc_array(ips, struct ctdb_public_ip, count);
1548         if (ips->ip == NULL) {
1549                 goto failed;
1550         }
1551
1552         ret = db_hash_traverse(ipdb, collect_ips, ips, &count);
1553         if (ret != 0) {
1554                 goto failed;
1555         }
1556
1557         if (count != ips->num) {
1558                 goto failed;
1559         }
1560
1561         talloc_free(ipdb);
1562
1563         *out = ips;
1564         return 0;
1565
1566 failed:
1567         talloc_free(ipdb);
1568         return 1;
1569 }
1570
1571 static int control_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1572                       int argc, const char **argv)
1573 {
1574         struct ctdb_public_ip_list *ips;
1575         struct ctdb_public_ip_info **ipinfo;
1576         int ret, i;
1577         bool do_all = false;
1578
1579         if (argc > 1) {
1580                 usage("ip");
1581         }
1582
1583         if (argc == 1) {
1584                 if (strcmp(argv[0], "all") == 0) {
1585                         do_all = true;
1586                 } else {
1587                         usage("ip");
1588                 }
1589         }
1590
1591         if (do_all) {
1592                 ret = get_all_public_ips(ctdb, mem_ctx, &ips);
1593         } else {
1594                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1595                                                ctdb->cmd_pnn, TIMEOUT(), &ips);
1596         }
1597         if (ret != 0) {
1598                 return ret;
1599         }
1600
1601         ipinfo = talloc_array(mem_ctx, struct ctdb_public_ip_info *, ips->num);
1602         if (ipinfo == NULL) {
1603                 return 1;
1604         }
1605
1606         for (i=0; i<ips->num; i++) {
1607                 uint32_t pnn;
1608                 if (do_all) {
1609                         pnn = ips->ip[i].pnn;
1610                 } else {
1611                         pnn = ctdb->cmd_pnn;
1612                 }
1613                 ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev,
1614                                                    ctdb->client, pnn,
1615                                                    TIMEOUT(), &ips->ip[i].addr,
1616                                                    &ipinfo[i]);
1617                 if (ret != 0) {
1618                         return ret;
1619                 }
1620         }
1621
1622         print_ip(mem_ctx, ctdb, ips, ipinfo, do_all);
1623         return 0;
1624 }
1625
1626 static int control_ipinfo(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1627                           int argc, const char **argv)
1628 {
1629         struct ctdb_public_ip_info *ipinfo;
1630         ctdb_sock_addr addr;
1631         int ret, i;
1632
1633         if (argc != 1) {
1634                 usage("ipinfo");
1635         }
1636
1637         if (! parse_ip(argv[0], NULL, 0, &addr)) {
1638                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
1639                 return 1;
1640         }
1641
1642         ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev, ctdb->client,
1643                                            ctdb->cmd_pnn, TIMEOUT(), &addr,
1644                                            &ipinfo);
1645         if (ret != 0) {
1646                 if (ret == -1) {
1647                         printf("Node %u does not know about IP %s\n",
1648                                ctdb->cmd_pnn, argv[0]);
1649                 }
1650                 return ret;
1651         }
1652
1653         printf("Public IP[%s] info on node %u\n",
1654                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr),
1655                                         ctdb->cmd_pnn);
1656
1657         printf("IP:%s\nCurrentNode:%u\nNumInterfaces:%u\n",
1658                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr),
1659                ipinfo->ip.pnn, ipinfo->ifaces->num);
1660
1661         for (i=0; i<ipinfo->ifaces->num; i++) {
1662                 struct ctdb_iface *iface;
1663
1664                 iface = &ipinfo->ifaces->iface[i];
1665                 iface->name[CTDB_IFACE_SIZE] = '\0';
1666                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1667                        i+1, iface->name,
1668                        iface->link_state == 0 ? "down" : "up",
1669                        iface->references,
1670                        (i == ipinfo->active_idx) ? " (active)" : "");
1671         }
1672
1673         return 0;
1674 }
1675
1676 static int control_ifaces(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1677                           int argc, const char **argv)
1678 {
1679         struct ctdb_iface_list *ifaces;
1680         int ret, i;
1681
1682         if (argc != 0) {
1683                 usage("ifaces");
1684         }
1685
1686         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1687                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1688         if (ret != 0) {
1689                 return ret;
1690         }
1691
1692         if (ifaces->num == 0) {
1693                 printf("No interfaces configured on node %u\n",
1694                        ctdb->cmd_pnn);
1695                 return 0;
1696         }
1697
1698         if (options.machinereadable) {
1699                 printf("%s%s%s%s%s%s%s\n", options.sep,
1700                        "Name", options.sep,
1701                        "LinkStatus", options.sep,
1702                        "References", options.sep);
1703         } else {
1704                 printf("Interfaces on node %u\n", ctdb->cmd_pnn);
1705         }
1706
1707         for (i=0; i<ifaces->num; i++) {
1708                 if (options.machinereadable) {
1709                         printf("%s%s%s%u%s%u%s\n", options.sep,
1710                                ifaces->iface[i].name, options.sep,
1711                                ifaces->iface[i].link_state, options.sep,
1712                                ifaces->iface[i].references, options.sep);
1713                 } else {
1714                         printf("name:%s link:%s references:%u\n",
1715                                ifaces->iface[i].name,
1716                                ifaces->iface[i].link_state ? "up" : "down",
1717                                ifaces->iface[i].references);
1718                 }
1719         }
1720
1721         return 0;
1722 }
1723
1724 static int control_setifacelink(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1725                                 int argc, const char **argv)
1726 {
1727         struct ctdb_iface_list *ifaces;
1728         struct ctdb_iface *iface;
1729         int ret, i;
1730
1731         if (argc != 2) {
1732                 usage("setifacelink");
1733         }
1734
1735         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1736                 fprintf(stderr, "Interface name '%s' too long\n", argv[0]);
1737                 return 1;
1738         }
1739
1740         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1741                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1742         if (ret != 0) {
1743                 fprintf(stderr,
1744                         "Failed to get interface information from node %u\n",
1745                         ctdb->cmd_pnn);
1746                 return ret;
1747         }
1748
1749         iface = NULL;
1750         for (i=0; i<ifaces->num; i++) {
1751                 if (strcmp(ifaces->iface[i].name, argv[0]) == 0) {
1752                         iface = &ifaces->iface[i];
1753                         break;
1754                 }
1755         }
1756
1757         if (iface == NULL) {
1758                 printf("Interface %s not configured on node %u\n",
1759                        argv[0], ctdb->cmd_pnn);
1760                 return 1;
1761         }
1762
1763         if (strcmp(argv[1], "up") == 0) {
1764                 iface->link_state = 1;
1765         } else if (strcmp(argv[1], "down") == 0) {
1766                 iface->link_state = 0;
1767         } else {
1768                 usage("setifacelink");
1769                 return 1;
1770         }
1771
1772         iface->references = 0;
1773
1774         ret = ctdb_ctrl_set_iface_link_state(mem_ctx, ctdb->ev, ctdb->client,
1775                                              ctdb->cmd_pnn, TIMEOUT(), iface);
1776         if (ret != 0) {
1777                 return ret;
1778         }
1779
1780         return 0;
1781 }
1782
1783 static int control_process_exists(TALLOC_CTX *mem_ctx,
1784                                   struct ctdb_context *ctdb,
1785                                   int argc, const char **argv)
1786 {
1787         pid_t pid;
1788         int ret, status;
1789
1790         if (argc != 1) {
1791                 usage("process-exists");
1792         }
1793
1794         pid = atoi(argv[0]);
1795         ret = ctdb_ctrl_process_exists(mem_ctx, ctdb->ev, ctdb->client,
1796                                        ctdb->cmd_pnn, TIMEOUT(), pid, &status);
1797         if (ret != 0) {
1798                 return ret;
1799         }
1800
1801         if (status == 0) {
1802                 printf("PID %u exists\n", pid);
1803         } else {
1804                 printf("PID %u does not exist\n", pid);
1805         }
1806         return status;
1807 }
1808
1809 static int control_getdbmap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1810                             int argc, const char **argv)
1811 {
1812         struct ctdb_dbid_map *dbmap;
1813         int ret, i;
1814
1815         if (argc != 0) {
1816                 usage("getdbmap");
1817         }
1818
1819         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
1820                                   ctdb->cmd_pnn, TIMEOUT(), &dbmap);
1821         if (ret != 0) {
1822                 return ret;
1823         }
1824
1825         if (options.machinereadable == 1) {
1826                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
1827                        options.sep,
1828                        "ID", options.sep,
1829                        "Name", options.sep,
1830                        "Path", options.sep,
1831                        "Persistent", options.sep,
1832                        "Sticky", options.sep,
1833                        "Unhealthy", options.sep,
1834                        "Readonly", options.sep);
1835         } else {
1836                 printf("Number of databases:%d\n", dbmap->num);
1837         }
1838
1839         for (i=0; i<dbmap->num; i++) {
1840                 const char *name;
1841                 const char *path;
1842                 const char *health;
1843                 bool persistent;
1844                 bool readonly;
1845                 bool sticky;
1846                 uint32_t db_id;
1847
1848                 db_id = dbmap->dbs[i].db_id;
1849
1850                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
1851                                            ctdb->cmd_pnn, TIMEOUT(), db_id,
1852                                            &name);
1853                 if (ret != 0) {
1854                         return ret;
1855                 }
1856
1857                 ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1858                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
1859                                           &path);
1860                 if (ret != 0) {
1861                         return ret;
1862                 }
1863
1864                 ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1865                                               ctdb->cmd_pnn, TIMEOUT(), db_id,
1866                                               &health);
1867                 if (ret != 0) {
1868                         return ret;
1869                 }
1870
1871                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1872                 readonly = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
1873                 sticky = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
1874
1875                 if (options.machinereadable == 1) {
1876                         printf("%s0x%08X%s%s%s%s%s%d%s%d%s%d%s%d%s\n",
1877                                options.sep,
1878                                db_id, options.sep,
1879                                name, options.sep,
1880                                path, options.sep,
1881                                !! (persistent), options.sep,
1882                                !! (sticky), options.sep,
1883                                !! (health), options.sep,
1884                                !! (readonly), options.sep);
1885                 } else {
1886                         printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
1887                                db_id, name, path,
1888                                persistent ? " PERSISTENT" : "",
1889                                sticky ? " STICKY" : "",
1890                                readonly ? " READONLY" : "",
1891                                health ? " UNHEALTHY" : "");
1892                 }
1893
1894                 talloc_free(discard_const(name));
1895                 talloc_free(discard_const(path));
1896                 talloc_free(discard_const(health));
1897         }
1898
1899         return 0;
1900 }
1901
1902 static int control_getdbstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1903                                int argc, const char **argv)
1904 {
1905         uint32_t db_id;
1906         const char *db_name, *db_path, *db_health;
1907         uint8_t db_flags;
1908         int ret;
1909
1910         if (argc != 1) {
1911                 usage("getdbstatus");
1912         }
1913
1914         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
1915                 return 1;
1916         }
1917
1918         ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1919                                   ctdb->cmd_pnn, TIMEOUT(), db_id,
1920                                   &db_path);
1921         if (ret != 0) {
1922                 return ret;
1923         }
1924
1925         ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1926                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
1927                                       &db_health);
1928         if (ret != 0) {
1929                 return ret;
1930         }
1931
1932         printf("dbid: 0x%08x\nname: %s\npath: %s\n", db_id, db_name, db_path);
1933         printf("PERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
1934                (db_flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
1935                (db_flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
1936                (db_flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
1937                (db_health ? db_health : "OK"));
1938         return 0;
1939 }
1940
1941 struct dump_record_state {
1942         uint32_t count;
1943 };
1944
1945 #define ISASCII(x) (isprint(x) && ! strchr("\"\\", (x)))
1946
1947 static void dump_tdb_data(const char *name, TDB_DATA val)
1948 {
1949         int i;
1950
1951         fprintf(stdout, "%s(%zu) = \"", name, val.dsize);
1952         for (i=0; i<val.dsize; i++) {
1953                 if (ISASCII(val.dptr[i])) {
1954                         fprintf(stdout, "%c", val.dptr[i]);
1955                 } else {
1956                         fprintf(stdout, "\\%02X", val.dptr[i]);
1957                 }
1958         }
1959         fprintf(stdout, "\"\n");
1960 }
1961
1962 static void dump_ltdb_header(struct ctdb_ltdb_header *header)
1963 {
1964         fprintf(stdout, "dmaster: %u\n", header->dmaster);
1965         fprintf(stdout, "rsn: %" PRIu64 "\n", header->rsn);
1966         fprintf(stdout, "flags: 0x%08x", header->flags);
1967         if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
1968                 fprintf(stdout, " MIGRATED_WITH_DATA");
1969         }
1970         if (header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) {
1971                 fprintf(stdout, " VACUUM_MIGRATED");
1972         }
1973         if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
1974                 fprintf(stdout, " AUTOMATIC");
1975         }
1976         if (header->flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1977                 fprintf(stdout, " RO_HAVE_DELEGATIONS");
1978         }
1979         if (header->flags & CTDB_REC_RO_HAVE_READONLY) {
1980                 fprintf(stdout, " RO_HAVE_READONLY");
1981         }
1982         if (header->flags & CTDB_REC_RO_REVOKING_READONLY) {
1983                 fprintf(stdout, " RO_REVOKING_READONLY");
1984         }
1985         if (header->flags & CTDB_REC_RO_REVOKE_COMPLETE) {
1986                 fprintf(stdout, " RO_REVOKE_COMPLETE");
1987         }
1988         fprintf(stdout, "\n");
1989
1990 }
1991
1992 static int dump_record(uint32_t reqid, struct ctdb_ltdb_header *header,
1993                        TDB_DATA key, TDB_DATA data, void *private_data)
1994 {
1995         struct dump_record_state *state =
1996                 (struct dump_record_state *)private_data;
1997
1998         state->count += 1;
1999
2000         dump_tdb_data("key", key);
2001         dump_ltdb_header(header);
2002         dump_tdb_data("data", data);
2003         fprintf(stdout, "\n");
2004
2005         return 0;
2006 }
2007
2008 struct traverse_state {
2009         TALLOC_CTX *mem_ctx;
2010         bool done;
2011         ctdb_rec_parser_func_t func;
2012         struct dump_record_state sub_state;
2013 };
2014
2015 static void traverse_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2016 {
2017         struct traverse_state *state = (struct traverse_state *)private_data;
2018         struct ctdb_rec_data *rec;
2019         struct ctdb_ltdb_header header;
2020         int ret;
2021
2022         ret = ctdb_rec_data_pull(data.dptr, data.dsize, state->mem_ctx, &rec);
2023         if (ret != 0) {
2024                 return;
2025         }
2026
2027         if (rec->key.dsize == 0 && rec->data.dsize == 0) {
2028                 talloc_free(rec);
2029                 /* end of traverse */
2030                 state->done = true;
2031                 return;
2032         }
2033
2034         ret = ctdb_ltdb_header_extract(&rec->data, &header);
2035         if (ret != 0) {
2036                 talloc_free(rec);
2037                 return;
2038         }
2039
2040         if (rec->data.dsize == 0) {
2041                 talloc_free(rec);
2042                 return;
2043         }
2044
2045         ret = state->func(rec->reqid, &header, rec->key, rec->data,
2046                           &state->sub_state);
2047         talloc_free(rec);
2048         if (ret != 0) {
2049                 state->done = true;
2050         }
2051 }
2052
2053 static int control_catdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2054                          int argc, const char **argv)
2055 {
2056         struct ctdb_db_context *db;
2057         const char *db_name;
2058         uint32_t db_id;
2059         uint8_t db_flags;
2060         struct ctdb_traverse_start_ext traverse;
2061         struct traverse_state state;
2062         int ret;
2063
2064         if (argc != 1) {
2065                 usage("catdb");
2066         }
2067
2068         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2069                 return 1;
2070         }
2071
2072         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2073                           db_flags, &db);
2074         if (ret != 0) {
2075                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2076                 return ret;
2077         }
2078
2079         /* Valgrind fix */
2080         ZERO_STRUCT(traverse);
2081
2082         traverse.db_id = db_id;
2083         traverse.reqid = 0;
2084         traverse.srvid = next_srvid(ctdb);
2085         traverse.withemptyrecords = false;
2086
2087         state.mem_ctx = mem_ctx;
2088         state.done = false;
2089         state.func = dump_record;
2090         state.sub_state.count = 0;
2091
2092         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2093                                               traverse.srvid,
2094                                               traverse_handler, &state);
2095         if (ret != 0) {
2096                 return ret;
2097         }
2098
2099         ret = ctdb_ctrl_traverse_start_ext(mem_ctx, ctdb->ev, ctdb->client,
2100                                            ctdb->cmd_pnn, TIMEOUT(),
2101                                            &traverse);
2102         if (ret != 0) {
2103                 return ret;
2104         }
2105
2106         ctdb_client_wait(ctdb->ev, &state.done);
2107
2108         printf("Dumped %u records\n", state.sub_state.count);
2109
2110         ret = ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2111                                                  traverse.srvid, &state);
2112         if (ret != 0) {
2113                 return ret;
2114         }
2115
2116         return 0;
2117 }
2118
2119 static int control_cattdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2120                           int argc, const char **argv)
2121 {
2122         struct ctdb_db_context *db;
2123         const char *db_name;
2124         uint32_t db_id;
2125         uint8_t db_flags;
2126         struct dump_record_state state;
2127         int ret;
2128
2129         if (argc != 1) {
2130                 usage("catdb");
2131         }
2132
2133         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2134                 return 1;
2135         }
2136
2137         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2138                           db_flags, &db);
2139         if (ret != 0) {
2140                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2141                 return ret;
2142         }
2143
2144         state.count = 0;
2145         ret = ctdb_db_traverse(db, true, true, dump_record, &state);
2146
2147         printf("Dumped %u record(s)\n", state.count);
2148
2149         return ret;
2150 }
2151
2152 static int control_getmonmode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2153                               int argc, const char **argv)
2154 {
2155         int mode, ret;
2156
2157         if (argc != 0) {
2158                 usage("getmonmode");
2159         }
2160
2161         ret = ctdb_ctrl_get_monmode(mem_ctx, ctdb->ev, ctdb->client,
2162                                     ctdb->cmd_pnn, TIMEOUT(), &mode);
2163         if (ret != 0) {
2164                 return ret;
2165         }
2166
2167         printf("%s\n",
2168                (mode == CTDB_MONITORING_ENABLED) ? "ENABLED" : "DISABLED");
2169         return 0;
2170 }
2171
2172 static int control_getcapabilities(TALLOC_CTX *mem_ctx,
2173                                    struct ctdb_context *ctdb,
2174                                    int argc, const char **argv)
2175 {
2176         uint32_t caps;
2177         int ret;
2178
2179         if (argc != 0) {
2180                 usage("getcapabilities");
2181         }
2182
2183         ret = ctdb_ctrl_get_capabilities(mem_ctx, ctdb->ev, ctdb->client,
2184                                          ctdb->cmd_pnn, TIMEOUT(), &caps);
2185         if (ret != 0) {
2186                 return ret;
2187         }
2188
2189         if (options.machinereadable == 1) {
2190                 printf("%s%s%s%s%s\n",
2191                        options.sep,
2192                        "RECMASTER", options.sep,
2193                        "LMASTER", options.sep);
2194                 printf("%s%d%s%d%s\n", options.sep,
2195                        !! (caps & CTDB_CAP_RECMASTER), options.sep,
2196                        !! (caps & CTDB_CAP_LMASTER), options.sep);
2197         } else {
2198                 printf("RECMASTER: %s\n",
2199                        (caps & CTDB_CAP_RECMASTER) ? "YES" : "NO");
2200                 printf("LMASTER: %s\n",
2201                        (caps & CTDB_CAP_LMASTER) ? "YES" : "NO");
2202         }
2203
2204         return 0;
2205 }
2206
2207 static int control_pnn(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2208                        int argc, const char **argv)
2209 {
2210         printf("%u\n", ctdb_client_pnn(ctdb->client));
2211         return 0;
2212 }
2213
2214 static int control_lvs(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2215                        int argc, const char **argv)
2216 {
2217         char *t, *lvs_helper = NULL;
2218
2219         if (argc != 1) {
2220                 usage("lvs");
2221         }
2222
2223         t = getenv("CTDB_LVS_HELPER");
2224         if (t != NULL) {
2225                 lvs_helper = talloc_strdup(mem_ctx, t);
2226         } else {
2227                 lvs_helper = talloc_asprintf(mem_ctx, "%s/ctdb_lvs",
2228                                              CTDB_HELPER_BINDIR);
2229         }
2230
2231         if (lvs_helper == NULL) {
2232                 fprintf(stderr, "Unable to set LVS helper\n");
2233                 return 1;
2234         }
2235
2236         return run_helper("LVS helper", lvs_helper, argv[0]);
2237 }
2238
2239 static int control_disable_monitor(TALLOC_CTX *mem_ctx,
2240                                    struct ctdb_context *ctdb,
2241                                    int argc, const char **argv)
2242 {
2243         int ret;
2244
2245         if (argc != 0) {
2246                 usage("disablemonitor");
2247         }
2248
2249         ret = ctdb_ctrl_disable_monitor(mem_ctx, ctdb->ev, ctdb->client,
2250                                         ctdb->cmd_pnn, TIMEOUT());
2251         if (ret != 0) {
2252                 return ret;
2253         }
2254
2255         return 0;
2256 }
2257
2258 static int control_enable_monitor(TALLOC_CTX *mem_ctx,
2259                                   struct ctdb_context *ctdb,
2260                                   int argc, const char **argv)
2261 {
2262         int ret;
2263
2264         if (argc != 0) {
2265                 usage("enablemonitor");
2266         }
2267
2268         ret = ctdb_ctrl_enable_monitor(mem_ctx, ctdb->ev, ctdb->client,
2269                                        ctdb->cmd_pnn, TIMEOUT());
2270         if (ret != 0) {
2271                 return ret;
2272         }
2273
2274         return 0;
2275 }
2276
2277 static int control_setdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2278                             int argc, const char **argv)
2279 {
2280         enum debug_level log_level;
2281         int ret;
2282         bool found;
2283
2284         if (argc != 1) {
2285                 usage("setdebug");
2286         }
2287
2288         found = debug_level_parse(argv[0], &log_level);
2289         if (! found) {
2290                 fprintf(stderr,
2291                         "Invalid debug level '%s'. Valid levels are:\n",
2292                         argv[0]);
2293                 fprintf(stderr, "\tERROR | WARNING | NOTICE | INFO | DEBUG\n");
2294                 return 1;
2295         }
2296
2297         ret = ctdb_ctrl_setdebug(mem_ctx, ctdb->ev, ctdb->client,
2298                                  ctdb->cmd_pnn, TIMEOUT(), log_level);
2299         if (ret != 0) {
2300                 return ret;
2301         }
2302
2303         return 0;
2304 }
2305
2306 static int control_getdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2307                             int argc, const char **argv)
2308 {
2309         enum debug_level loglevel;
2310         const char *log_str;
2311         int ret;
2312
2313         if (argc != 0) {
2314                 usage("getdebug");
2315         }
2316
2317         ret = ctdb_ctrl_getdebug(mem_ctx, ctdb->ev, ctdb->client,
2318                                  ctdb->cmd_pnn, TIMEOUT(), &loglevel);
2319         if (ret != 0) {
2320                 return ret;
2321         }
2322
2323         log_str = debug_level_to_string(loglevel);
2324         printf("%s\n", log_str);
2325
2326         return 0;
2327 }
2328
2329 static int control_attach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2330                           int argc, const char **argv)
2331 {
2332         const char *db_name;
2333         uint8_t db_flags = 0;
2334         int ret;
2335
2336         if (argc < 1 || argc > 2) {
2337                 usage("attach");
2338         }
2339
2340         db_name = argv[0];
2341         if (argc == 2) {
2342                 if (strcmp(argv[1], "persistent") == 0) {
2343                         db_flags = CTDB_DB_FLAGS_PERSISTENT;
2344                 } else if (strcmp(argv[1], "readonly") == 0) {
2345                         db_flags = CTDB_DB_FLAGS_READONLY;
2346                 } else if (strcmp(argv[1], "sticky") == 0) {
2347                         db_flags = CTDB_DB_FLAGS_STICKY;
2348                 } else {
2349                         usage("attach");
2350                 }
2351         }
2352
2353         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2354                           db_flags, NULL);
2355         if (ret != 0) {
2356                 return ret;
2357         }
2358
2359         return 0;
2360 }
2361
2362 static int control_detach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2363                           int argc, const char **argv)
2364 {
2365         const char *db_name;
2366         uint32_t db_id;
2367         uint8_t db_flags;
2368         struct ctdb_node_map *nodemap;
2369         int recmode;
2370         int ret, ret2, i;
2371
2372         if (argc < 1) {
2373                 usage("detach");
2374         }
2375
2376         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2377                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
2378         if (ret != 0) {
2379                 return ret;
2380         }
2381
2382         if (recmode == CTDB_RECOVERY_ACTIVE) {
2383                 fprintf(stderr, "Database cannot be detached"
2384                                 " when recovery is active\n");
2385                 return 1;
2386         }
2387
2388         nodemap = get_nodemap(ctdb, false);
2389         if (nodemap == NULL) {
2390                 return 1;
2391         }
2392
2393         for (i=0; i<nodemap->num; i++) {
2394                 uint32_t value;
2395
2396                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2397                         continue;
2398                 }
2399                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2400                         continue;
2401                 }
2402                 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2403                         fprintf(stderr, "Database cannot be detached on"
2404                                 " inactive (stopped or banned) node %u\n",
2405                                 nodemap->node[i].pnn);
2406                         return 1;
2407                 }
2408
2409                 ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
2410                                             nodemap->node[i].pnn, TIMEOUT(),
2411                                             "AllowClientDBAttach", &value);
2412                 if (ret != 0) {
2413                         fprintf(stderr,
2414                                 "Unable to get tunable AllowClientDBAttach"
2415                                 " from node %u\n", nodemap->node[i].pnn);
2416                         return ret;
2417                 }
2418
2419                 if (value == 1) {
2420                         fprintf(stderr,
2421                                 "Database access is still active on node %u."
2422                                 " Set AllowclientDBAttach=0 on all nodes.\n",
2423                                 nodemap->node[i].pnn);
2424                         return 1;
2425                 }
2426         }
2427
2428         ret2 = 0;
2429         for (i=0; i<argc; i++) {
2430                 if (! db_exists(mem_ctx, ctdb, argv[i], &db_id, &db_name,
2431                                 &db_flags)) {
2432                         continue;
2433                 }
2434
2435                 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
2436                         fprintf(stderr,
2437                                 "Persistent database %s cannot be detached\n",
2438                                 argv[0]);
2439                         return 1;
2440                 }
2441
2442                 ret = ctdb_detach(mem_ctx, ctdb->ev, ctdb->client,
2443                                   TIMEOUT(), db_id);
2444                 if (ret != 0) {
2445                         fprintf(stderr, "Database %s detach failed\n", db_name);
2446                         ret2 = ret;
2447                 }
2448         }
2449
2450         return ret2;
2451 }
2452
2453 static int control_dumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2454                               int argc, const char **argv)
2455 {
2456         const char *mem_str;
2457         ssize_t n;
2458         int ret;
2459
2460         ret = ctdb_ctrl_dump_memory(mem_ctx, ctdb->ev, ctdb->client,
2461                                     ctdb->cmd_pnn, TIMEOUT(), &mem_str);
2462         if (ret != 0) {
2463                 return ret;
2464         }
2465
2466         n = write(1, mem_str, strlen(mem_str)+1);
2467         if (n < 0 || n != strlen(mem_str)+1) {
2468                 fprintf(stderr, "Failed to write talloc summary\n");
2469                 return 1;
2470         }
2471
2472         return 0;
2473 }
2474
2475 static void dump_memory(uint64_t srvid, TDB_DATA data, void *private_data)
2476 {
2477         bool *done = (bool *)private_data;
2478         ssize_t n;
2479
2480         n = write(1, data.dptr, data.dsize);
2481         if (n < 0 || n != data.dsize) {
2482                 fprintf(stderr, "Failed to write talloc summary\n");
2483         }
2484
2485         *done = true;
2486 }
2487
2488 static int control_rddumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2489                                 int argc, const char **argv)
2490 {
2491         struct ctdb_srvid_message msg = { 0 };
2492         int ret;
2493         bool done = false;
2494
2495         msg.pnn = ctdb->pnn;
2496         msg.srvid = next_srvid(ctdb);
2497
2498         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2499                                               msg.srvid, dump_memory, &done);
2500         if (ret != 0) {
2501                 return ret;
2502         }
2503
2504         ret = ctdb_message_mem_dump(mem_ctx, ctdb->ev, ctdb->client,
2505                                     ctdb->cmd_pnn, &msg);
2506         if (ret != 0) {
2507                 return ret;
2508         }
2509
2510         ctdb_client_wait(ctdb->ev, &done);
2511         return 0;
2512 }
2513
2514 static int control_getpid(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2515                           int argc, const char **argv)
2516 {
2517         pid_t pid;
2518         int ret;
2519
2520         ret = ctdb_ctrl_get_pid(mem_ctx, ctdb->ev, ctdb->client,
2521                                 ctdb->cmd_pnn, TIMEOUT(), &pid);
2522         if (ret != 0) {
2523                 return ret;
2524         }
2525
2526         printf("%u\n", pid);
2527         return 0;
2528 }
2529
2530 static int check_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2531                        const char *desc, uint32_t flag, bool set_flag)
2532 {
2533         struct ctdb_node_map *nodemap;
2534         bool flag_is_set;
2535
2536         nodemap = get_nodemap(ctdb, false);
2537         if (nodemap == NULL) {
2538                 return 1;
2539         }
2540
2541         flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2542         if (set_flag == flag_is_set) {
2543                 if (set_flag) {
2544                         fprintf(stderr, "Node %u is already %s\n",
2545                                 ctdb->cmd_pnn, desc);
2546                 } else {
2547                         fprintf(stderr, "Node %u is not %s\n",
2548                                 ctdb->cmd_pnn, desc);
2549                 }
2550                 return 0;
2551         }
2552
2553         return 1;
2554 }
2555
2556 static void wait_for_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2557                            uint32_t flag, bool set_flag)
2558 {
2559         struct ctdb_node_map *nodemap;
2560         bool flag_is_set;
2561
2562         while (1) {
2563                 nodemap = get_nodemap(ctdb, true);
2564                 if (nodemap == NULL) {
2565                         fprintf(stderr,
2566                                 "Failed to get nodemap, trying again\n");
2567                         sleep(1);
2568                         continue;
2569                 }
2570
2571                 flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2572                 if (flag_is_set == set_flag) {
2573                         break;
2574                 }
2575
2576                 sleep(1);
2577         }
2578 }
2579
2580 struct ipreallocate_state {
2581         int status;
2582         bool done;
2583 };
2584
2585 static void ipreallocate_handler(uint64_t srvid, TDB_DATA data,
2586                                  void *private_data)
2587 {
2588         struct ipreallocate_state *state =
2589                 (struct ipreallocate_state *)private_data;
2590
2591         if (data.dsize != sizeof(int)) {
2592                 /* Ignore packet */
2593                 return;
2594         }
2595
2596         state->status = *(int *)data.dptr;
2597         state->done = true;
2598 }
2599
2600 static int ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb)
2601 {
2602         struct ctdb_srvid_message msg = { 0 };
2603         struct ipreallocate_state state;
2604         int ret;
2605
2606         msg.pnn = ctdb->pnn;
2607         msg.srvid = next_srvid(ctdb);
2608
2609         state.done = false;
2610         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2611                                               msg.srvid,
2612                                               ipreallocate_handler, &state);
2613         if (ret != 0) {
2614                 return ret;
2615         }
2616
2617         while (true) {
2618                 ret = ctdb_message_takeover_run(mem_ctx, ctdb->ev,
2619                                                 ctdb->client,
2620                                                 CTDB_BROADCAST_CONNECTED,
2621                                                 &msg);
2622                 if (ret != 0) {
2623                         goto fail;
2624                 }
2625
2626                 ret = ctdb_client_wait_timeout(ctdb->ev, &state.done,
2627                                                TIMEOUT());
2628                 if (ret != 0) {
2629                         continue;
2630                 }
2631
2632                 if (state.status >= 0) {
2633                         ret = 0;
2634                 } else {
2635                         ret = state.status;
2636                 }
2637                 break;
2638         }
2639
2640 fail:
2641         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2642                                            msg.srvid, &state);
2643         return ret;
2644 }
2645
2646 static int control_disable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2647                            int argc, const char **argv)
2648 {
2649         int ret;
2650
2651         if (argc != 0) {
2652                 usage("disable");
2653         }
2654
2655         ret = check_flags(mem_ctx, ctdb, "disabled",
2656                           NODE_FLAGS_PERMANENTLY_DISABLED, true);
2657         if (ret == 0) {
2658                 return 0;
2659         }
2660
2661         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2662                                  ctdb->cmd_pnn, TIMEOUT(),
2663                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2664         if (ret != 0) {
2665                 fprintf(stderr,
2666                         "Failed to set DISABLED flag on node %u\n",
2667                         ctdb->cmd_pnn);
2668                 return ret;
2669         }
2670
2671         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, true);
2672         return ipreallocate(mem_ctx, ctdb);
2673 }
2674
2675 static int control_enable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2676                           int argc, const char **argv)
2677 {
2678         int ret;
2679
2680         if (argc != 0) {
2681                 usage("enable");
2682         }
2683
2684         ret = check_flags(mem_ctx, ctdb, "disabled",
2685                           NODE_FLAGS_PERMANENTLY_DISABLED, false);
2686         if (ret == 0) {
2687                 return 0;
2688         }
2689
2690         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2691                                  ctdb->cmd_pnn, TIMEOUT(),
2692                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
2693         if (ret != 0) {
2694                 fprintf(stderr, "Failed to reset DISABLED flag on node %u\n",
2695                         ctdb->cmd_pnn);
2696                 return ret;
2697         }
2698
2699         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, false);
2700         return ipreallocate(mem_ctx, ctdb);
2701 }
2702
2703 static int control_stop(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2704                         int argc, const char **argv)
2705 {
2706         int ret;
2707
2708         if (argc != 0) {
2709                 usage("stop");
2710         }
2711
2712         ret = check_flags(mem_ctx, ctdb, "stopped",
2713                           NODE_FLAGS_STOPPED, true);
2714         if (ret == 0) {
2715                 return 0;
2716         }
2717
2718         ret = ctdb_ctrl_stop_node(mem_ctx, ctdb->ev, ctdb->client,
2719                                   ctdb->cmd_pnn, TIMEOUT());
2720         if (ret != 0) {
2721                 fprintf(stderr, "Failed to stop node %u\n", ctdb->cmd_pnn);
2722                 return ret;
2723         }
2724
2725         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, true);
2726         return ipreallocate(mem_ctx, ctdb);
2727 }
2728
2729 static int control_continue(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2730                             int argc, const char **argv)
2731 {
2732         int ret;
2733
2734         if (argc != 0) {
2735                 usage("continue");
2736         }
2737
2738         ret = check_flags(mem_ctx, ctdb, "stopped",
2739                           NODE_FLAGS_STOPPED, false);
2740         if (ret == 0) {
2741                 return 0;
2742         }
2743
2744         ret = ctdb_ctrl_continue_node(mem_ctx, ctdb->ev, ctdb->client,
2745                                       ctdb->cmd_pnn, TIMEOUT());
2746         if (ret != 0) {
2747                 fprintf(stderr, "Failed to continue stopped node %u\n",
2748                         ctdb->cmd_pnn);
2749                 return ret;
2750         }
2751
2752         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, false);
2753         return ipreallocate(mem_ctx, ctdb);
2754 }
2755
2756 static int control_ban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2757                        int argc, const char **argv)
2758 {
2759         struct ctdb_ban_state ban_state;
2760         int ret;
2761
2762         if (argc != 1) {
2763                 usage("ban");
2764         }
2765
2766         ret = check_flags(mem_ctx, ctdb, "banned",
2767                           NODE_FLAGS_BANNED, true);
2768         if (ret == 0) {
2769                 return 0;
2770         }
2771
2772         ban_state.pnn = ctdb->cmd_pnn;
2773         ban_state.time = strtoul(argv[0], NULL, 0);
2774
2775         if (ban_state.time == 0) {
2776                 fprintf(stderr, "Ban time cannot be zero\n");
2777                 return EINVAL;
2778         }
2779
2780         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2781                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2782         if (ret != 0) {
2783                 fprintf(stderr, "Failed to ban node %u\n", ctdb->cmd_pnn);
2784                 return ret;
2785         }
2786
2787         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, true);
2788         return ipreallocate(mem_ctx, ctdb);
2789
2790 }
2791
2792 static int control_unban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2793                          int argc, const char **argv)
2794 {
2795         struct ctdb_ban_state ban_state;
2796         int ret;
2797
2798         if (argc != 0) {
2799                 usage("unban");
2800         }
2801
2802         ret = check_flags(mem_ctx, ctdb, "banned",
2803                           NODE_FLAGS_BANNED, false);
2804         if (ret == 0) {
2805                 return 0;
2806         }
2807
2808         ban_state.pnn = ctdb->cmd_pnn;
2809         ban_state.time = 0;
2810
2811         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2812                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2813         if (ret != 0) {
2814                 fprintf(stderr, "Failed to unban node %u\n", ctdb->cmd_pnn);
2815                 return ret;
2816         }
2817
2818         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, false);
2819         return ipreallocate(mem_ctx, ctdb);
2820
2821 }
2822
2823 static int control_shutdown(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2824                             int argc, const char **argv)
2825 {
2826         int ret;
2827
2828         if (argc != 0) {
2829                 usage("shutdown");
2830         }
2831
2832         ret = ctdb_ctrl_shutdown(mem_ctx, ctdb->ev, ctdb->client,
2833                                  ctdb->cmd_pnn, TIMEOUT());
2834         if (ret != 0) {
2835                 fprintf(stderr, "Unable to shutdown node %u\n", ctdb->cmd_pnn);
2836                 return ret;
2837         }
2838
2839         return 0;
2840 }
2841
2842 static int get_generation(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2843                           uint32_t *generation)
2844 {
2845         uint32_t recmaster;
2846         int recmode;
2847         struct ctdb_vnn_map *vnnmap;
2848         int ret;
2849
2850 again:
2851         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2852                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
2853         if (ret != 0) {
2854                 fprintf(stderr, "Failed to find recovery master\n");
2855                 return ret;
2856         }
2857
2858         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2859                                     recmaster, TIMEOUT(), &recmode);
2860         if (ret != 0) {
2861                 fprintf(stderr, "Failed to get recovery mode from node %u\n",
2862                         recmaster);
2863                 return ret;
2864         }
2865
2866         if (recmode == CTDB_RECOVERY_ACTIVE) {
2867                 sleep(1);
2868                 goto again;
2869         }
2870
2871         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
2872                                   recmaster, TIMEOUT(), &vnnmap);
2873         if (ret != 0) {
2874                 fprintf(stderr, "Failed to get generation from node %u\n",
2875                         recmaster);
2876                 return ret;
2877         }
2878
2879         if (vnnmap->generation == 1) {
2880                 talloc_free(vnnmap);
2881                 sleep(1);
2882                 goto again;
2883         }
2884
2885         *generation = vnnmap->generation;
2886         talloc_free(vnnmap);
2887         return 0;
2888 }
2889
2890
2891 static int control_recover(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2892                            int argc, const char **argv)
2893 {
2894         uint32_t generation, next_generation;
2895         int ret;
2896
2897         if (argc != 0) {
2898                 usage("recover");
2899         }
2900
2901         ret = get_generation(mem_ctx, ctdb, &generation);
2902         if (ret != 0) {
2903                 return ret;
2904         }
2905
2906         ret = ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
2907                                     ctdb->cmd_pnn, TIMEOUT(),
2908                                     CTDB_RECOVERY_ACTIVE);
2909         if (ret != 0) {
2910                 fprintf(stderr, "Failed to set recovery mode active\n");
2911                 return ret;
2912         }
2913
2914         while (1) {
2915                 ret = get_generation(mem_ctx, ctdb, &next_generation);
2916                 if (ret != 0) {
2917                         fprintf(stderr,
2918                                 "Failed to confirm end of recovery\n");
2919                         return ret;
2920                 }
2921
2922                 if (next_generation != generation) {
2923                         break;
2924                 }
2925
2926                 sleep (1);
2927         }
2928
2929         return 0;
2930 }
2931
2932 static int control_ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2933                                 int argc, const char **argv)
2934 {
2935         if (argc != 0) {
2936                 usage("ipreallocate");
2937         }
2938
2939         return ipreallocate(mem_ctx, ctdb);
2940 }
2941
2942 static int control_isnotrecmaster(TALLOC_CTX *mem_ctx,
2943                                   struct ctdb_context *ctdb,
2944                                   int argc, const char **argv)
2945 {
2946         uint32_t recmaster;
2947         int ret;
2948
2949         if (argc != 0) {
2950                 usage("isnotrecmaster");
2951         }
2952
2953         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2954                                       ctdb->pnn, TIMEOUT(), &recmaster);
2955         if (ret != 0) {
2956                 fprintf(stderr, "Failed to get recmaster\n");
2957                 return ret;
2958         }
2959
2960         if (recmaster != ctdb->pnn) {
2961                 printf("this node is not the recmaster\n");
2962                 return 1;
2963         }
2964
2965         printf("this node is the recmaster\n");
2966         return 0;
2967 }
2968
2969 static int control_gratarp(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2970                            int argc, const char **argv)
2971 {
2972         struct ctdb_addr_info addr_info;
2973         int ret;
2974
2975         if (argc != 2) {
2976                 usage("gratarp");
2977         }
2978
2979         if (! parse_ip(argv[0], NULL, 0, &addr_info.addr)) {
2980                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
2981                 return 1;
2982         }
2983         addr_info.iface = argv[1];
2984
2985         ret = ctdb_ctrl_send_gratuitous_arp(mem_ctx, ctdb->ev, ctdb->client,
2986                                             ctdb->cmd_pnn, TIMEOUT(),
2987                                             &addr_info);
2988         if (ret != 0) {
2989                 fprintf(stderr, "Unable to send gratuitous arp from node %u\n",
2990                         ctdb->cmd_pnn);
2991                 return ret;
2992         }
2993
2994         return 0;
2995 }
2996
2997 static int control_tickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2998                            int argc, const char **argv)
2999 {
3000         ctdb_sock_addr src, dst;
3001         int ret;
3002
3003         if (argc != 0 && argc != 2) {
3004                 usage("tickle");
3005         }
3006
3007         if (argc == 0) {
3008                 struct ctdb_connection *clist;
3009                 int count;
3010                 int i, num_failed;
3011
3012                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3013                 if (ret != 0) {
3014                         return ret;
3015                 }
3016
3017                 num_failed = 0;
3018                 for (i=0; i<count; i++) {
3019                         ret = ctdb_sys_send_tcp(&clist[i].src,
3020                                                 &clist[i].dst,
3021                                                 0, 0, 0);
3022                         if (ret != 0) {
3023                                 num_failed += 1;
3024                         }
3025                 }
3026
3027                 TALLOC_FREE(clist);
3028
3029                 if (num_failed > 0) {
3030                         fprintf(stderr, "Failed to send %d tickles\n",
3031                                 num_failed);
3032                         return 1;
3033                 }
3034
3035                 return 0;
3036         }
3037
3038
3039         if (! parse_ip_port(argv[0], &src)) {
3040                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3041                 return 1;
3042         }
3043
3044         if (! parse_ip_port(argv[1], &dst)) {
3045                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3046                 return 1;
3047         }
3048
3049         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
3050         if (ret != 0) {
3051                 fprintf(stderr, "Failed to send tickle ack\n");
3052                 return ret;
3053         }
3054
3055         return 0;
3056 }
3057
3058 static int control_gettickles(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3059                               int argc, const char **argv)
3060 {
3061         ctdb_sock_addr addr;
3062         struct ctdb_tickle_list *tickles;
3063         unsigned port = 0;
3064         int ret, i;
3065
3066         if (argc < 1 || argc > 2) {
3067                 usage("gettickles");
3068         }
3069
3070         if (argc == 2) {
3071                 port = strtoul(argv[1], NULL, 10);
3072         }
3073
3074         if (! parse_ip(argv[0], NULL, port, &addr)) {
3075                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3076                 return 1;
3077         }
3078
3079         ret = ctdb_ctrl_get_tcp_tickle_list(mem_ctx, ctdb->ev, ctdb->client,
3080                                             ctdb->cmd_pnn, TIMEOUT(), &addr,
3081                                             &tickles);
3082         if (ret != 0) {
3083                 fprintf(stderr, "Failed to get list of connections\n");
3084                 return ret;
3085         }
3086
3087         if (options.machinereadable) {
3088                 printf("%s%s%s%s%s%s%s%s%s\n",
3089                        options.sep,
3090                        "Source IP", options.sep,
3091                        "Port", options.sep,
3092                        "Destiation IP", options.sep,
3093                        "Port", options.sep);
3094                 for (i=0; i<tickles->num; i++) {
3095                         printf("%s%s%s%u%s%s%s%u%s\n", options.sep,
3096                                ctdb_sock_addr_to_string(
3097                                        mem_ctx, &tickles->conn[i].src),
3098                                options.sep,
3099                                ntohs(tickles->conn[i].src.ip.sin_port),
3100                                options.sep,
3101                                ctdb_sock_addr_to_string(
3102                                        mem_ctx, &tickles->conn[i].dst),
3103                                options.sep,
3104                                ntohs(tickles->conn[i].dst.ip.sin_port),
3105                                options.sep);
3106                 }
3107         } else {
3108                 printf("Connections for IP: %s\n",
3109                        ctdb_sock_addr_to_string(mem_ctx, &tickles->addr));
3110                 printf("Num connections: %u\n", tickles->num);
3111                 for (i=0; i<tickles->num; i++) {
3112                         printf("SRC: %s:%u   DST: %s:%u\n",
3113                                ctdb_sock_addr_to_string(
3114                                        mem_ctx, &tickles->conn[i].src),
3115                                ntohs(tickles->conn[i].src.ip.sin_port),
3116                                ctdb_sock_addr_to_string(
3117                                        mem_ctx, &tickles->conn[i].dst),
3118                                ntohs(tickles->conn[i].dst.ip.sin_port));
3119                 }
3120         }
3121
3122         return 0;
3123 }
3124
3125 typedef void (*clist_request_func)(struct ctdb_req_control *request,
3126                                    struct ctdb_connection *conn);
3127
3128 typedef int (*clist_reply_func)(struct ctdb_reply_control *reply);
3129
3130 struct process_clist_state {
3131         struct ctdb_connection *clist;
3132         int count;
3133         int num_failed, num_total;
3134         clist_reply_func reply_func;
3135 };
3136
3137 static void process_clist_done(struct tevent_req *subreq);
3138
3139 static struct tevent_req *process_clist_send(
3140                                         TALLOC_CTX *mem_ctx,
3141                                         struct ctdb_context *ctdb,
3142                                         struct ctdb_connection *clist,
3143                                         int count,
3144                                         clist_request_func request_func,
3145                                         clist_reply_func reply_func)
3146 {
3147         struct tevent_req *req, *subreq;
3148         struct process_clist_state *state;
3149         struct ctdb_req_control request;
3150         int i;
3151
3152         req = tevent_req_create(mem_ctx, &state, struct process_clist_state);
3153         if (req == NULL) {
3154                 return NULL;
3155         }
3156
3157         state->clist = clist;
3158         state->count = count;
3159         state->reply_func = reply_func;
3160
3161         for (i=0; i<count; i++) {
3162                 request_func(&request, &clist[i]);
3163                 subreq = ctdb_client_control_send(state, ctdb->ev,
3164                                                   ctdb->client, ctdb->cmd_pnn,
3165                                                   TIMEOUT(), &request);
3166                 if (tevent_req_nomem(subreq, req)) {
3167                         return tevent_req_post(req, ctdb->ev);
3168                 }
3169                 tevent_req_set_callback(subreq, process_clist_done, req);
3170         }
3171
3172         return req;
3173 }
3174
3175 static void process_clist_done(struct tevent_req *subreq)
3176 {
3177         struct tevent_req *req = tevent_req_callback_data(
3178                 subreq, struct tevent_req);
3179         struct process_clist_state *state = tevent_req_data(
3180                 req, struct process_clist_state);
3181         struct ctdb_reply_control *reply;
3182         int ret;
3183         bool status;
3184
3185         status = ctdb_client_control_recv(subreq, NULL, state, &reply);
3186         TALLOC_FREE(subreq);
3187         if (! status) {
3188                 state->num_failed += 1;
3189                 goto done;
3190         }
3191
3192         ret = state->reply_func(reply);
3193         if (ret != 0) {
3194                 state->num_failed += 1;
3195                 goto done;
3196         }
3197
3198 done:
3199         state->num_total += 1;
3200         if (state->num_total == state->count) {
3201                 tevent_req_done(req);
3202         }
3203 }
3204
3205 static int process_clist_recv(struct tevent_req *req)
3206 {
3207         struct process_clist_state *state = tevent_req_data(
3208                 req, struct process_clist_state);
3209
3210         return state->num_failed;
3211 }
3212
3213 static int control_addtickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3214                              int argc, const char **argv)
3215 {
3216         struct ctdb_connection conn;
3217         int ret;
3218
3219         if (argc != 0 && argc != 2) {
3220                 usage("addtickle");
3221         }
3222
3223         if (argc == 0) {
3224                 struct ctdb_connection *clist;
3225                 struct tevent_req *req;
3226                 int count;
3227
3228                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3229                 if (ret != 0) {
3230                         return ret;
3231                 }
3232
3233                 req = process_clist_send(mem_ctx, ctdb, clist, count,
3234                                  ctdb_req_control_tcp_add_delayed_update,
3235                                  ctdb_reply_control_tcp_add_delayed_update);
3236                 if (req == NULL) {
3237                         return ENOMEM;
3238                 }
3239
3240                 tevent_req_poll(req, ctdb->ev);
3241
3242                 ret = process_clist_recv(req);
3243                 if (ret != 0) {
3244                         fprintf(stderr, "Failed to add %d tickles\n", ret);
3245                         return 1;
3246                 }
3247
3248                 return 0;
3249         }
3250
3251         if (! parse_ip_port(argv[0], &conn.src)) {
3252                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3253                 return 1;
3254         }
3255         if (! parse_ip_port(argv[1], &conn.dst)) {
3256                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3257                 return 1;
3258         }
3259
3260         ret = ctdb_ctrl_tcp_add_delayed_update(mem_ctx, ctdb->ev,
3261                                                ctdb->client, ctdb->cmd_pnn,
3262                                                TIMEOUT(), &conn);
3263         if (ret != 0) {
3264                 fprintf(stderr, "Failed to register connection\n");
3265                 return ret;
3266         }
3267
3268         return 0;
3269 }
3270
3271 static int control_deltickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3272                              int argc, const char **argv)
3273 {
3274         struct ctdb_connection conn;
3275         int ret;
3276
3277         if (argc != 0 && argc != 2) {
3278                 usage("deltickle");
3279         }
3280
3281         if (argc == 0) {
3282                 struct ctdb_connection *clist;
3283                 struct tevent_req *req;
3284                 int count;
3285
3286                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3287                 if (ret != 0) {
3288                         return ret;
3289                 }
3290
3291                 req = process_clist_send(mem_ctx, ctdb, clist, count,
3292                                          ctdb_req_control_tcp_remove,
3293                                          ctdb_reply_control_tcp_remove);
3294                 if (req == NULL) {
3295                         return ENOMEM;
3296                 }
3297
3298                 tevent_req_poll(req, ctdb->ev);
3299
3300                 ret = process_clist_recv(req);
3301                 if (ret != 0) {
3302                         fprintf(stderr, "Failed to remove %d tickles\n", ret);
3303                         return 1;
3304                 }
3305
3306                 return 0;
3307         }
3308
3309         if (! parse_ip_port(argv[0], &conn.src)) {
3310                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3311                 return 1;
3312         }
3313         if (! parse_ip_port(argv[1], &conn.dst)) {
3314                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3315                 return 1;
3316         }
3317
3318         ret = ctdb_ctrl_tcp_remove(mem_ctx, ctdb->ev, ctdb->client,
3319                                    ctdb->cmd_pnn, TIMEOUT(), &conn);
3320         if (ret != 0) {
3321                 fprintf(stderr, "Failed to unregister connection\n");
3322                 return ret;
3323         }
3324
3325         return 0;
3326 }
3327
3328 static int control_check_srvids(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3329                                 int argc, const char **argv)
3330 {
3331         uint64_t *srvid;
3332         uint8_t *result;
3333         int ret, i;
3334
3335         if (argc == 0) {
3336                 usage("check_srvids");
3337         }
3338
3339         srvid = talloc_array(mem_ctx, uint64_t, argc);
3340         if (srvid == NULL) {
3341                 fprintf(stderr, "Memory allocation error\n");
3342                 return 1;
3343         }
3344
3345         for (i=0; i<argc; i++) {
3346                 srvid[i] = strtoull(argv[i], NULL, 0);
3347         }
3348
3349         ret = ctdb_ctrl_check_srvids(mem_ctx, ctdb->ev, ctdb->client,
3350                                      ctdb->cmd_pnn, TIMEOUT(), srvid, argc,
3351                                      &result);
3352         if (ret != 0) {
3353                 fprintf(stderr, "Failed to check srvids on node %u\n",
3354                         ctdb->cmd_pnn);
3355                 return ret;
3356         }
3357
3358         for (i=0; i<argc; i++) {
3359                 printf("SRVID 0x%" PRIx64 " %s\n", srvid[i],
3360                        (result[i] ? "exists" : "does not exist"));
3361         }
3362
3363         return 0;
3364 }
3365
3366 static int control_listnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3367                              int argc, const char **argv)
3368 {
3369         struct ctdb_node_map *nodemap;
3370         int i;
3371
3372         if (argc != 0) {
3373                 usage("listnodes");
3374         }
3375
3376         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
3377         if (nodemap == NULL) {
3378                 return 1;
3379         }
3380
3381         for (i=0; i<nodemap->num; i++) {
3382                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
3383                         continue;
3384                 }
3385
3386                 if (options.machinereadable) {
3387                         printf("%s%u%s%s%s\n", options.sep,
3388                                nodemap->node[i].pnn, options.sep,
3389                                ctdb_sock_addr_to_string(
3390                                         mem_ctx, &nodemap->node[i].addr),
3391                                options.sep);
3392                 } else {
3393                         printf("%s\n",
3394                                ctdb_sock_addr_to_string(
3395                                         mem_ctx, &nodemap->node[i].addr));
3396                 }
3397         }
3398
3399         return 0;
3400 }
3401
3402 static bool nodemap_identical(struct ctdb_node_map *nodemap1,
3403                               struct ctdb_node_map *nodemap2)
3404 {
3405         int i;
3406
3407         if (nodemap1->num != nodemap2->num) {
3408                 return false;
3409         }
3410
3411         for (i=0; i<nodemap1->num; i++) {
3412                 struct ctdb_node_and_flags *n1, *n2;
3413
3414                 n1 = &nodemap1->node[i];
3415                 n2 = &nodemap2->node[i];
3416
3417                 if ((n1->pnn != n2->pnn) ||
3418                     (n1->flags != n2->flags) ||
3419                     ! ctdb_sock_addr_same_ip(&n1->addr, &n2->addr)) {
3420                         return false;
3421                 }
3422         }
3423
3424         return true;
3425 }
3426
3427 static int check_node_file_changes(TALLOC_CTX *mem_ctx,
3428                                    struct ctdb_node_map *nm,
3429                                    struct ctdb_node_map *fnm,
3430                                    bool *reload)
3431 {
3432         int i;
3433         bool check_failed = false;
3434
3435         *reload = false;
3436
3437         for (i=0; i<nm->num; i++) {
3438                 if (i >= fnm->num) {
3439                         fprintf(stderr,
3440                                 "Node %u (%s) missing from nodes file\n",
3441                                 nm->node[i].pnn,
3442                                 ctdb_sock_addr_to_string(
3443                                         mem_ctx, &nm->node[i].addr));
3444                         check_failed = true;
3445                         continue;
3446                 }
3447                 if (nm->node[i].flags & NODE_FLAGS_DELETED &&
3448                     fnm->node[i].flags & NODE_FLAGS_DELETED) {
3449                         /* Node remains deleted */
3450                         continue;
3451                 }
3452
3453                 if (! (nm->node[i].flags & NODE_FLAGS_DELETED) &&
3454                     ! (fnm->node[i].flags & NODE_FLAGS_DELETED)) {
3455                         /* Node not newly nor previously deleted */
3456                         if (! ctdb_same_ip(&nm->node[i].addr,
3457                                            &fnm->node[i].addr)) {
3458                                 fprintf(stderr,
3459                                         "Node %u has changed IP address"
3460                                         " (was %s, now %s)\n",
3461                                         nm->node[i].pnn,
3462                                         ctdb_sock_addr_to_string(
3463                                                 mem_ctx, &nm->node[i].addr),
3464                                         ctdb_sock_addr_to_string(
3465                                                 mem_ctx, &fnm->node[i].addr));
3466                                 check_failed = true;
3467                         } else {
3468                                 if (nm->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3469                                         fprintf(stderr,
3470                                                 "WARNING: Node %u is disconnected."
3471                                                 " You MUST fix this node manually!\n",
3472                                                 nm->node[i].pnn);
3473                                 }
3474                         }
3475                         continue;
3476                 }
3477
3478                 if (fnm->node[i].flags & NODE_FLAGS_DELETED) {
3479                         /* Node is being deleted */
3480                         printf("Node %u is DELETED\n", nm->node[i].pnn);
3481                         *reload = true;
3482                         if (! (nm->node[i].flags & NODE_FLAGS_DISCONNECTED)) {
3483                                 fprintf(stderr,
3484                                         "ERROR: Node %u is still connected\n",
3485                                         nm->node[i].pnn);
3486                                 check_failed = true;
3487                         }
3488                         continue;
3489                 }
3490
3491                 if (nm->node[i].flags & NODE_FLAGS_DELETED) {
3492                         /* Node was previously deleted */
3493                         printf("Node %u is UNDELETED\n", nm->node[i].pnn);
3494                         *reload = true;
3495                 }
3496         }
3497
3498         if (check_failed) {
3499                 fprintf(stderr,
3500                         "ERROR: Nodes will not be reloaded due to previous error\n");
3501                 return 1;
3502         }
3503
3504         /* Leftover nodes in file are NEW */
3505         for (; i < fnm->num; i++) {
3506                 printf("Node %u is NEW\n", fnm->node[i].pnn);
3507                 *reload = true;
3508         }
3509
3510         return 0;
3511 }
3512
3513 struct disable_recoveries_state {
3514         uint32_t *pnn_list;
3515         int node_count;
3516         bool *reply;
3517         int status;
3518         bool done;
3519 };
3520
3521 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
3522                                        void *private_data)
3523 {
3524         struct disable_recoveries_state *state =
3525                 (struct disable_recoveries_state *)private_data;
3526         int ret, i;
3527
3528         if (data.dsize != sizeof(int)) {
3529                 /* Ignore packet */
3530                 return;
3531         }
3532
3533         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
3534         ret = *(int *)data.dptr;
3535         if (ret < 0) {
3536                 state->status = ret;
3537                 state->done = true;
3538                 return;
3539         }
3540         for (i=0; i<state->node_count; i++) {
3541                 if (state->pnn_list[i] == ret) {
3542                         state->reply[i] = true;
3543                         break;
3544                 }
3545         }
3546
3547         state->done = true;
3548         for (i=0; i<state->node_count; i++) {
3549                 if (! state->reply[i]) {
3550                         state->done = false;
3551                         break;
3552                 }
3553         }
3554 }
3555
3556 static int disable_recoveries(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3557                               uint32_t timeout, uint32_t *pnn_list, int count)
3558 {
3559         struct ctdb_disable_message disable = { 0 };
3560         struct disable_recoveries_state state;
3561         int ret, i;
3562
3563         disable.pnn = ctdb->pnn;
3564         disable.srvid = next_srvid(ctdb);
3565         disable.timeout = timeout;
3566
3567         state.pnn_list = pnn_list;
3568         state.node_count = count;
3569         state.done = false;
3570         state.status = 0;
3571         state.reply = talloc_zero_array(mem_ctx, bool, count);
3572         if (state.reply == NULL) {
3573                 return ENOMEM;
3574         }
3575
3576         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
3577                                               disable.srvid,
3578                                               disable_recoveries_handler,
3579                                               &state);
3580         if (ret != 0) {
3581                 return ret;
3582         }
3583
3584         for (i=0; i<count; i++) {
3585                 ret = ctdb_message_disable_recoveries(mem_ctx, ctdb->ev,
3586                                                       ctdb->client,
3587                                                       pnn_list[i],
3588                                                       &disable);
3589                 if (ret != 0) {
3590                         goto fail;
3591                 }
3592         }
3593
3594         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
3595         if (ret == ETIME) {
3596                 fprintf(stderr, "Timed out waiting to disable recoveries\n");
3597         } else {
3598                 ret = (state.status >= 0 ? 0 : 1);
3599         }
3600
3601 fail:
3602         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
3603                                            disable.srvid, &state);
3604         return ret;
3605 }
3606
3607 static int control_reloadnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3608                                int argc, const char **argv)
3609 {
3610         struct ctdb_node_map *nodemap = NULL;
3611         struct ctdb_node_map *file_nodemap;
3612         struct ctdb_node_map *remote_nodemap;
3613         struct ctdb_req_control request;
3614         struct ctdb_reply_control **reply;
3615         bool reload;
3616         int ret, i;
3617         uint32_t *pnn_list;
3618         int count;
3619
3620         nodemap = get_nodemap(ctdb, false);
3621         if (nodemap == NULL) {
3622                 return 1;
3623         }
3624
3625         file_nodemap = read_nodes_file(mem_ctx, ctdb->pnn);
3626         if (file_nodemap == NULL) {
3627                 return 1;
3628         }
3629
3630         for (i=0; i<nodemap->num; i++) {
3631                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3632                         continue;
3633                 }
3634
3635                 ret = ctdb_ctrl_get_nodes_file(mem_ctx, ctdb->ev, ctdb->client,
3636                                                nodemap->node[i].pnn, TIMEOUT(),
3637                                                &remote_nodemap);
3638                 if (ret != 0) {
3639                         fprintf(stderr,
3640                                 "ERROR: Failed to get nodes file from node %u\n",
3641                                 nodemap->node[i].pnn);
3642                         return ret;
3643                 }
3644
3645                 if (! nodemap_identical(file_nodemap, remote_nodemap)) {
3646                         fprintf(stderr,
3647                                 "ERROR: Nodes file on node %u differs"
3648                                  " from current node (%u)\n",
3649                                  nodemap->node[i].pnn, ctdb->pnn);
3650                         return 1;
3651                 }
3652         }
3653
3654         ret = check_node_file_changes(mem_ctx, nodemap, file_nodemap, &reload);
3655         if (ret != 0) {
3656                 return ret;
3657         }
3658
3659         if (! reload) {
3660                 fprintf(stderr, "No change in nodes file,"
3661                                 " skipping unnecessary reload\n");
3662                 return 0;
3663         }
3664
3665         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
3666                                         mem_ctx, &pnn_list);
3667         if (count <= 0) {
3668                 fprintf(stderr, "Memory allocation error\n");
3669                 return 1;
3670         }
3671
3672         ret = disable_recoveries(mem_ctx, ctdb, 2*options.timelimit,
3673                                  pnn_list, count);
3674         if (ret != 0) {
3675                 fprintf(stderr, "Failed to disable recoveries\n");
3676                 return ret;
3677         }
3678
3679         ctdb_req_control_reload_nodes_file(&request);
3680         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3681                                         pnn_list, count, TIMEOUT(),
3682                                         &request, NULL, &reply);
3683         if (ret != 0) {
3684                 bool failed = false;
3685
3686                 for (i=0; i<count; i++) {
3687                         ret = ctdb_reply_control_reload_nodes_file(reply[i]);
3688                         if (ret != 0) {
3689                                 fprintf(stderr,
3690                                         "Node %u failed to reload nodes\n",
3691                                         pnn_list[i]);
3692                                 failed = true;
3693                         }
3694                 }
3695                 if (failed) {
3696                         fprintf(stderr,
3697                                 "You MUST fix failed nodes manually!\n");
3698                 }
3699         }
3700
3701         ret = disable_recoveries(mem_ctx, ctdb, 0, pnn_list, count);
3702         if (ret != 0) {
3703                 fprintf(stderr, "Failed to enable recoveries\n");
3704                 return ret;
3705         }
3706
3707         return 0;
3708 }
3709
3710 static int moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3711                   ctdb_sock_addr *addr, uint32_t pnn)
3712 {
3713         struct ctdb_public_ip_list *pubip_list;
3714         struct ctdb_public_ip pubip;
3715         struct ctdb_node_map *nodemap;
3716         struct ctdb_req_control request;
3717         uint32_t *pnn_list;
3718         int ret, i, count;
3719
3720         ret = ctdb_message_disable_ip_check(mem_ctx, ctdb->ev, ctdb->client,
3721                                             CTDB_BROADCAST_CONNECTED,
3722                                             2*options.timelimit);
3723         if (ret != 0) {
3724                 fprintf(stderr, "Failed to disable IP check\n");
3725                 return ret;
3726         }
3727
3728         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3729                                        pnn, TIMEOUT(), &pubip_list);
3730         if (ret != 0) {
3731                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3732                         pnn);
3733                 return ret;
3734         }
3735
3736         for (i=0; i<pubip_list->num; i++) {
3737                 if (ctdb_same_ip(addr, &pubip_list->ip[i].addr)) {
3738                         break;
3739                 }
3740         }
3741
3742         if (i == pubip_list->num) {
3743                 fprintf(stderr, "Node %u CANNOT host IP address %s\n",
3744                         pnn, ctdb_sock_addr_to_string(mem_ctx, addr));
3745                 return 1;
3746         }
3747
3748         nodemap = get_nodemap(ctdb, false);
3749         if (nodemap == NULL) {
3750                 return 1;
3751         }
3752
3753         count = list_of_active_nodes(nodemap, pnn, mem_ctx, &pnn_list);
3754         if (count <= 0) {
3755                 fprintf(stderr, "Memory allocation error\n");
3756                 return 1;
3757         }
3758
3759         pubip.pnn = pnn;
3760         pubip.addr = *addr;
3761         ctdb_req_control_release_ip(&request, &pubip);
3762
3763         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3764                                         pnn_list, count, TIMEOUT(),
3765                                         &request, NULL, NULL);
3766         if (ret != 0) {
3767                 fprintf(stderr, "Failed to release IP on nodes\n");
3768                 return ret;
3769         }
3770
3771         ret = ctdb_ctrl_takeover_ip(mem_ctx, ctdb->ev, ctdb->client,
3772                                     pnn, TIMEOUT(), &pubip);
3773         if (ret != 0) {
3774                 fprintf(stderr, "Failed to takeover IP on node %u\n", pnn);
3775                 return ret;
3776         }
3777
3778         return 0;
3779 }
3780
3781 static int control_moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3782                           int argc, const char **argv)
3783 {
3784         ctdb_sock_addr addr;
3785         uint32_t pnn;
3786         int ret, retries = 0;
3787
3788         if (argc != 2) {
3789                 usage("moveip");
3790         }
3791
3792         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3793                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3794                 return 1;
3795         }
3796
3797         pnn = strtoul(argv[1], NULL, 10);
3798         if (pnn == CTDB_UNKNOWN_PNN) {
3799                 fprintf(stderr, "Invalid PNN %s\n", argv[1]);
3800                 return 1;
3801         }
3802
3803         while (retries < 5) {
3804                 ret = moveip(mem_ctx, ctdb, &addr, pnn);
3805                 if (ret == 0) {
3806                         break;
3807                 }
3808
3809                 sleep(3);
3810                 retries++;
3811         }
3812
3813         if (ret != 0) {
3814                 fprintf(stderr, "Failed to move IP %s to node %u\n",
3815                         argv[0], pnn);
3816                 return ret;
3817         }
3818
3819         return 0;
3820 }
3821
3822 static int control_rebalanceip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3823                                int argc, const char **argv)
3824 {
3825         ctdb_sock_addr addr;
3826         struct ctdb_node_map *nodemap;
3827         struct ctdb_public_ip pubip;
3828         struct ctdb_req_control request;
3829         uint32_t *pnn_list;
3830         int ret, count;
3831
3832         if (argc != 1) {
3833                 usage("rebalanceip");
3834         }
3835
3836         if (parse_ip(argv[0], NULL, 0, &addr)) {
3837                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3838                 return 1;
3839         }
3840
3841         ret = ctdb_message_disable_ip_check(mem_ctx, ctdb->ev, ctdb->client,
3842                                             CTDB_BROADCAST_CONNECTED,
3843                                             2*options.timelimit);
3844         if (ret != 0) {
3845                 fprintf(stderr, "Failed to disable IP check\n");
3846                 return 1;
3847         }
3848
3849         nodemap = get_nodemap(ctdb, false);
3850         if (nodemap == NULL) {
3851                 return 1;
3852         }
3853
3854         count = list_of_active_nodes(nodemap, -1, mem_ctx, &pnn_list);
3855         if (count <= 0) {
3856                 fprintf(stderr, "Memory allocation error\n");
3857                 return 1;
3858         }
3859
3860         pubip.pnn = CTDB_UNKNOWN_PNN;
3861         pubip.addr = addr;
3862
3863         ctdb_req_control_release_ip(&request, &pubip);
3864         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3865                                         pnn_list, count, TIMEOUT(),
3866                                         &request, NULL, NULL);
3867         if (ret != 0) {
3868                 fprintf(stderr, "Failed to release IP from nodes\n");
3869                 return 1;
3870         }
3871
3872         return ipreallocate(mem_ctx, ctdb);
3873 }
3874
3875 static int rebalancenode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3876                          uint32_t pnn)
3877 {
3878         int ret;
3879
3880         ret = ctdb_message_rebalance_node(mem_ctx, ctdb->ev, ctdb->client,
3881                                           CTDB_BROADCAST_CONNECTED, pnn);
3882         if (ret != 0) {
3883                 fprintf(stderr,
3884                         "Failed to ask recovery master to distribute IPs\n");
3885                 return ret;
3886         }
3887
3888         return 0;
3889 }
3890
3891 static int control_addip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3892                          int argc, const char **argv)
3893 {
3894         ctdb_sock_addr addr;
3895         struct ctdb_public_ip_list *pubip_list;
3896         struct ctdb_addr_info addr_info;
3897         unsigned int mask;
3898         int ret, i, retries = 0;
3899
3900         if (argc != 2) {
3901                 usage("addip");
3902         }
3903
3904         if (! parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
3905                 fprintf(stderr, "Invalid IP/Mask %s\n", argv[0]);
3906                 return 1;
3907         }
3908
3909         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3910                                        ctdb->cmd_pnn, TIMEOUT(), &pubip_list);
3911         if (ret != 0) {
3912                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3913                         ctdb->cmd_pnn);
3914                 return 1;
3915         }
3916
3917         for (i=0; i<pubip_list->num; i++) {
3918                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3919                         fprintf(stderr, "Node already knows about IP %s\n",
3920                                 ctdb_sock_addr_to_string(mem_ctx, &addr));
3921                         return 0;
3922                 }
3923         }
3924
3925         addr_info.addr = addr;
3926         addr_info.mask = mask;
3927         addr_info.iface = argv[1];
3928
3929         while (retries < 5) {
3930                 ret = ctdb_ctrl_add_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3931                                               ctdb->cmd_pnn, TIMEOUT(),
3932                                               &addr_info);
3933                 if (ret == 0) {
3934                         break;
3935                 }
3936
3937                 sleep(3);
3938                 retries++;
3939         }
3940
3941         if (ret != 0) {
3942                 fprintf(stderr, "Failed to add public IP to node %u."
3943                                 " Giving up\n", ctdb->cmd_pnn);
3944                 return ret;
3945         }
3946
3947         ret = rebalancenode(mem_ctx, ctdb, ctdb->cmd_pnn);
3948         if (ret != 0) {
3949                 return ret;
3950         }
3951
3952         return ipreallocate(mem_ctx, ctdb);
3953 }
3954
3955 static int control_delip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3956                          int argc, const char **argv)
3957 {
3958         ctdb_sock_addr addr;
3959         struct ctdb_public_ip_list *pubip_list;
3960         struct ctdb_addr_info addr_info;
3961         int ret, i;
3962
3963         if (argc != 1) {
3964                 usage("delip");
3965         }
3966
3967         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3968                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3969                 return 1;
3970         }
3971
3972         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3973                                        ctdb->cmd_pnn, TIMEOUT(), &pubip_list);
3974         if (ret != 0) {
3975                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3976                         ctdb->cmd_pnn);
3977                 return 1;
3978         }
3979
3980         for (i=0; i<pubip_list->num; i++) {
3981                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3982                         break;
3983                 }
3984         }
3985
3986         if (i == pubip_list->num) {
3987                 fprintf(stderr, "Node does not know about IP address %s\n",
3988                         ctdb_sock_addr_to_string(mem_ctx, &addr));
3989                 return 0;
3990         }
3991
3992         addr_info.addr = addr;
3993         addr_info.mask = 0;
3994         addr_info.iface = NULL;
3995
3996         ret = ctdb_ctrl_del_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3997                                       ctdb->cmd_pnn, TIMEOUT(), &addr_info);
3998         if (ret != 0) {
3999                 fprintf(stderr, "Failed to delete public IP from node %u\n",
4000                         ctdb->cmd_pnn);
4001                 return ret;
4002         }
4003
4004         return 0;
4005 }
4006
4007 static int control_eventscript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4008                                int argc, const char **argv)
4009 {
4010         int ret;
4011
4012         if (argc != 1) {
4013                 usage("eventscript");
4014         }
4015
4016         if (strcmp(argv[0], "monitor") != 0) {
4017                 fprintf(stderr, "Only monitor event can be run\n");
4018                 return 1;
4019         }
4020
4021         ret = ctdb_ctrl_run_eventscripts(mem_ctx, ctdb->ev, ctdb->client,
4022                                          ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4023         if (ret != 0) {
4024                 fprintf(stderr, "Failed to run monitor event on node %u\n",
4025                         ctdb->cmd_pnn);
4026                 return ret;
4027         }
4028
4029         return 0;
4030 }
4031
4032 #define DB_VERSION      3
4033 #define MAX_DB_NAME     64
4034 #define MAX_REC_BUFFER_SIZE     (100*1000)
4035
4036 struct db_header {
4037         unsigned long version;
4038         time_t timestamp;
4039         unsigned long flags;
4040         unsigned long nbuf;
4041         unsigned long nrec;
4042         char name[MAX_DB_NAME];
4043 };
4044
4045 struct backup_state {
4046         TALLOC_CTX *mem_ctx;
4047         struct ctdb_rec_buffer *recbuf;
4048         uint32_t db_id;
4049         int fd;
4050         unsigned int nbuf, nrec;
4051 };
4052
4053 static int backup_handler(uint32_t reqid, struct ctdb_ltdb_header *header,
4054                           TDB_DATA key, TDB_DATA data, void *private_data)
4055 {
4056         struct backup_state *state = (struct backup_state *)private_data;
4057         size_t len;
4058         int ret;
4059
4060         if (state->recbuf == NULL) {
4061                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
4062                                                      state->db_id);
4063                 if (state->recbuf == NULL) {
4064                         return ENOMEM;
4065                 }
4066         }
4067
4068         ret = ctdb_rec_buffer_add(state->recbuf, state->recbuf, reqid,
4069                                   header, key, data);
4070         if (ret != 0) {
4071                 return ret;
4072         }
4073
4074         len = ctdb_rec_buffer_len(state->recbuf);
4075         if (len < MAX_REC_BUFFER_SIZE) {
4076                 return 0;
4077         }
4078
4079         ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
4080         if (ret != 0) {
4081                 fprintf(stderr, "Failed to write records to backup file\n");
4082                 return ret;
4083         }
4084
4085         state->nbuf += 1;
4086         state->nrec += state->recbuf->count;
4087         TALLOC_FREE(state->recbuf);
4088
4089         return 0;
4090 }
4091
4092 static int control_backupdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4093                             int argc, const char **argv)
4094 {
4095         const char *db_name;
4096         struct ctdb_db_context *db;
4097         uint32_t db_id;
4098         uint8_t db_flags;
4099         struct backup_state state;
4100         struct db_header db_hdr;
4101         int fd, ret;
4102
4103         if (argc != 2) {
4104                 usage("backupdb");
4105         }
4106
4107         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4108                 return 1;
4109         }
4110
4111         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4112                           db_flags, &db);
4113         if (ret != 0) {
4114                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4115                 return ret;
4116         }
4117
4118         fd = open(argv[1], O_RDWR|O_CREAT, 0600);
4119         if (fd == -1) {
4120                 ret = errno;
4121                 fprintf(stderr, "Failed to open file %s for writing\n",
4122                         argv[1]);
4123                 return ret;
4124         }
4125
4126         /* Write empty header first */
4127         ZERO_STRUCT(db_hdr);
4128         ret = write(fd, &db_hdr, sizeof(struct db_header));
4129         if (ret == -1) {
4130                 ret = errno;
4131                 close(fd);
4132                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4133                 return ret;
4134         }
4135
4136         state.mem_ctx = mem_ctx;
4137         state.recbuf = NULL;
4138         state.fd = fd;
4139         state.nbuf = 0;
4140         state.nrec = 0;
4141
4142         ret = ctdb_db_traverse(db, true, false, backup_handler, &state);
4143         if (ret != 0) {
4144                 fprintf(stderr, "Failed to collect records from DB %s\n",
4145                         db_name);
4146                 close(fd);
4147                 return ret;
4148         }
4149
4150         if (state.recbuf != NULL) {
4151                 ret = ctdb_rec_buffer_write(state.recbuf, state.fd);
4152                 if (ret != 0) {
4153                         fprintf(stderr,
4154                                 "Failed to write records to backup file\n");
4155                         close(fd);
4156                         return ret;
4157                 }
4158
4159                 state.nbuf += 1;
4160                 state.nrec += state.recbuf->count;
4161                 TALLOC_FREE(state.recbuf);
4162         }
4163
4164         db_hdr.version = DB_VERSION;
4165         db_hdr.timestamp = time(NULL);
4166         db_hdr.flags = db_flags;
4167         db_hdr.nbuf = state.nbuf;
4168         db_hdr.nrec = state.nrec;
4169         strncpy(db_hdr.name, db_name, MAX_DB_NAME-1);
4170
4171         lseek(fd, 0, SEEK_SET);
4172         ret = write(fd, &db_hdr, sizeof(struct db_header));
4173         if (ret == -1) {
4174                 ret = errno;
4175                 close(fd);
4176                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4177                 return ret;
4178         }
4179
4180         close(fd);
4181         printf("Database backed up to %s\n", argv[1]);
4182         return 0;
4183 }
4184
4185 static int control_restoredb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4186                              int argc, const char **argv)
4187 {
4188         const char *db_name = NULL;
4189         struct ctdb_db_context *db;
4190         struct db_header db_hdr;
4191         struct ctdb_node_map *nodemap;
4192         struct ctdb_req_control request;
4193         struct ctdb_reply_control **reply;
4194         struct ctdb_transdb wipedb;
4195         struct ctdb_pulldb_ext pulldb;
4196         struct ctdb_rec_buffer *recbuf;
4197         uint32_t generation;
4198         uint32_t *pnn_list;
4199         char timebuf[128];
4200         int fd, i;
4201         int count, ret;
4202
4203         if (argc < 1 || argc > 2) {
4204                 usage("restoredb");
4205         }
4206
4207         fd = open(argv[0], O_RDONLY, 0600);
4208         if (fd == -1) {
4209                 ret = errno;
4210                 fprintf(stderr, "Failed to open file %s for reading\n",
4211                         argv[0]);
4212                 return ret;
4213         }
4214
4215         if (argc == 2) {
4216                 db_name = argv[1];
4217         }
4218
4219         ret = read(fd, &db_hdr, sizeof(struct db_header));
4220         if (ret == -1) {
4221                 ret = errno;
4222                 close(fd);
4223                 fprintf(stderr, "Failed to read db header from file %s\n",
4224                         argv[0]);
4225                 return ret;
4226         }
4227
4228         if (db_hdr.version != DB_VERSION) {
4229                 fprintf(stderr,
4230                         "Wrong version of backup file, expected %u, got %lu\n",
4231                         DB_VERSION, db_hdr.version);
4232                 close(fd);
4233                 return EINVAL;
4234         }
4235
4236         if (db_name == NULL) {
4237                 db_name = db_hdr.name;
4238         }
4239
4240         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4241                  localtime(&db_hdr.timestamp));
4242         printf("Restoring database %s from backup @ %s\n", db_name, timebuf);
4243
4244         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4245                           db_hdr.flags, &db);
4246         if (ret != 0) {
4247                 fprintf(stderr, "Failed to attach to DB %s\n", db_hdr.name);
4248                 return ret;
4249         }
4250
4251         nodemap = get_nodemap(ctdb, false);
4252         if (nodemap == NULL) {
4253                 fprintf(stderr, "Failed to get nodemap\n");
4254                 close(fd);
4255                 return ENOMEM;
4256         }
4257
4258         ret = get_generation(mem_ctx, ctdb, &generation);
4259         if (ret != 0) {
4260                 fprintf(stderr, "Failed to get current generation\n");
4261                 close(fd);
4262                 return ret;
4263         }
4264
4265         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4266                                      &pnn_list);
4267         if (count <= 0) {
4268                 close(fd);
4269                 return ENOMEM;
4270         }
4271
4272         wipedb.db_id = ctdb_db_id(db);
4273         wipedb.tid = generation;
4274
4275         ctdb_req_control_db_freeze(&request, wipedb.db_id);
4276         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4277                                         ctdb->client, pnn_list, count,
4278                                         TIMEOUT(), &request, NULL, NULL);
4279         if (ret != 0) {
4280                 goto failed;
4281         }
4282
4283
4284         ctdb_req_control_db_transaction_start(&request, &wipedb);
4285         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4286                                         pnn_list, count, TIMEOUT(),
4287                                         &request, NULL, NULL);
4288         if (ret != 0) {
4289                 goto failed;
4290         }
4291
4292         ctdb_req_control_wipe_database(&request, &wipedb);
4293         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4294                                         pnn_list, count, TIMEOUT(),
4295                                         &request, NULL, NULL);
4296         if (ret != 0) {
4297                 goto failed;
4298         }
4299
4300         pulldb.db_id = ctdb_db_id(db);
4301         pulldb.lmaster = 0;
4302         pulldb.srvid = SRVID_CTDB_PUSHDB;
4303
4304         ctdb_req_control_db_push_start(&request, &pulldb);
4305         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4306                                         pnn_list, count, TIMEOUT(),
4307                                         &request, NULL, NULL);
4308         if (ret != 0) {
4309                 goto failed;
4310         }
4311
4312         for (i=0; i<db_hdr.nbuf; i++) {
4313                 struct ctdb_req_message message;
4314                 TDB_DATA data;
4315
4316                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4317                 if (ret != 0) {
4318                         goto failed;
4319                 }
4320
4321                 data.dsize = ctdb_rec_buffer_len(recbuf);
4322                 data.dptr = talloc_size(mem_ctx, data.dsize);
4323                 if (data.dptr == NULL) {
4324                         goto failed;
4325                 }
4326
4327                 ctdb_rec_buffer_push(recbuf, data.dptr);
4328
4329                 message.srvid = pulldb.srvid;
4330                 message.data.data = data;
4331
4332                 ret = ctdb_client_message_multi(mem_ctx, ctdb->ev,
4333                                                 ctdb->client,
4334                                                 pnn_list, count,
4335                                                 &message, NULL);
4336                 if (ret != 0) {
4337                         goto failed;
4338                 }
4339
4340                 talloc_free(recbuf);
4341                 talloc_free(data.dptr);
4342         }
4343
4344         ctdb_req_control_db_push_confirm(&request, pulldb.db_id);
4345         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4346                                         pnn_list, count, TIMEOUT(),
4347                                         &request, NULL, &reply);
4348         if (ret != 0) {
4349                 goto failed;
4350         }
4351
4352         for (i=0; i<count; i++) {
4353                 uint32_t num_records;
4354
4355                 ret = ctdb_reply_control_db_push_confirm(reply[i],
4356                                                          &num_records);
4357                 if (ret != 0) {
4358                         fprintf(stderr, "Invalid response from node %u\n",
4359                                 pnn_list[i]);
4360                         goto failed;
4361                 }
4362
4363                 if (num_records != db_hdr.nrec) {
4364                         fprintf(stderr, "Node %u received %u of %lu records\n",
4365                                 pnn_list[i], num_records, db_hdr.nrec);
4366                         goto failed;
4367                 }
4368         }
4369
4370         ctdb_req_control_db_set_healthy(&request, wipedb.db_id);
4371         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4372                                         pnn_list, count, TIMEOUT(),
4373                                         &request, NULL, NULL);
4374         if (ret != 0) {
4375                 goto failed;
4376         }
4377
4378         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4379         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4380                                         pnn_list, count, TIMEOUT(),
4381                                         &request, NULL, NULL);
4382         if (ret != 0) {
4383                 goto failed;
4384         }
4385
4386         ctdb_req_control_db_thaw(&request, wipedb.db_id);
4387         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4388                                         ctdb->client, pnn_list, count,
4389                                         TIMEOUT(), &request, NULL, NULL);
4390         if (ret != 0) {
4391                 goto failed;
4392         }
4393
4394         printf("Database %s restored\n", db_name);
4395         return 0;
4396
4397
4398 failed:
4399         close(fd);
4400         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4401                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4402         return ret;
4403 }
4404
4405 struct dumpdbbackup_state {
4406         ctdb_rec_parser_func_t parser;
4407         struct dump_record_state sub_state;
4408 };
4409
4410 static int dumpdbbackup_handler(uint32_t reqid,
4411                                 struct ctdb_ltdb_header *header,
4412                                 TDB_DATA key, TDB_DATA data,
4413                                 void *private_data)
4414 {
4415         struct dumpdbbackup_state *state =
4416                 (struct dumpdbbackup_state *)private_data;
4417         struct ctdb_ltdb_header hdr;
4418         int ret;
4419
4420         ret = ctdb_ltdb_header_extract(&data, &hdr);
4421         if (ret != 0) {
4422                 return ret;
4423         }
4424
4425         return state->parser(reqid, &hdr, key, data, &state->sub_state);
4426 }
4427
4428 static int control_dumpdbbackup(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4429                                 int argc, const char **argv)
4430 {
4431         struct db_header db_hdr;
4432         char timebuf[128];
4433         struct dumpdbbackup_state state;
4434         int fd, ret, i;
4435
4436         if (argc != 1) {
4437                 usage("dumpbackup");
4438         }
4439
4440         fd = open(argv[0], O_RDONLY, 0600);
4441         if (fd == -1) {
4442                 ret = errno;
4443                 fprintf(stderr, "Failed to open file %s for reading\n",
4444                         argv[0]);
4445                 return ret;
4446         }
4447
4448         ret = read(fd, &db_hdr, sizeof(struct db_header));
4449         if (ret == -1) {
4450                 ret = errno;
4451                 close(fd);
4452                 fprintf(stderr, "Failed to read db header from file %s\n",
4453                         argv[0]);
4454                 return ret;
4455         }
4456
4457         if (db_hdr.version != DB_VERSION) {
4458                 fprintf(stderr,
4459                         "Wrong version of backup file, expected %u, got %lu\n",
4460                         DB_VERSION, db_hdr.version);
4461                 close(fd);
4462                 return EINVAL;
4463         }
4464
4465         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4466                  localtime(&db_hdr.timestamp));
4467         printf("Dumping database %s from backup @ %s\n",
4468                db_hdr.name, timebuf);
4469
4470         state.parser = dump_record;
4471         state.sub_state.count = 0;
4472
4473         for (i=0; i<db_hdr.nbuf; i++) {
4474                 struct ctdb_rec_buffer *recbuf;
4475
4476                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4477                 if (ret != 0) {
4478                         fprintf(stderr, "Failed to read records\n");
4479                         close(fd);
4480                         return ret;
4481                 }
4482
4483                 ret = ctdb_rec_buffer_traverse(recbuf, dumpdbbackup_handler,
4484                                                &state);
4485                 if (ret != 0) {
4486                         fprintf(stderr, "Failed to dump records\n");
4487                         close(fd);
4488                         return ret;
4489                 }
4490         }
4491
4492         close(fd);
4493         printf("Dumped %u record(s)\n", state.sub_state.count);
4494         return 0;
4495 }
4496
4497 static int control_wipedb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4498                           int argc, const char **argv)
4499 {
4500         const char *db_name;
4501         struct ctdb_db_context *db;
4502         uint32_t db_id;
4503         uint8_t db_flags;
4504         struct ctdb_node_map *nodemap;
4505         struct ctdb_req_control request;
4506         struct ctdb_transdb wipedb;
4507         uint32_t generation;
4508         uint32_t *pnn_list;
4509         int count, ret;
4510
4511         if (argc != 1) {
4512                 usage("wipedb");
4513         }
4514
4515         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4516                 return 1;
4517         }
4518
4519         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4520                           db_flags, &db);
4521         if (ret != 0) {
4522                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4523                 return ret;
4524         }
4525
4526         nodemap = get_nodemap(ctdb, false);
4527         if (nodemap == NULL) {
4528                 fprintf(stderr, "Failed to get nodemap\n");
4529                 return ENOMEM;
4530         }
4531
4532         ret = get_generation(mem_ctx, ctdb, &generation);
4533         if (ret != 0) {
4534                 fprintf(stderr, "Failed to get current generation\n");
4535                 return ret;
4536         }
4537
4538         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4539                                      &pnn_list);
4540         if (count <= 0) {
4541                 return ENOMEM;
4542         }
4543
4544         ctdb_req_control_db_freeze(&request, db_id);
4545         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4546                                         ctdb->client, pnn_list, count,
4547                                         TIMEOUT(), &request, NULL, NULL);
4548         if (ret != 0) {
4549                 goto failed;
4550         }
4551
4552         wipedb.db_id = db_id;
4553         wipedb.tid = generation;
4554
4555         ctdb_req_control_db_transaction_start(&request, &wipedb);
4556         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4557                                         pnn_list, count, TIMEOUT(),
4558                                         &request, NULL, NULL);
4559         if (ret != 0) {
4560                 goto failed;
4561         }
4562
4563         ctdb_req_control_wipe_database(&request, &wipedb);
4564         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4565                                         pnn_list, count, TIMEOUT(),
4566                                         &request, NULL, NULL);
4567         if (ret != 0) {
4568                 goto failed;
4569         }
4570
4571         ctdb_req_control_db_set_healthy(&request, db_id);
4572         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4573                                         pnn_list, count, TIMEOUT(),
4574                                         &request, NULL, NULL);
4575         if (ret != 0) {
4576                 goto failed;
4577         }
4578
4579         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4580         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4581                                         pnn_list, count, TIMEOUT(),
4582                                         &request, NULL, NULL);
4583         if (ret != 0) {
4584                 goto failed;
4585         }
4586
4587         ctdb_req_control_db_thaw(&request, db_id);
4588         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4589                                         ctdb->client, pnn_list, count,
4590                                         TIMEOUT(), &request, NULL, NULL);
4591         if (ret != 0) {
4592                 goto failed;
4593         }
4594
4595         printf("Database %s wiped\n", db_name);
4596         return 0;
4597
4598
4599 failed:
4600         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4601                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4602         return ret;
4603 }
4604
4605 static int control_recmaster(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4606                              int argc, const char **argv)
4607 {
4608         uint32_t recmaster;
4609         int ret;
4610
4611         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
4612                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
4613         if (ret != 0) {
4614                 return ret;
4615         }
4616
4617         printf("%u\n", recmaster);
4618         return 0;
4619 }
4620
4621 static void print_scriptstatus_one(struct ctdb_script_list *slist,
4622                                    const char *event_str)
4623 {
4624         int i;
4625         int num_run = 0;
4626
4627         if (slist == NULL) {
4628                 if (! options.machinereadable) {
4629                         printf("%s cycle never run\n", event_str);
4630                 }
4631                 return;
4632         }
4633
4634         for (i=0; i<slist->num_scripts; i++) {
4635                 if (slist->script[i].status != -ENOEXEC) {
4636                         num_run++;
4637                 }
4638         }
4639
4640         if (! options.machinereadable) {
4641                 printf("%d scripts were executed last %s cycle\n",
4642                        num_run, event_str);
4643         }
4644
4645         for (i=0; i<slist->num_scripts; i++) {
4646                 const char *status = NULL;
4647
4648                 switch (slist->script[i].status) {
4649                 case -ETIME:
4650                         status = "TIMEDOUT";
4651                         break;
4652                 case -ENOEXEC:
4653                         status = "DISABLED";
4654                         break;
4655                 case 0:
4656                         status = "OK";
4657                         break;
4658                 default:
4659                         if (slist->script[i].status > 0) {
4660                                 status = "ERROR";
4661                         }
4662                         break;
4663                 }
4664
4665                 if (options.machinereadable) {
4666                         printf("%s%s%s%s%s%i%s%s%s%lu.%06lu%s%lu.%06lu%s%s%s\n",
4667                                options.sep,
4668                                event_str, options.sep,
4669                                slist->script[i].name, options.sep,
4670                                slist->script[i].status, options.sep,
4671                                status, options.sep,
4672                                slist->script[i].start.tv_sec,
4673                                slist->script[i].start.tv_usec, options.sep,
4674                                slist->script[i].finished.tv_sec,
4675                                slist->script[i].finished.tv_usec, options.sep,
4676                                slist->script[i].output, options.sep);
4677                         continue;
4678                 }
4679
4680                 if (status) {
4681                         printf("%-20s Status:%s    ",
4682                                slist->script[i].name, status);
4683                 } else {
4684                         /* Some other error, eg from stat. */
4685                         printf("%-20s Status:CANNOT RUN (%s)",
4686                                slist->script[i].name,
4687                                strerror(-slist->script[i].status));
4688                 }
4689
4690                 if (slist->script[i].status >= 0) {
4691                         printf("Duration:%.3lf ",
4692                                timeval_delta(&slist->script[i].finished,
4693                                              &slist->script[i].start));
4694                 }
4695                 if (slist->script[i].status != -ENOEXEC) {
4696                         printf("%s", ctime(&slist->script[i].start.tv_sec));
4697                         if (slist->script[i].status != 0) {
4698                                 printf("   OUTPUT:%s\n",
4699                                        slist->script[i].output);
4700                         }
4701                 } else {
4702                         printf("\n");
4703                 }
4704         }
4705 }
4706
4707 static void print_scriptstatus(struct ctdb_script_list **slist,
4708                                int count, const char **event_str)
4709 {
4710         int i;
4711
4712         if (options.machinereadable) {
4713                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
4714                        options.sep,
4715                        "Type", options.sep,
4716                        "Name", options.sep,
4717                        "Code", options.sep,
4718                        "Status", options.sep,
4719                        "Start", options.sep,
4720                        "End", options.sep,
4721                        "Error Output", options.sep);
4722         }
4723
4724         for (i=0; i<count; i++) {
4725                 print_scriptstatus_one(slist[i], event_str[i]);
4726         }
4727 }
4728
4729 static int control_scriptstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4730                                 int argc, const char **argv)
4731 {
4732         struct ctdb_script_list **slist;
4733         const char *event_str;
4734         enum ctdb_event event;
4735         const char *all_events[] = {
4736                 "init", "setup", "startup", "monitor",
4737                 "takeip", "releaseip", "updateip", "ipreallocated" };
4738         bool valid;
4739         int ret, i, j;
4740         int count, start, end, num;
4741
4742         if (argc > 1) {
4743                 usage("scriptstatus");
4744         }
4745
4746         if (argc == 0) {
4747                 event_str = "monitor";
4748         } else {
4749                 event_str = argv[0];
4750         }
4751
4752         valid = false;
4753         for (i=0; i<ARRAY_SIZE(all_events); i++) {
4754                 if (strcmp(event_str, all_events[i]) == 0) {
4755                         valid = true;
4756                         count = 1;
4757                         start = i;
4758                         end = i+1;
4759                         break;
4760                 }
4761         }
4762
4763         if (strcmp(event_str, "all") == 0) {
4764                 valid = true;
4765                 count = ARRAY_SIZE(all_events);
4766                 start = 0;
4767                 end = count-1;
4768         }
4769
4770         if (! valid) {
4771                 fprintf(stderr, "Unknown event name %s\n", argv[0]);
4772                 usage("scriptstatus");
4773         }
4774
4775         slist = talloc_array(mem_ctx, struct ctdb_script_list *, count);
4776         if (slist == NULL) {
4777                 fprintf(stderr, "Memory allocation error\n");
4778                 return 1;
4779         }
4780
4781         num = 0;
4782         for (i=start; i<end; i++) {
4783                 event = ctdb_event_from_string(all_events[i]);
4784
4785                 ret = ctdb_ctrl_get_event_script_status(mem_ctx, ctdb->ev,
4786                                                         ctdb->client,
4787                                                         ctdb->cmd_pnn,
4788                                                         TIMEOUT(), event,
4789                                                         &slist[num]);
4790                 if (ret != 0) {
4791                         fprintf(stderr,
4792                                 "failed to get script status for %s event\n",
4793                                 all_events[i]);
4794                         return 1;
4795                 }
4796
4797                 if (slist[num] == NULL) {
4798                         num++;
4799                         continue;
4800                 }
4801
4802                 /* The ETIME status is ignored for certain events.
4803                  * In that case the status is 0, but endtime is not set.
4804                  */
4805                 for (j=0; j<slist[num]->num_scripts; j++) {
4806                         if (slist[num]->script[j].status == 0 &&
4807                             timeval_is_zero(&slist[num]->script[j].finished)) {
4808                                 slist[num]->script[j].status = -ETIME;
4809                         }
4810                 }
4811
4812                 num++;
4813         }
4814
4815         print_scriptstatus(slist, count, &all_events[start]);
4816         return 0;
4817 }
4818
4819 static int control_enablescript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4820                                 int argc, const char **argv)
4821 {
4822         int ret;
4823
4824         if (argc != 1) {
4825                 usage("enablescript");
4826         }
4827
4828         ret = ctdb_ctrl_enable_script(mem_ctx, ctdb->ev, ctdb->client,
4829                                       ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4830         if (ret != 0) {
4831                 fprintf(stderr, "Failed to enable script %s\n", argv[0]);
4832                 return ret;
4833         }
4834
4835         return 0;
4836 }
4837
4838 static int control_disablescript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4839                                  int argc, const char **argv)
4840 {
4841         int ret;
4842
4843         if (argc != 1) {
4844                 usage("disablescript");
4845         }
4846
4847         ret = ctdb_ctrl_disable_script(mem_ctx, ctdb->ev, ctdb->client,
4848                                        ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4849         if (ret != 0) {
4850                 fprintf(stderr, "Failed to disable script %s\n", argv[0]);
4851                 return ret;
4852         }
4853
4854         return 0;
4855 }
4856
4857 static int control_natgw(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4858                          int argc, const char **argv)
4859 {
4860         char *t, *natgw_helper = NULL;
4861
4862         if (argc != 1) {
4863                 usage("natgw");
4864         }
4865
4866         t = getenv("CTDB_NATGW_HELPER");
4867         if (t != NULL) {
4868                 natgw_helper = talloc_strdup(mem_ctx, t);
4869         } else {
4870                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4871                                                CTDB_HELPER_BINDIR);
4872         }
4873
4874         if (natgw_helper == NULL) {
4875                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4876                 return 1;
4877         }
4878
4879         return run_helper("NAT gateway helper", natgw_helper, argv[0]);
4880 }
4881
4882 static int control_natgwlist(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4883                              int argc, const char **argv)
4884 {
4885         char *t, *natgw_helper = NULL;
4886
4887         if (argc != 0) {
4888                 usage("natgwlist");
4889         }
4890
4891         t = getenv("CTDB_NATGW_HELPER");
4892         if (t != NULL) {
4893                 natgw_helper = talloc_strdup(mem_ctx, t);
4894         } else {
4895                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4896                                                CTDB_HELPER_BINDIR);
4897         }
4898
4899         if (natgw_helper == NULL) {
4900                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4901                 return 1;
4902         }
4903
4904         return run_helper("NAT gateway helper", natgw_helper, "natgwlist");
4905 }
4906
4907 /*
4908  * Find the PNN of the current node
4909  * discover the pnn by loading the nodes file and try to bind
4910  * to all addresses one at a time until the ip address is found.
4911  */
4912 static bool find_node_xpnn(TALLOC_CTX *mem_ctx, uint32_t *pnn)
4913 {
4914         struct ctdb_node_map *nodemap;
4915         int i;
4916
4917         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
4918         if (nodemap == NULL) {
4919                 return false;
4920         }
4921
4922         for (i=0; i<nodemap->num; i++) {
4923                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
4924                         continue;
4925                 }
4926                 if (ctdb_sys_have_ip(&nodemap->node[i].addr)) {
4927                         if (pnn != NULL) {
4928                                 *pnn = nodemap->node[i].pnn;
4929                         }
4930                         talloc_free(nodemap);
4931                         return true;
4932                 }
4933         }
4934
4935         fprintf(stderr, "Failed to detect PNN of the current node.\n");
4936         talloc_free(nodemap);
4937         return false;
4938 }
4939
4940 static int control_getreclock(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4941                               int argc, const char **argv)
4942 {
4943         const char *reclock;
4944         int ret;
4945
4946         if (argc != 0) {
4947                 usage("getreclock");
4948         }
4949
4950         ret = ctdb_ctrl_get_reclock_file(mem_ctx, ctdb->ev, ctdb->client,
4951                                          ctdb->cmd_pnn, TIMEOUT(), &reclock);
4952         if (ret != 0) {
4953                 return ret;
4954         }
4955
4956         if (reclock != NULL) {
4957                 printf("%s\n", reclock);
4958         }
4959
4960         return 0;
4961 }
4962
4963 static int control_setlmasterrole(TALLOC_CTX *mem_ctx,
4964                                   struct ctdb_context *ctdb,
4965                                   int argc, const char **argv)
4966 {
4967         uint32_t lmasterrole = 0;
4968         int ret;
4969
4970         if (argc != 1) {
4971                 usage("setlmasterrole");
4972         }
4973
4974         if (strcmp(argv[0], "on") == 0) {
4975                 lmasterrole = 1;
4976         } else if (strcmp(argv[0], "off") == 0) {
4977                 lmasterrole = 0;
4978         } else {
4979                 usage("setlmasterrole");
4980         }
4981
4982         ret = ctdb_ctrl_set_lmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4983                                         ctdb->cmd_pnn, TIMEOUT(), lmasterrole);
4984         if (ret != 0) {
4985                 return ret;
4986         }
4987
4988         return 0;
4989 }
4990
4991 static int control_setrecmasterrole(TALLOC_CTX *mem_ctx,
4992                                     struct ctdb_context *ctdb,
4993                                     int argc, const char **argv)
4994 {
4995         uint32_t recmasterrole = 0;
4996         int ret;
4997
4998         if (argc != 1) {
4999                 usage("setrecmasterrole");
5000         }
5001
5002         if (strcmp(argv[0], "on") == 0) {
5003                 recmasterrole = 1;
5004         } else if (strcmp(argv[0], "off") == 0) {
5005                 recmasterrole = 0;
5006         } else {
5007                 usage("setrecmasterrole");
5008         }
5009
5010         ret = ctdb_ctrl_set_recmasterrole(mem_ctx, ctdb->ev, ctdb->client,
5011                                           ctdb->cmd_pnn, TIMEOUT(),
5012                                           recmasterrole);
5013         if (ret != 0) {
5014                 return ret;
5015         }
5016
5017         return 0;
5018 }
5019
5020 static int control_setdbreadonly(TALLOC_CTX *mem_ctx,
5021                                  struct ctdb_context *ctdb,
5022                                  int argc, const char **argv)
5023 {
5024         uint32_t db_id;
5025         uint8_t db_flags;
5026         int ret;
5027
5028         if (argc != 1) {
5029                 usage("setdbreadonly");
5030         }
5031
5032         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
5033                 return 1;
5034         }
5035
5036         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5037                 fprintf(stderr, "Cannot set READONLY on persistent DB\n");
5038                 return 1;
5039         }
5040
5041         ret = ctdb_ctrl_set_db_readonly(mem_ctx, ctdb->ev, ctdb->client,
5042                                         ctdb->cmd_pnn, TIMEOUT(), db_id);
5043         if (ret != 0) {
5044                 return ret;
5045         }
5046
5047         return 0;
5048 }
5049
5050 static int control_setdbsticky(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5051                                int argc, const char **argv)
5052 {
5053         uint32_t db_id;
5054         uint8_t db_flags;
5055         int ret;
5056
5057         if (argc != 1) {
5058                 usage("setdbsticky");
5059         }
5060
5061         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
5062                 return 1;
5063         }
5064
5065         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5066                 fprintf(stderr, "Cannot set STICKY on persistent DB\n");
5067                 return 1;
5068         }
5069
5070         ret = ctdb_ctrl_set_db_sticky(mem_ctx, ctdb->ev, ctdb->client,
5071                                       ctdb->cmd_pnn, TIMEOUT(), db_id);
5072         if (ret != 0) {
5073                 return ret;
5074         }
5075
5076         return 0;
5077 }
5078
5079 static int control_pfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5080                           int argc, const char **argv)
5081 {
5082         const char *db_name;
5083         struct ctdb_db_context *db;
5084         struct ctdb_transaction_handle *h;
5085         uint8_t db_flags;
5086         TDB_DATA key, data;
5087         int ret;
5088
5089         if (argc < 2 || argc > 3) {
5090                 usage("pfetch");
5091         }
5092
5093         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5094                 return 1;
5095         }
5096
5097         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5098                 fprintf(stderr, "DB %s is not a persistent database\n",
5099                         db_name);
5100                 return 1;
5101         }
5102
5103         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5104                           db_flags, &db);
5105         if (ret != 0) {
5106                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5107                 return ret;
5108         }
5109
5110         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5111         if (ret != 0) {
5112                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5113                 return ret;
5114         }
5115
5116         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5117                                      TIMEOUT(), db, true, &h);
5118         if (ret != 0) {
5119                 fprintf(stderr, "Failed to start transaction on db %s\n",
5120                         db_name);
5121                 return ret;
5122         }
5123
5124         ret = ctdb_transaction_fetch_record(h, key, mem_ctx, &data);
5125         if (ret != 0) {
5126                 fprintf(stderr, "Failed to read record for key %s\n",
5127                         argv[1]);
5128                 return ret;
5129         }
5130
5131         printf("%.*s\n", (int)data.dsize, data.dptr);
5132
5133         ctdb_transaction_cancel(h);
5134         return 0;
5135 }
5136
5137 static int control_pstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5138                           int argc, const char **argv)
5139 {
5140         const char *db_name;
5141         struct ctdb_db_context *db;
5142         struct ctdb_transaction_handle *h;
5143         uint8_t db_flags;
5144         TDB_DATA key, data;
5145         int ret;
5146
5147         if (argc != 3) {
5148                 usage("pstore");
5149         }
5150
5151         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5152                 return 1;
5153         }
5154
5155         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5156                 fprintf(stderr, "DB %s is not a persistent database\n",
5157                         db_name);
5158                 return 1;
5159         }
5160
5161         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5162                           db_flags, &db);
5163         if (ret != 0) {
5164                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5165                 return ret;
5166         }
5167
5168         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5169         if (ret != 0) {
5170                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5171                 return ret;
5172         }
5173
5174         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5175         if (ret != 0) {
5176                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5177                 return ret;
5178         }
5179
5180         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5181                                      TIMEOUT(), db, false, &h);
5182         if (ret != 0) {
5183                 fprintf(stderr, "Failed to start transaction on db %s\n",
5184                         db_name);
5185                 return ret;
5186         }
5187
5188         ret = ctdb_transaction_store_record(h, key, data);
5189         if (ret != 0) {
5190                 fprintf(stderr, "Failed to store record for key %s\n",
5191                         argv[1]);
5192                 return ret;
5193         }
5194
5195         ret = ctdb_transaction_commit(h);
5196         if (ret != 0) {
5197                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5198                         db_name);
5199                 return ret;
5200         }
5201
5202         return 0;
5203 }
5204
5205 static int control_pdelete(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5206                            int argc, const char **argv)
5207 {
5208         const char *db_name;
5209         struct ctdb_db_context *db;
5210         struct ctdb_transaction_handle *h;
5211         uint8_t db_flags;
5212         TDB_DATA key;
5213         int ret;
5214
5215         if (argc != 2) {
5216                 usage("pdelete");
5217         }
5218
5219         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5220                 return 1;
5221         }
5222
5223         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5224                 fprintf(stderr, "DB %s is not a persistent database\n",
5225                         db_name);
5226                 return 1;
5227         }
5228
5229         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5230                           db_flags, &db);
5231         if (ret != 0) {
5232                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5233                 return ret;
5234         }
5235
5236         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5237         if (ret != 0) {
5238                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5239                 return ret;
5240         }
5241
5242         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5243                                      TIMEOUT(), db, false, &h);
5244         if (ret != 0) {
5245                 fprintf(stderr, "Failed to start transaction on db %s\n",
5246                         db_name);
5247                 return ret;
5248         }
5249
5250         ret = ctdb_transaction_delete_record(h, key);
5251         if (ret != 0) {
5252                 fprintf(stderr, "Failed to delete record for key %s\n",
5253                         argv[1]);
5254                 return ret;
5255         }
5256
5257         ret = ctdb_transaction_commit(h);
5258         if (ret != 0) {
5259                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5260                         db_name);
5261                 return ret;
5262         }
5263
5264         return 0;
5265 }
5266
5267 static int ptrans_parse_string(TALLOC_CTX *mem_ctx, const char **ptr, TDB_DATA *data)
5268 {
5269         const char *t;
5270         size_t n;
5271         int ret;
5272
5273         *data = tdb_null;
5274
5275         /* Skip whitespace */
5276         n = strspn(*ptr, " \t");
5277         t = *ptr + n;
5278
5279         if (t[0] == '"') {
5280                 /* Quoted ASCII string - no wide characters! */
5281                 t++;
5282                 n = strcspn(t, "\"");
5283                 if (t[n] == '"') {
5284                         if (n > 0) {
5285                                 ret = str_to_data(t, n, mem_ctx, data);
5286                                 if (ret != 0) {
5287                                         return ret;
5288                                 }
5289                         }
5290                         *ptr = t + n + 1;
5291                 } else {
5292                         fprintf(stderr, "Unmatched \" in input %s\n", *ptr);
5293                         return 1;
5294                 }
5295         } else {
5296                 fprintf(stderr, "Unsupported input format in %s\n", *ptr);
5297                 return 1;
5298         }
5299
5300         return 0;
5301 }
5302
5303 #define MAX_LINE_SIZE   1024
5304
5305 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
5306                                  TDB_DATA *key, TDB_DATA *value)
5307 {
5308         char line [MAX_LINE_SIZE]; /* FIXME: make this more flexible? */
5309         const char *ptr;
5310         int ret;
5311
5312         ptr = fgets(line, MAX_LINE_SIZE, file);
5313         if (ptr == NULL) {
5314                 return false;
5315         }
5316
5317         /* Get key */
5318         ret = ptrans_parse_string(mem_ctx, &ptr, key);
5319         if (ret != 0 || ptr == NULL || key->dptr == NULL) {
5320                 /* Line Ignored but not EOF */
5321                 *key = tdb_null;
5322                 return true;
5323         }
5324
5325         /* Get value */
5326         ret = ptrans_parse_string(mem_ctx, &ptr, value);
5327         if (ret != 0) {
5328                 /* Line Ignored but not EOF */
5329                 talloc_free(key->dptr);
5330                 *key = tdb_null;
5331                 return true;
5332         }
5333
5334         return true;
5335 }
5336
5337 static int control_ptrans(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5338                           int argc, const char **argv)
5339 {
5340         const char *db_name;
5341         struct ctdb_db_context *db;
5342         struct ctdb_transaction_handle *h;
5343         uint8_t db_flags;
5344         FILE *file;
5345         TDB_DATA key, value;
5346         int ret;
5347
5348         if (argc < 1 || argc > 2) {
5349                 usage("ptrans");
5350         }
5351
5352         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5353                 return 1;
5354         }
5355
5356         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5357                 fprintf(stderr, "DB %s is not a persistent database\n",
5358                         db_name);
5359                 return 1;
5360         }
5361
5362         if (argc == 2) {
5363                 file = fopen(argv[1], "r");
5364                 if (file == NULL) {
5365                         fprintf(stderr, "Failed to open file %s\n", argv[1]);
5366                         return 1;
5367                 }
5368         } else {
5369                 file = stdin;
5370         }
5371
5372         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5373                           db_flags, &db);
5374         if (ret != 0) {
5375                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5376                 goto done;
5377         }
5378
5379         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5380                                      TIMEOUT(), db, false, &h);
5381         if (ret != 0) {
5382                 fprintf(stderr, "Failed to start transaction on db %s\n",
5383                         db_name);
5384                 goto done;
5385         }
5386
5387         while (ptrans_get_key_value(mem_ctx, file, &key, &value)) {
5388                 if (key.dsize != 0) {
5389                         ret = ctdb_transaction_store_record(h, key, value);
5390                         if (ret != 0) {
5391                                 fprintf(stderr, "Failed to store record\n");
5392                                 ctdb_transaction_cancel(h);
5393                                 goto done;
5394                         }
5395                         talloc_free(key.dptr);
5396                         talloc_free(value.dptr);
5397                 }
5398         }
5399
5400         ret = ctdb_transaction_commit(h);
5401         if (ret != 0) {
5402                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5403                         db_name);
5404         }
5405
5406 done:
5407         if (file != stdin) {
5408                 fclose(file);
5409         }
5410         return ret;
5411 }
5412
5413 static int control_tfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5414                           int argc, const char **argv)
5415 {
5416         struct tdb_context *tdb;
5417         TDB_DATA key, data;
5418         struct ctdb_ltdb_header header;
5419         int ret;
5420
5421         if (argc < 2 || argc > 3) {
5422                 usage("tfetch");
5423         }
5424
5425         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5426         if (tdb == NULL) {
5427                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5428                 return 1;
5429         }
5430
5431         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5432         if (ret != 0) {
5433                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5434                 return ret;
5435         }
5436
5437         data = tdb_fetch(tdb, key);
5438         if (data.dptr == NULL) {
5439                 fprintf(stderr, "No record for key %s\n", argv[1]);
5440                 return 1;
5441         }
5442
5443         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
5444                 fprintf(stderr, "Invalid record for key %s\n", argv[1]);
5445                 return 1;
5446         }
5447
5448         tdb_close(tdb);
5449
5450         if (argc == 3) {
5451                 int fd;
5452                 ssize_t nwritten;
5453
5454                 fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
5455                 if (fd == -1) {
5456                         fprintf(stderr, "Failed to open output file %s\n",
5457                                 argv[2]);
5458                         goto fail;
5459                 }
5460
5461                 nwritten = sys_write(fd, data.dptr, data.dsize);
5462                 if (nwritten != data.dsize) {
5463                         fprintf(stderr, "Failed to write record to file\n");
5464                         close(fd);
5465                         goto fail;
5466                 }
5467
5468                 close(fd);
5469         }
5470
5471 fail:
5472         ret = ctdb_ltdb_header_extract(&data, &header);
5473         if (ret != 0) {
5474                 fprintf(stderr, "Failed to parse header from data\n");
5475                 return 1;
5476         }
5477
5478         dump_ltdb_header(&header);
5479         dump_tdb_data("data", data);
5480
5481         return 0;
5482 }
5483
5484 static int control_tstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5485                           int argc, const char **argv)
5486 {
5487         struct tdb_context *tdb;
5488         TDB_DATA key, data, value;
5489         struct ctdb_ltdb_header header;
5490         size_t offset;
5491         int ret;
5492
5493         if (argc < 3 || argc > 5) {
5494                 usage("tstore");
5495         }
5496
5497         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5498         if (tdb == NULL) {
5499                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5500                 return 1;
5501         }
5502
5503         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5504         if (ret != 0) {
5505                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5506                 return ret;
5507         }
5508
5509         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &value);
5510         if (ret != 0) {
5511                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5512                 return ret;
5513         }
5514
5515         ZERO_STRUCT(header);
5516
5517         if (argc > 3) {
5518                 header.rsn = (uint64_t)strtoull(argv[3], NULL, 0);
5519         }
5520         if (argc > 4) {
5521                 header.dmaster = (uint32_t)atol(argv[4]);
5522         }
5523         if (argc > 5) {
5524                 header.flags = (uint32_t)atol(argv[5]);
5525         }
5526
5527         offset = ctdb_ltdb_header_len(&header);
5528         data.dsize = offset + value.dsize;
5529         data.dptr = talloc_size(mem_ctx, data.dsize);
5530         if (data.dptr == NULL) {
5531                 fprintf(stderr, "Memory allocation error\n");
5532                 return 1;
5533         }
5534
5535         ctdb_ltdb_header_push(&header, data.dptr);
5536         memcpy(data.dptr + offset, value.dptr, value.dsize);
5537
5538         ret = tdb_store(tdb, key, data, TDB_REPLACE);
5539         if (ret != 0) {
5540                 fprintf(stderr, "Failed to write record %s to file %s\n",
5541                         argv[1], argv[0]);
5542         }
5543
5544         tdb_close(tdb);
5545
5546         return ret;
5547 }
5548
5549 static int control_readkey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5550                            int argc, const char **argv)
5551 {
5552         const char *db_name;
5553         struct ctdb_db_context *db;
5554         struct ctdb_record_handle *h;
5555         uint8_t db_flags;
5556         TDB_DATA key, data;
5557         bool readonly = false;
5558         int ret;
5559
5560         if (argc < 2 || argc > 3) {
5561                 usage("readkey");
5562         }
5563
5564         if (argc == 3) {
5565                 if (strcmp(argv[2], "readonly") == 0) {
5566                         readonly = true;
5567                 } else {
5568                         usage("readkey");
5569                 }
5570         }
5571
5572         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5573                 return 1;
5574         }
5575
5576         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5577                 fprintf(stderr, "DB %s is not a volatile database\n",
5578                         db_name);
5579                 return 1;
5580         }
5581
5582         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5583                           db_flags, &db);
5584         if (ret != 0) {
5585                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5586                 return ret;
5587         }
5588
5589         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5590         if (ret != 0) {
5591                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5592                 return ret;
5593         }
5594
5595         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5596                               db, key, readonly, &h, NULL, &data);
5597         if (ret != 0) {
5598                 fprintf(stderr, "Failed to read record for key %s\n",
5599                         argv[1]);
5600         } else {
5601                 printf("Data: size:%zu ptr:[%.*s]\n", data.dsize,
5602                        (int)data.dsize, data.dptr);
5603         }
5604
5605         talloc_free(h);
5606         return ret;
5607 }
5608
5609 static int control_writekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5610                             int argc, const char **argv)
5611 {
5612         const char *db_name;
5613         struct ctdb_db_context *db;
5614         struct ctdb_record_handle *h;
5615         uint8_t db_flags;
5616         TDB_DATA key, data;
5617         int ret;
5618
5619         if (argc != 3) {
5620                 usage("writekey");
5621         }
5622
5623         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5624                 return 1;
5625         }
5626
5627         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5628                 fprintf(stderr, "DB %s is not a volatile database\n",
5629                         db_name);
5630                 return 1;
5631         }
5632
5633         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5634                           db_flags, &db);
5635         if (ret != 0) {
5636                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5637                 return ret;
5638         }
5639
5640         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5641         if (ret != 0) {
5642                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5643                 return ret;
5644         }
5645
5646         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5647         if (ret != 0) {
5648                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5649                 return ret;
5650         }
5651
5652         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5653                               db, key, false, &h, NULL, NULL);
5654         if (ret != 0) {
5655                 fprintf(stderr, "Failed to lock record for key %s\n", argv[0]);
5656                 return ret;
5657         }
5658
5659         ret = ctdb_store_record(h, data);
5660         if (ret != 0) {
5661                 fprintf(stderr, "Failed to store record for key %s\n",
5662                         argv[1]);
5663         }
5664
5665         talloc_free(h);
5666         return ret;
5667 }
5668
5669 static int control_deletekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5670                              int argc, const char **argv)
5671 {
5672         const char *db_name;
5673         struct ctdb_db_context *db;
5674         struct ctdb_record_handle *h;
5675         uint8_t db_flags;
5676         TDB_DATA key, data;
5677         int ret;
5678
5679         if (argc != 2) {
5680                 usage("deletekey");
5681         }
5682
5683         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5684                 return 1;
5685         }
5686
5687         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5688                 fprintf(stderr, "DB %s is not a volatile database\n",
5689                         db_name);
5690                 return 1;
5691         }
5692
5693         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5694                           db_flags, &db);
5695         if (ret != 0) {
5696                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5697                 return ret;
5698         }
5699
5700         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5701         if (ret != 0) {
5702                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5703                 return ret;
5704         }
5705
5706         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5707                               db, key, false, &h, NULL, &data);
5708         if (ret != 0) {
5709                 fprintf(stderr, "Failed to fetch record for key %s\n",
5710                         argv[1]);
5711                 return ret;
5712         }
5713
5714         ret = ctdb_delete_record(h);
5715         if (ret != 0) {
5716                 fprintf(stderr, "Failed to delete record for key %s\n",
5717                         argv[1]);
5718         }
5719
5720         talloc_free(h);
5721         return ret;
5722 }
5723
5724 static int control_checktcpport(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5725                                 int argc, const char **argv)
5726 {
5727         struct sockaddr_in sin;
5728         unsigned int port;
5729         int s, v;
5730         int ret;
5731
5732         if (argc != 1) {
5733                 usage("chktcpport");
5734         }
5735
5736         port = atoi(argv[0]);
5737
5738         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
5739         if (s == -1) {
5740                 fprintf(stderr, "Failed to open local socket\n");
5741                 return errno;
5742         }
5743
5744         v = fcntl(s, F_GETFL, 0);
5745         if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK)) {
5746                 fprintf(stderr, "Unable to set socket non-blocking\n");
5747                 close(s);
5748                 return errno;
5749         }
5750
5751         bzero(&sin, sizeof(sin));
5752         sin.sin_family = AF_INET;
5753         sin.sin_port = htons(port);
5754         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
5755         close(s);
5756         if (ret == -1) {
5757                 fprintf(stderr, "Failed to bind to TCP port %u\n", port);
5758                 return errno;
5759         }
5760
5761         return 0;
5762 }
5763
5764 static int control_rebalancenode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5765                                  int argc, const char **argv)
5766 {
5767         if (argc != 0) {
5768                 usage("rebalancenode");
5769         }
5770
5771         if (! rebalancenode(mem_ctx, ctdb, ctdb->cmd_pnn)) {
5772                 fprintf(stderr, "Failed to rebalance IPs on node %u\n",
5773                         ctdb->cmd_pnn);
5774                 return 1;
5775         }
5776
5777         return 0;
5778 }
5779
5780 static int control_getdbseqnum(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5781                                int argc, const char **argv)
5782 {
5783         uint32_t db_id;
5784         const char *db_name;
5785         uint64_t seqnum;
5786         int ret;
5787
5788         if (argc != 1) {
5789                 usage("getdbseqnum");
5790         }
5791
5792         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5793                 return 1;
5794         }
5795
5796         ret = ctdb_ctrl_get_db_seqnum(mem_ctx, ctdb->ev, ctdb->client,
5797                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
5798                                       &seqnum);
5799         if (ret != 0) {
5800                 fprintf(stderr, "Failed to get sequence number for DB %s\n",
5801                         db_name);
5802                 return ret;
5803         }
5804
5805         printf("0x%"PRIx64"\n", seqnum);
5806         return 0;
5807 }
5808
5809 static int control_nodestatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5810                               int argc, const char **argv)
5811 {
5812         const char *nodestring = NULL;
5813         struct ctdb_node_map *nodemap;
5814         int ret, i;
5815
5816         if (argc > 1) {
5817                 usage("nodestatus");
5818         }
5819
5820         if (argc == 1) {
5821                 nodestring = argv[0];
5822         }
5823
5824         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap)) {
5825                 return 1;
5826         }
5827
5828         nodemap = get_nodemap(ctdb, false);
5829         if (nodemap == NULL) {
5830                 return 1;
5831         }
5832
5833         if (options.machinereadable) {
5834                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5835         } else {
5836                 print_nodemap(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5837         }
5838
5839         ret = 0;
5840         for (i=0; i<nodemap->num; i++) {
5841                 ret |= nodemap->node[i].flags;
5842         }
5843
5844         return ret;
5845 }
5846
5847 const struct {
5848         const char *name;
5849         uint32_t offset;
5850 } db_stats_fields[] = {
5851 #define DBSTATISTICS_FIELD(n) { #n, offsetof(struct ctdb_db_statistics, n) }
5852         DBSTATISTICS_FIELD(db_ro_delegations),
5853         DBSTATISTICS_FIELD(db_ro_revokes),
5854         DBSTATISTICS_FIELD(locks.num_calls),
5855         DBSTATISTICS_FIELD(locks.num_current),
5856         DBSTATISTICS_FIELD(locks.num_pending),
5857         DBSTATISTICS_FIELD(locks.num_failed),
5858         DBSTATISTICS_FIELD(db_ro_delegations),
5859 };
5860
5861 static void print_dbstatistics(const char *db_name,
5862                                struct ctdb_db_statistics *s)
5863 {
5864         int i;
5865         const char *prefix = NULL;
5866         int preflen = 0;
5867
5868         printf("DB Statistics %s\n", db_name);
5869
5870         for (i=0; i<ARRAY_SIZE(db_stats_fields); i++) {
5871                 if (strchr(db_stats_fields[i].name, '.') != NULL) {
5872                         preflen = strcspn(db_stats_fields[i].name, ".") + 1;
5873                         if (! prefix ||
5874                             strncmp(prefix, db_stats_fields[i].name, preflen) != 0) {
5875                                 prefix = db_stats_fields[i].name;
5876                                 printf(" %*.*s\n", preflen-1, preflen-1,
5877                                        db_stats_fields[i].name);
5878                         }
5879                 } else {
5880                         preflen = 0;
5881                 }
5882                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
5883                        db_stats_fields[i].name+preflen, preflen ? 0 : 4, "",
5884                        *(uint32_t *)(db_stats_fields[i].offset+(uint8_t *)s));
5885         }
5886
5887         printf(" hop_count_buckets:");
5888         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5889                 printf(" %d", s->hop_count_bucket[i]);
5890         }
5891         printf("\n");
5892
5893         printf(" lock_buckets:");
5894         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5895                 printf(" %d", s->locks.buckets[i]);
5896         }
5897         printf("\n");
5898
5899         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5900                "locks_latency      MIN/AVG/MAX",
5901                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
5902                s->locks.latency.max, s->locks.latency.num);
5903
5904         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5905                "vacuum_latency     MIN/AVG/MAX",
5906                s->vacuum.latency.min, LATENCY_AVG(s->vacuum.latency),
5907                s->vacuum.latency.max, s->vacuum.latency.num);
5908
5909         printf(" Num Hot Keys:     %d\n", s->num_hot_keys);
5910         for (i=0; i<s->num_hot_keys; i++) {
5911                 int j;
5912                 printf("     Count:%d Key:", s->hot_keys[i].count);
5913                 for (j=0; j<s->hot_keys[i].key.dsize; j++) {
5914                         printf("%02x", s->hot_keys[i].key.dptr[j] & 0xff);
5915                 }
5916                 printf("\n");
5917         }
5918 }
5919
5920 static int control_dbstatistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5921                                 int argc, const char **argv)
5922 {
5923         uint32_t db_id;
5924         const char *db_name;
5925         struct ctdb_db_statistics *dbstats;
5926         int ret;
5927
5928         if (argc != 1) {
5929                 usage("dbstatistics");
5930         }
5931
5932         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5933                 return 1;
5934         }
5935
5936         ret = ctdb_ctrl_get_db_statistics(mem_ctx, ctdb->ev, ctdb->client,
5937                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
5938                                           &dbstats);
5939         if (ret != 0) {
5940                 fprintf(stderr, "Failed to get statistics for DB %s\n",
5941                         db_name);
5942                 return ret;
5943         }
5944
5945         print_dbstatistics(db_name, dbstats);
5946         return 0;
5947 }
5948
5949 struct disable_takeover_runs_state {
5950         uint32_t *pnn_list;
5951         int node_count;
5952         bool *reply;
5953         int status;
5954         bool done;
5955 };
5956
5957 static void disable_takeover_run_handler(uint64_t srvid, TDB_DATA data,
5958                                          void *private_data)
5959 {
5960         struct disable_takeover_runs_state *state =
5961                 (struct disable_takeover_runs_state *)private_data;
5962         int ret, i;
5963
5964         if (data.dsize != sizeof(int)) {
5965                 /* Ignore packet */
5966                 return;
5967         }
5968
5969         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
5970         ret = *(int *)data.dptr;
5971         if (ret < 0) {
5972                 state->status = ret;
5973                 state->done = true;
5974                 return;
5975         }
5976         for (i=0; i<state->node_count; i++) {
5977                 if (state->pnn_list[i] == ret) {
5978                         state->reply[i] = true;
5979                         break;
5980                 }
5981         }
5982
5983         state->done = true;
5984         for (i=0; i<state->node_count; i++) {
5985                 if (! state->reply[i]) {
5986                         state->done = false;
5987                         break;
5988                 }
5989         }
5990 }
5991
5992 static int disable_takeover_runs(TALLOC_CTX *mem_ctx,
5993                                  struct ctdb_context *ctdb, uint32_t timeout,
5994                                  uint32_t *pnn_list, int count)
5995 {
5996         struct ctdb_disable_message disable = { 0 };
5997         struct disable_takeover_runs_state state;
5998         int ret, i;
5999
6000         disable.pnn = ctdb->pnn;
6001         disable.srvid = next_srvid(ctdb);
6002         disable.timeout = timeout;
6003
6004         state.pnn_list = pnn_list;
6005         state.node_count = count;
6006         state.done = false;
6007         state.status = 0;
6008         state.reply = talloc_zero_array(mem_ctx, bool, count);
6009         if (state.reply == NULL) {
6010                 return ENOMEM;
6011         }
6012
6013         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
6014                                               disable.srvid,
6015                                               disable_takeover_run_handler,
6016                                               &state);
6017         if (ret != 0) {
6018                 return ret;
6019         }
6020
6021         for (i=0; i<count; i++) {
6022                 ret = ctdb_message_disable_takeover_runs(mem_ctx, ctdb->ev,
6023                                                          ctdb->client,
6024                                                          pnn_list[i],
6025                                                          &disable);
6026                 if (ret != 0) {
6027                         goto fail;
6028                 }
6029         }
6030
6031         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
6032         if (ret == ETIME) {
6033                 fprintf(stderr, "Timed out waiting to disable takeover runs\n");
6034         } else {
6035                 ret = (state.status >= 0 ? 0 : 1);
6036         }
6037
6038 fail:
6039         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
6040                                            disable.srvid, &state);
6041         return ret;
6042 }
6043
6044 static int control_reloadips(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
6045                              int argc, const char **argv)
6046 {
6047         const char *nodestring = NULL;
6048         struct ctdb_node_map *nodemap, *nodemap2;
6049         struct ctdb_req_control request;
6050         uint32_t *pnn_list, *pnn_list2;
6051         int ret, count, count2;
6052
6053         if (argc > 1) {
6054                 usage("reloadips");
6055         }
6056
6057         if (argc == 1) {
6058                 nodestring = argv[0];
6059         }
6060
6061         nodemap = get_nodemap(ctdb, false);
6062         if (nodemap == NULL) {
6063                 return 1;
6064         }
6065
6066         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap2)) {
6067                 return 1;
6068         }
6069
6070         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
6071                                         mem_ctx, &pnn_list);
6072         if (count <= 0) {
6073                 fprintf(stderr, "Memory allocation error\n");
6074                 return 1;
6075         }
6076
6077         count2 = list_of_active_nodes(nodemap2, CTDB_UNKNOWN_PNN,
6078                                       mem_ctx, &pnn_list2);
6079         if (count2 <= 0) {
6080                 fprintf(stderr, "Memory allocation error\n");
6081                 return 1;
6082         }
6083
6084         /* Disable takeover runs on all connected nodes.  A reply
6085          * indicating success is needed from each node so all nodes
6086          * will need to be active.
6087          *
6088          * A check could be added to not allow reloading of IPs when
6089          * there are disconnected nodes.  However, this should
6090          * probably be left up to the administrator.
6091          */
6092         ret = disable_takeover_runs(mem_ctx, ctdb, 2*options.timelimit,
6093                                     pnn_list, count);
6094         if (ret != 0) {
6095                 fprintf(stderr, "Failed to disable takeover runs\n");
6096                 return ret;
6097         }
6098
6099         /* Now tell all the desired nodes to reload their public IPs.
6100          * Keep trying this until it succeeds.  This assumes all
6101          * failures are transient, which might not be true...
6102          */
6103         ctdb_req_control_reload_public_ips(&request);
6104         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
6105                                         pnn_list2, count2, TIMEOUT(),
6106                                         &request, NULL, NULL);
6107         if (ret != 0) {
6108                 fprintf(stderr, "Failed to reload IPs on some nodes.\n");
6109         }
6110
6111         /* It isn't strictly necessary to wait until takeover runs are
6112          * re-enabled but doing so can't hurt.
6113          */
6114         ret = disable_takeover_runs(mem_ctx, ctdb, 0, pnn_list, count);
6115         if (ret != 0) {
6116                 fprintf(stderr, "Failed to enable takeover runs\n");
6117                 return ret;
6118         }
6119
6120         return ipreallocate(mem_ctx, ctdb);
6121 }
6122
6123 static int control_ipiface(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
6124                            int argc, const char **argv)
6125 {
6126         ctdb_sock_addr addr;
6127         char *iface;
6128
6129         if (argc != 1) {
6130                 usage("ipiface");
6131         }
6132
6133         if (! parse_ip(argv[0], NULL, 0, &addr)) {
6134                 fprintf(stderr, "Failed to Parse IP %s\n", argv[0]);
6135                 return 1;
6136         }
6137
6138         iface = ctdb_sys_find_ifname(&addr);
6139         if (iface == NULL) {
6140                 fprintf(stderr, "Failed to find interface for IP %s\n",
6141                         argv[0]);
6142                 return 1;
6143         }
6144         free(iface);
6145
6146         return 0;
6147 }
6148
6149
6150 static const struct ctdb_cmd {
6151         const char *name;
6152         int (*fn)(TALLOC_CTX *, struct ctdb_context *, int, const char **);
6153         bool without_daemon; /* can be run without daemon running ? */
6154         bool remote; /* can be run on remote nodes */
6155         const char *msg;
6156         const char *args;
6157 } ctdb_commands[] = {
6158         { "version", control_version, true, false,
6159                 "show version of ctdb", NULL },
6160         { "status", control_status, false, true,
6161                 "show node status", NULL },
6162         { "uptime", control_uptime, false, true,
6163                 "show node uptime", NULL },
6164         { "ping", control_ping, false, true,
6165                 "ping all nodes", NULL },
6166         { "runstate", control_runstate, false, true,
6167                 "get/check runstate of a node",
6168                 "[setup|first_recovery|startup|running]" },
6169         { "getvar", control_getvar, false, true,
6170                 "get a tunable variable", "<name>" },
6171         { "setvar", control_setvar, false, true,
6172                 "set a tunable variable", "<name> <value>" },
6173         { "listvars", control_listvars, false, true,
6174                 "list tunable variables", NULL },
6175         { "statistics", control_statistics, false, true,
6176                 "show ctdb statistics", NULL },
6177         { "statisticsreset", control_statistics_reset, false, true,
6178                 "reset ctdb statistics", NULL },
6179         { "stats", control_stats, false, true,
6180                 "show rolling statistics", "[count]" },
6181         { "ip", control_ip, false, true,
6182                 "show public ips", "[all]" },
6183         { "ipinfo", control_ipinfo, false, true,
6184                 "show public ip details", "<ip>" },
6185         { "ifaces", control_ifaces, false, true,
6186                 "show interfaces", NULL },
6187         { "setifacelink", control_setifacelink, false, true,
6188                 "set interface link status", "<iface> up|down" },
6189         { "process-exists", control_process_exists, false, true,
6190                 "check if a process exists on a node",  "<pid>" },
6191         { "getdbmap", control_getdbmap, false, true,
6192                 "show attached databases", NULL },
6193         { "getdbstatus", control_getdbstatus, false, true,
6194                 "show database status", "<dbname|dbid>" },
6195         { "catdb", control_catdb, false, false,
6196                 "dump cluster-wide ctdb database", "<dbname|dbid>" },
6197         { "cattdb", control_cattdb, false, false,
6198                 "dump local ctdb database", "<dbname|dbid>" },
6199         { "getmonmode", control_getmonmode, false, true,
6200                 "show monitoring mode", NULL },
6201         { "getcapabilities", control_getcapabilities, false, true,
6202                 "show node capabilities", NULL },
6203         { "pnn", control_pnn, false, false,
6204                 "show the pnn of the currnet node", NULL },
6205         { "lvs", control_lvs, false, false,
6206                 "show lvs configuration", "master|list|status" },
6207         { "disablemonitor", control_disable_monitor, false, true,
6208                 "disable monitoring", NULL },
6209         { "enablemonitor", control_enable_monitor, false, true,
6210                 "enable monitoring", NULL },
6211         { "setdebug", control_setdebug, false, true,
6212                 "set debug level", "ERROR|WARNING|NOTICE|INFO|DEBUG" },
6213         { "getdebug", control_getdebug, false, true,
6214                 "get debug level", NULL },
6215         { "attach", control_attach, false, false,
6216                 "attach a database", "<dbname> [persistent]" },
6217         { "detach", control_detach, false, false,
6218                 "detach database(s)", "<dbname|dbid> ..." },
6219         { "dumpmemory", control_dumpmemory, false, true,
6220                 "dump ctdbd memory map", NULL },
6221         { "rddumpmemory", control_rddumpmemory, false, true,
6222                 "dump recoverd memory map", NULL },
6223         { "getpid", control_getpid, false, true,
6224                 "get ctdbd process ID", NULL },
6225         { "disable", control_disable, false, true,
6226                 "disable a node", NULL },
6227         { "enable", control_enable, false, true,
6228                 "enable a node", NULL },
6229         { "stop", control_stop, false, true,
6230                 "stop a node", NULL },
6231         { "continue", control_continue, false, true,
6232                 "continue a stopped node", NULL },
6233         { "ban", control_ban, false, true,
6234                 "ban a node", "<bantime>"},
6235         { "unban", control_unban, false, true,
6236                 "unban a node", NULL },
6237         { "shutdown", control_shutdown, false, true,
6238                 "shutdown ctdb daemon", NULL },
6239         { "recover", control_recover, false, true,
6240                 "force recovery", NULL },
6241         { "sync", control_ipreallocate, false, true,
6242                 "run ip reallocation (deprecated)", NULL },
6243         { "ipreallocate", control_ipreallocate, false, true,
6244                 "run ip reallocation", NULL },
6245         { "isnotrecmaster", control_isnotrecmaster, false, false,
6246                 "check if local node is the recmaster", NULL },
6247         { "gratarp", control_gratarp, false, true,
6248                 "send a gratuitous arp", "<ip> <interface>" },
6249         { "tickle", control_tickle, false, false,
6250                 "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
6251         { "gettickles", control_gettickles, false, true,
6252                 "get the list of tickles", "<ip> [<port>]" },
6253         { "addtickle", control_addtickle, false, true,
6254                 "add a tickle", "<ip>:<port> <ip>:<port>" },
6255         { "deltickle", control_deltickle, false, true,
6256                 "delete a tickle", "<ip>:<port> <ip>:<port>" },
6257         { "check_srvids", control_check_srvids, false, true,
6258                 "check if srvid is registered", "<id> [<id> ...]" },
6259         { "listnodes", control_listnodes, true, true,
6260                 "list nodes in the cluster", NULL },
6261         { "reloadnodes", control_reloadnodes, false, false,
6262                 "reload the nodes file all nodes", NULL },
6263         { "moveip", control_moveip, false, false,
6264                 "move an ip address to another node", "<ip> <node>" },
6265         { "rebalanceip", control_rebalanceip, false, false,
6266                 "move an ip address optimally to another node", "<ip>" },
6267         { "addip", control_addip, false, true,
6268                 "add an ip address to a node", "<ip/mask> <iface>" },
6269         { "delip", control_delip, false, true,
6270                 "delete an ip address from a node", "<ip>" },
6271         { "eventscript", control_eventscript, false, true,
6272                 "run an event", "monitor" },
6273         { "backupdb", control_backupdb, false, false,
6274                 "backup a database into a file", "<dbname|dbid> <file>" },
6275         { "restoredb", control_restoredb, false, false,
6276                 "restore a database from a file", "<file> [dbname]" },
6277         { "dumpdbbackup", control_dumpdbbackup, true, false,
6278                 "dump database from a backup file", "<file>" },
6279         { "wipedb", control_wipedb, false, false,
6280                 "wipe the contents of a database.", "<dbname|dbid>"},
6281         { "recmaster", control_recmaster, false, true,
6282                 "show the pnn for the recovery master", NULL },
6283         { "scriptstatus", control_scriptstatus, false, true,
6284                 "show event script status",
6285                 "[init|setup|startup|monitor|takeip|releaseip|ipreallocated]" },
6286         { "enablescript", control_enablescript, false, true,
6287                 "enable an eventscript", "<script>"},
6288         { "disablescript", control_disablescript, false, true,
6289                 "disable an eventscript", "<script>"},
6290         { "natgw", control_natgw, false, false,
6291                 "show natgw configuration", "master|list|status" },
6292         { "natgwlist", control_natgwlist, false, false,
6293                 "show the nodes belonging to this natgw configuration", NULL },
6294         { "getreclock", control_getreclock, false, true,
6295                 "get recovery lock file", NULL },
6296         { "setlmasterrole", control_setlmasterrole, false, true,
6297                 "set LMASTER role", "on|off" },
6298         { "setrecmasterrole", control_setrecmasterrole, false, true,
6299                 "set RECMASTER role", "on|off"},
6300         { "setdbreadonly", control_setdbreadonly, false, true,
6301                 "enable readonly records", "<dbname|dbid>" },
6302         { "setdbsticky", control_setdbsticky, false, true,
6303                 "enable sticky records", "<dbname|dbid>"},
6304         { "pfetch", control_pfetch, false, false,
6305                 "fetch record from persistent database", "<dbname|dbid> <key> [<file>]" },
6306         { "pstore", control_pstore, false, false,
6307                 "write record to persistent database", "<dbname|dbid> <key> <value>" },
6308         { "pdelete", control_pdelete, false, false,
6309                 "delete record from persistent database", "<dbname|dbid> <key>" },
6310         { "ptrans", control_ptrans, false, false,
6311                 "update a persistent database (from file or stdin)", "<dbname|dbid> [<file>]" },
6312         { "tfetch", control_tfetch, false, true,
6313                 "fetch a record", "<tdb-file> <key> [<file>]" },
6314         { "tstore", control_tstore, false, true,
6315                 "store a record", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6316         { "readkey", control_readkey, false, false,
6317                 "read value of a database key", "<dbname|dbid> <key> [readonly]" },
6318         { "writekey", control_writekey, false, false,
6319                 "write value for a database key", "<dbname|dbid> <key> <value>" },
6320         { "deletekey", control_deletekey, false, false,
6321                 "delete a database key", "<dbname|dbid> <key>" },
6322         { "checktcpport", control_checktcpport, true, false,
6323                 "check if a service is bound to a specific tcp port or not", "<port>" },
6324         { "rebalancenode", control_rebalancenode, false, true,
6325                 "mark nodes as forced IP rebalancing targets", NULL },
6326         { "getdbseqnum", control_getdbseqnum, false, false,
6327                 "get database sequence number", "<dbname|dbid>" },
6328         { "nodestatus", control_nodestatus, false, true,
6329                 "show and return node status", "[all|<pnn-list>]" },
6330         { "dbstatistics", control_dbstatistics, false, true,
6331                 "show database statistics", "<dbname|dbid>" },
6332         { "reloadips", control_reloadips, false, false,
6333                 "reload the public addresses file", "[all|<pnn-list>]" },
6334         { "ipiface", control_ipiface, true, false,
6335                 "Find the interface an ip address is hosted on", "<ip>" },
6336 };
6337
6338 static const struct ctdb_cmd *match_command(const char *command)
6339 {
6340         const struct ctdb_cmd *cmd;
6341         int i;
6342
6343         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6344                 cmd = &ctdb_commands[i];
6345                 if (strlen(command) == strlen(cmd->name) &&
6346                     strncmp(command, cmd->name, strlen(command)) == 0) {
6347                         return cmd;
6348                 }
6349         }
6350
6351         return NULL;
6352 }
6353
6354
6355 /**
6356  * Show usage message
6357  */
6358 static void usage_full(void)
6359 {
6360         int i;
6361
6362         poptPrintHelp(pc, stdout, 0);
6363         printf("\nCommands:\n");
6364         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6365                 printf("  %-15s %-27s  %s\n",
6366                        ctdb_commands[i].name,
6367                        ctdb_commands[i].args ? ctdb_commands[i].args : "",
6368                        ctdb_commands[i].msg);
6369         }
6370 }
6371
6372 static void usage(const char *command)
6373 {
6374         const struct ctdb_cmd *cmd;
6375
6376         if (command == NULL) {
6377                 usage_full();
6378                 exit(1);
6379         }
6380
6381         cmd = match_command(command);
6382         if (cmd == NULL) {
6383                 usage_full();
6384         } else {
6385                 poptPrintUsage(pc, stdout, 0);
6386                 printf("\nCommands:\n");
6387                 printf("  %-15s %-27s  %s\n",
6388                        cmd->name, cmd->args ? cmd->args : "", cmd->msg);
6389         }
6390
6391         exit(1);
6392 }
6393
6394 struct poptOption cmdline_options[] = {
6395         POPT_AUTOHELP
6396         { "socket", 's', POPT_ARG_STRING, &options.socket, 0,
6397                 "CTDB socket path", "filename" },
6398         { "debug", 'd', POPT_ARG_STRING, &options.debuglevelstr, 0,
6399                 "debug level"},
6400         { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0,
6401                 "timelimit (in seconds)" },
6402         { "node", 'n', POPT_ARG_INT, &options.pnn, 0,
6403                 "node specification - integer" },
6404         { NULL, 'Y', POPT_ARG_NONE, &options.machinereadable, 0,
6405                 "enable machine readable output", NULL },
6406         { "separator", 'x', POPT_ARG_STRING, &options.sep, 0,
6407                 "specify separator for machine readable output", "CHAR" },
6408         { NULL, 'X', POPT_ARG_NONE, &options.machineparsable, 0,
6409                 "enable machine parsable output with separator |", NULL },
6410         { "verbose", 'v', POPT_ARG_NONE, &options.verbose, 0,
6411                 "enable verbose output", NULL },
6412         { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0,
6413                 "die if runtime exceeds this limit (in seconds)" },
6414         POPT_TABLEEND
6415 };
6416
6417 static int process_command(const struct ctdb_cmd *cmd, int argc,
6418                            const char **argv)
6419 {
6420         TALLOC_CTX *tmp_ctx;
6421         struct ctdb_context *ctdb;
6422         int ret;
6423         bool status;
6424         uint64_t srvid_offset;
6425
6426         tmp_ctx = talloc_new(NULL);
6427         if (tmp_ctx == NULL) {
6428                 fprintf(stderr, "Memory allocation error\n");
6429                 goto fail;
6430         }
6431
6432         if (cmd->without_daemon) {
6433                 if (options.pnn != -1) {
6434                         fprintf(stderr,
6435                                 "Cannot specify node for command %s\n",
6436                                 cmd->name);
6437                         goto fail;
6438                 }
6439
6440                 return cmd->fn(tmp_ctx, NULL, argc-1, argv+1);
6441         }
6442
6443         ctdb = talloc_zero(tmp_ctx, struct ctdb_context);
6444         if (ctdb == NULL) {
6445                 fprintf(stderr, "Memory allocation error\n");
6446                 goto fail;
6447         }
6448
6449         ctdb->ev = tevent_context_init(ctdb);
6450         if (ctdb->ev == NULL) {
6451                 fprintf(stderr, "Failed to initialize tevent\n");
6452                 goto fail;
6453         }
6454
6455         ret = ctdb_client_init(ctdb, ctdb->ev, options.socket, &ctdb->client);
6456         if (ret != 0) {
6457                 fprintf(stderr, "Failed to connect to CTDB daemon (%s)\n",
6458                         options.socket);
6459
6460                 if (!find_node_xpnn(ctdb, NULL)) {
6461                         fprintf(stderr, "Is this node part of CTDB cluster?\n");
6462                 }
6463                 goto fail;
6464         }
6465
6466         ctdb->pnn = ctdb_client_pnn(ctdb->client);
6467         srvid_offset = getpid() & 0xFFFF;
6468         ctdb->srvid = SRVID_CTDB_TOOL | (srvid_offset << 16);
6469
6470         if (options.pnn != -1) {
6471                 status = verify_pnn(ctdb, options.pnn);
6472                 if (! status) {
6473                         goto fail;
6474                 }
6475
6476                 ctdb->cmd_pnn = options.pnn;
6477         } else {
6478                 ctdb->cmd_pnn = ctdb->pnn;
6479         }
6480
6481         if (! cmd->remote && ctdb->pnn != ctdb->cmd_pnn) {
6482                 fprintf(stderr, "Node cannot be specified for command %s\n",
6483                         cmd->name);
6484                 goto fail;
6485         }
6486
6487         ret = cmd->fn(tmp_ctx, ctdb, argc-1, argv+1);
6488         talloc_free(tmp_ctx);
6489         return ret;
6490
6491 fail:
6492         talloc_free(tmp_ctx);
6493         return 1;
6494 }
6495
6496 static void signal_handler(int sig)
6497 {
6498         fprintf(stderr, "Maximum runtime exceeded - exiting\n");
6499 }
6500
6501 static void alarm_handler(int sig)
6502 {
6503         /* Kill any child processes */
6504         signal(SIGTERM, signal_handler);
6505         kill(0, SIGTERM);
6506
6507         _exit(1);
6508 }
6509
6510 int main(int argc, const char *argv[])
6511 {
6512         int opt;
6513         const char **extra_argv;
6514         int extra_argc;
6515         const struct ctdb_cmd *cmd;
6516         const char *ctdb_socket;
6517         enum debug_level loglevel;
6518         int ret;
6519
6520         setlinebuf(stdout);
6521
6522         /* Set default options */
6523         options.socket = CTDB_SOCKET;
6524         options.debuglevelstr = NULL;
6525         options.timelimit = 10;
6526         options.sep = "|";
6527         options.maxruntime = 0;
6528         options.pnn = -1;
6529
6530         ctdb_socket = getenv("CTDB_SOCKET");
6531         if (ctdb_socket != NULL) {
6532                 options.socket = ctdb_socket;
6533         }
6534
6535         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
6536                             POPT_CONTEXT_KEEP_FIRST);
6537         while ((opt = poptGetNextOpt(pc)) != -1) {
6538                 fprintf(stderr, "Invalid option %s: %s\n",
6539                         poptBadOption(pc, 0), poptStrerror(opt));
6540                 exit(1);
6541         }
6542
6543         if (options.maxruntime == 0) {
6544                 const char *ctdb_timeout;
6545
6546                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6547                 if (ctdb_timeout != NULL) {
6548                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6549                 } else {
6550                         options.maxruntime = 120;
6551                 }
6552         }
6553         if (options.maxruntime <= 120) {
6554                 /* default timeout is 120 seconds */
6555                 options.maxruntime = 120;
6556         }
6557
6558         if (options.machineparsable) {
6559                 options.machinereadable = 1;
6560         }
6561
6562         /* setup the remaining options for the commands */
6563         extra_argc = 0;
6564         extra_argv = poptGetArgs(pc);
6565         if (extra_argv) {
6566                 extra_argv++;
6567                 while (extra_argv[extra_argc]) extra_argc++;
6568         }
6569
6570         if (extra_argc < 1) {
6571                 usage(NULL);
6572         }
6573
6574         cmd = match_command(extra_argv[0]);
6575         if (cmd == NULL) {
6576                 fprintf(stderr, "Unknown command '%s'\n", extra_argv[0]);
6577                 exit(1);
6578         }
6579
6580         /* Enable logging */
6581         setup_logging("ctdb", DEBUG_STDERR);
6582         if (debug_level_parse(options.debuglevelstr, &loglevel)) {
6583                 DEBUGLEVEL = loglevel;
6584         } else {
6585                 DEBUGLEVEL = DEBUG_ERR;
6586         }
6587
6588         signal(SIGALRM, alarm_handler);
6589         alarm(options.maxruntime);
6590
6591         ret = process_command(cmd, extra_argc, extra_argv);
6592         if (ret == -1) {
6593                 ret = 1;
6594         }
6595
6596         (void)poptFreeContext(pc);
6597
6598         return ret;
6599 }