ctdb-tools: Fix CID 1364704 - resource leak
[samba.git] / ctdb / tools / ctdb.c
1 /*
2    CTDB control tool
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/wait.h"
25
26 #include <ctype.h>
27 #include <popt.h>
28 #include <talloc.h>
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "ctdb_version.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35
36 #include "common/db_hash.h"
37 #include "common/logging.h"
38 #include "protocol/protocol.h"
39 #include "protocol/protocol_api.h"
40 #include "common/system.h"
41 #include "client/client.h"
42
43 #define TIMEOUT()       timeval_current_ofs(options.timelimit, 0)
44
45 #define SRVID_CTDB_TOOL    (CTDB_SRVID_TOOL_RANGE | 0x0001000000000000LL)
46 #define SRVID_CTDB_PUSHDB  (CTDB_SRVID_TOOL_RANGE | 0x0002000000000000LL)
47
48 static struct {
49         const char *socket;
50         const char *debuglevelstr;
51         int timelimit;
52         int pnn;
53         int machinereadable;
54         const char *sep;
55         int machineparsable;
56         int verbose;
57         int maxruntime;
58         int printemptyrecords;
59         int printdatasize;
60         int printlmaster;
61         int printhash;
62         int printrecordflags;
63 } options;
64
65 static poptContext pc;
66
67 struct ctdb_context {
68         struct tevent_context *ev;
69         struct ctdb_client_context *client;
70         struct ctdb_node_map *nodemap;
71         uint32_t pnn, cmd_pnn;
72         uint64_t srvid;
73 };
74
75 static void usage(const char *command);
76
77 /*
78  * Utility Functions
79  */
80
81 static double timeval_delta(struct timeval *tv2, struct timeval *tv)
82 {
83         return (tv2->tv_sec - tv->tv_sec) +
84                (tv2->tv_usec - tv->tv_usec) * 1.0e-6;
85 }
86
87 static struct ctdb_node_and_flags *get_node_by_pnn(
88                                         struct ctdb_node_map *nodemap,
89                                         uint32_t pnn)
90 {
91         int i;
92
93         for (i=0; i<nodemap->num; i++) {
94                 if (nodemap->node[i].pnn == pnn) {
95                         return &nodemap->node[i];
96                 }
97         }
98         return NULL;
99 }
100
101 static const char *pretty_print_flags(TALLOC_CTX *mem_ctx, uint32_t flags)
102 {
103         static const struct {
104                 uint32_t flag;
105                 const char *name;
106         } flag_names[] = {
107                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
108                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
109                 { NODE_FLAGS_BANNED,                "BANNED" },
110                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
111                 { NODE_FLAGS_DELETED,               "DELETED" },
112                 { NODE_FLAGS_STOPPED,               "STOPPED" },
113                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
114         };
115         char *flags_str = NULL;
116         int i;
117
118         for (i=0; i<ARRAY_SIZE(flag_names); i++) {
119                 if (flags & flag_names[i].flag) {
120                         if (flags_str == NULL) {
121                                 flags_str = talloc_asprintf(mem_ctx,
122                                                 "%s", flag_names[i].name);
123                         } else {
124                                 flags_str = talloc_asprintf_append(flags_str,
125                                                 "|%s", flag_names[i].name);
126                         }
127                         if (flags_str == NULL) {
128                                 return "OUT-OF-MEMORY";
129                         }
130                 }
131         }
132         if (flags_str == NULL) {
133                 return "OK";
134         }
135
136         return flags_str;
137 }
138
139 static uint64_t next_srvid(struct ctdb_context *ctdb)
140 {
141         ctdb->srvid += 1;
142         return ctdb->srvid;
143 }
144
145 /*
146  * Get consistent nodemap information.
147  *
148  * If nodemap is already cached, use that. If not get it.
149  * If the current node is BANNED, then get nodemap from "better" node.
150  */
151 static struct ctdb_node_map *get_nodemap(struct ctdb_context *ctdb, bool force)
152 {
153         TALLOC_CTX *tmp_ctx;
154         struct ctdb_node_map *nodemap;
155         struct ctdb_node_and_flags *node;
156         uint32_t current_node;
157         int ret;
158
159         if (force) {
160                 TALLOC_FREE(ctdb->nodemap);
161         }
162
163         if (ctdb->nodemap != NULL) {
164                 return ctdb->nodemap;
165         }
166
167         tmp_ctx = talloc_new(ctdb);
168         if (tmp_ctx == NULL) {
169                 return false;
170         }
171
172         current_node = ctdb->pnn;
173 again:
174         ret = ctdb_ctrl_get_nodemap(tmp_ctx, ctdb->ev, ctdb->client,
175                                     current_node, TIMEOUT(), &nodemap);
176         if (ret != 0) {
177                 fprintf(stderr, "Failed to get nodemap from node %u\n",
178                         current_node);
179                 goto failed;
180         }
181
182         node = get_node_by_pnn(nodemap, current_node);
183         if (node->flags & NODE_FLAGS_BANNED) {
184                 /* Pick next node */
185                 do {
186                         current_node = (current_node + 1) % nodemap->num;
187                         node = get_node_by_pnn(nodemap, current_node);
188                         if (! (node->flags &
189                               (NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED))) {
190                                 break;
191                         }
192                 } while (current_node != ctdb->pnn);
193
194                 if (current_node == ctdb->pnn) {
195                         /* Tried all nodes in the cluster */
196                         fprintf(stderr, "Warning: All nodes are banned.\n");
197                         goto failed;
198                 }
199
200                 goto again;
201         }
202
203         ctdb->nodemap = talloc_steal(ctdb, nodemap);
204         return nodemap;
205
206 failed:
207         talloc_free(tmp_ctx);
208         return NULL;
209 }
210
211 static bool verify_pnn(struct ctdb_context *ctdb, int pnn)
212 {
213         struct ctdb_node_map *nodemap;
214         bool found;
215         int i;
216
217         if (pnn == -1) {
218                 return false;
219         }
220
221         nodemap = get_nodemap(ctdb, false);
222         if (nodemap == NULL) {
223                 return false;
224         }
225
226         found = false;
227         for (i=0; i<nodemap->num; i++) {
228                 if (nodemap->node[i].pnn == pnn) {
229                         found = true;
230                         break;
231                 }
232         }
233         if (! found) {
234                 fprintf(stderr, "Node %u does not exist\n", pnn);
235                 return false;
236         }
237
238         if (nodemap->node[i].flags &
239             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
240                 fprintf(stderr, "Node %u has status %s\n", pnn,
241                         pretty_print_flags(ctdb, nodemap->node[i].flags));
242                 return false;
243         }
244
245         return true;
246 }
247
248 static struct ctdb_node_map *talloc_nodemap(TALLOC_CTX *mem_ctx,
249                                             struct ctdb_node_map *nodemap)
250 {
251         struct ctdb_node_map *nodemap2;
252
253         nodemap2 = talloc_zero(mem_ctx, struct ctdb_node_map);
254         if (nodemap2 == NULL) {
255                 return NULL;
256         }
257
258         nodemap2->node = talloc_array(nodemap2, struct ctdb_node_and_flags,
259                                       nodemap->num);
260         if (nodemap2->node == NULL) {
261                 talloc_free(nodemap2);
262                 return NULL;
263         }
264
265         return nodemap2;
266 }
267
268 /*
269  * Get the number and the list of matching nodes
270  *
271  *   nodestring :=  NULL | all | pnn,[pnn,...]
272  *
273  * If nodestring is NULL, use the current node.
274  */
275 static bool parse_nodestring(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
276                              const char *nodestring,
277                              struct ctdb_node_map **out)
278 {
279         struct ctdb_node_map *nodemap, *nodemap2;
280         struct ctdb_node_and_flags *node;
281         int i;
282
283         nodemap = get_nodemap(ctdb, false);
284         if (nodemap == NULL) {
285                 return false;
286         }
287
288         nodemap2 = talloc_nodemap(mem_ctx, nodemap);
289         if (nodemap2 == NULL) {
290                 return false;
291         }
292
293         if (nodestring == NULL) {
294                 for (i=0; i<nodemap->num; i++) {
295                         if (nodemap->node[i].pnn == ctdb->cmd_pnn) {
296                                 nodemap2->node[0] = nodemap->node[i];
297                                 break;
298                         }
299                 }
300                 nodemap2->num = 1;
301
302                 goto done;
303         }
304
305         if (strcmp(nodestring, "all") == 0) {
306                 for (i=0; i<nodemap->num; i++) {
307                         nodemap2->node[i] = nodemap->node[i];
308                 }
309                 nodemap2->num = nodemap->num;
310
311                 goto done;
312         } else {
313                 char *ns, *tok;
314
315                 ns = talloc_strdup(mem_ctx, nodestring);
316                 if (ns == NULL) {
317                         return false;
318                 }
319
320                 tok = strtok(ns, ",");
321                 while (tok != NULL) {
322                         uint32_t pnn;
323                         char *endptr;
324
325                         pnn = (uint32_t)strtoul(tok, &endptr, 0);
326                         if (pnn == 0 && tok == endptr) {
327                                 fprintf(stderr, "Invalid node %s\n", tok);
328                                         return false;
329                         }
330
331                         node = get_node_by_pnn(nodemap, pnn);
332                         if (node == NULL) {
333                                 fprintf(stderr, "Node %u does not exist\n",
334                                         pnn);
335                                 return false;
336                         }
337
338                         nodemap2->node[nodemap2->num] = *node;
339                         nodemap2->num += 1;
340
341                         tok = strtok(NULL, ",");
342                 }
343         }
344
345 done:
346         *out = nodemap2;
347         return true;
348 }
349
350 /* Compare IP address */
351 static bool ctdb_same_ip(ctdb_sock_addr *ip1, ctdb_sock_addr *ip2)
352 {
353         bool ret = false;
354
355         if (ip1->sa.sa_family != ip2->sa.sa_family) {
356                 return false;
357         }
358
359         switch (ip1->sa.sa_family) {
360         case AF_INET:
361                 ret = (memcmp(&ip1->ip.sin_addr, &ip2->ip.sin_addr,
362                               sizeof(struct in_addr)) == 0);
363                 break;
364
365         case AF_INET6:
366                 ret = (memcmp(&ip1->ip6.sin6_addr, &ip2->ip6.sin6_addr,
367                               sizeof(struct in6_addr)) == 0);
368                 break;
369         }
370
371         return ret;
372 }
373
374 /* Append a node to a node map with given address and flags */
375 static bool node_map_add(struct ctdb_node_map *nodemap,
376                          const char *nstr, uint32_t flags)
377 {
378         ctdb_sock_addr addr;
379         uint32_t num;
380         struct ctdb_node_and_flags *n;
381
382         if (! parse_ip(nstr, NULL, 0, &addr)) {
383                 fprintf(stderr, "Invalid IP address %s\n", nstr);
384                 return false;
385         }
386
387         num = nodemap->num;
388         nodemap->node = talloc_realloc(nodemap, nodemap->node,
389                                        struct ctdb_node_and_flags, num+1);
390         if (nodemap->node == NULL) {
391                 return false;
392         }
393
394         n = &nodemap->node[num];
395         n->addr = addr;
396         n->pnn = num;
397         n->flags = flags;
398
399         nodemap->num = num+1;
400         return true;
401 }
402
403 /* Read a nodes file into a node map */
404 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
405                                                   const char *nlist)
406 {
407         char **lines;
408         int nlines;
409         int i;
410         struct ctdb_node_map *nodemap;
411
412         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
413         if (nodemap == NULL) {
414                 return NULL;
415         }
416
417         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
418         if (lines == NULL) {
419                 return NULL;
420         }
421
422         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
423                 nlines--;
424         }
425
426         for (i=0; i<nlines; i++) {
427                 char *node;
428                 uint32_t flags;
429                 size_t len;
430
431                 node = lines[i];
432                 /* strip leading spaces */
433                 while((*node == ' ') || (*node == '\t')) {
434                         node++;
435                 }
436
437                 len = strlen(node);
438
439                 /* strip trailing spaces */
440                 while ((len > 1) &&
441                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
442                 {
443                         node[len-1] = '\0';
444                         len--;
445                 }
446
447                 if (len == 0) {
448                         continue;
449                 }
450                 if (*node == '#') {
451                         /* A "deleted" node is a node that is
452                            commented out in the nodes file.  This is
453                            used instead of removing a line, which
454                            would cause subsequent nodes to change
455                            their PNN. */
456                         flags = NODE_FLAGS_DELETED;
457                         node = discard_const("0.0.0.0");
458                 } else {
459                         flags = 0;
460                 }
461                 if (! node_map_add(nodemap, node, flags)) {
462                         talloc_free(lines);
463                         TALLOC_FREE(nodemap);
464                         return NULL;
465                 }
466         }
467
468         talloc_free(lines);
469         return nodemap;
470 }
471
472 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx, uint32_t pnn)
473 {
474         struct ctdb_node_map *nodemap;
475         char *nodepath;
476         const char *nodes_list = NULL;
477
478         if (pnn != CTDB_UNKNOWN_PNN) {
479                 nodepath = talloc_asprintf(mem_ctx, "CTDB_NODES_%u", pnn);
480                 if (nodepath != NULL) {
481                         nodes_list = getenv(nodepath);
482                 }
483         }
484         if (nodes_list == NULL) {
485                 nodes_list = getenv("CTDB_NODES");
486         }
487         if (nodes_list == NULL) {
488                 const char *basedir = getenv("CTDB_BASE");
489                 if (basedir == NULL) {
490                         basedir = CTDB_ETCDIR;
491                 }
492                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes", basedir);
493                 if (nodes_list == NULL) {
494                         fprintf(stderr, "Memory allocation error\n");
495                         return NULL;
496                 }
497         }
498
499         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
500         if (nodemap == NULL) {
501                 fprintf(stderr, "Failed to read nodes file \"%s\"\n",
502                         nodes_list);
503                 return NULL;
504         }
505
506         return nodemap;
507 }
508
509 static struct ctdb_dbid *db_find(TALLOC_CTX *mem_ctx,
510                                  struct ctdb_context *ctdb,
511                                  struct ctdb_dbid_map *dbmap,
512                                  const char *db_name)
513 {
514         struct ctdb_dbid *db = NULL;
515         const char *name;
516         int ret, i;
517
518         for (i=0; i<dbmap->num; i++) {
519                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
520                                            ctdb->pnn, TIMEOUT(),
521                                            dbmap->dbs[i].db_id, &name);
522                 if (ret != 0) {
523                         return false;
524                 }
525
526                 if (strcmp(db_name, name) == 0) {
527                         talloc_free(discard_const(name));
528                         db = &dbmap->dbs[i];
529                         break;
530                 }
531         }
532
533         return db;
534 }
535
536 static bool db_exists(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
537                       const char *db_arg, uint32_t *db_id,
538                       const char **db_name, uint8_t *db_flags)
539 {
540         struct ctdb_dbid_map *dbmap;
541         struct ctdb_dbid *db = NULL;
542         uint32_t id = 0;
543         const char *name = NULL;
544         int ret, i;
545
546         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
547                                   ctdb->pnn, TIMEOUT(), &dbmap);
548         if (ret != 0) {
549                 return false;
550         }
551
552         if (strncmp(db_arg, "0x", 2) == 0) {
553                 id = strtoul(db_arg, NULL, 0);
554                 for (i=0; i<dbmap->num; i++) {
555                         if (id == dbmap->dbs[i].db_id) {
556                                 db = &dbmap->dbs[i];
557                                 break;
558                         }
559                 }
560         } else {
561                 name = db_arg;
562                 db = db_find(mem_ctx, ctdb, dbmap, name);
563         }
564
565         if (db == NULL) {
566                 fprintf(stderr, "No database matching '%s' found\n", db_arg);
567                 return false;
568         }
569
570         if (name == NULL) {
571                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
572                                            ctdb->pnn, TIMEOUT(), id, &name);
573                 if (ret != 0) {
574                         return false;
575                 }
576         }
577
578         if (db_id != NULL) {
579                 *db_id = db->db_id;
580         }
581         if (db_name != NULL) {
582                 *db_name = talloc_strdup(mem_ctx, name);
583         }
584         if (db_flags != NULL) {
585                 *db_flags = db->flags;
586         }
587         return true;
588 }
589
590 static int h2i(char h)
591 {
592         if (h >= 'a' && h <= 'f') {
593                 return h - 'a' + 10;
594         }
595         if (h >= 'A' && h <= 'F') {
596                 return h - 'f' + 10;
597         }
598         return h - '0';
599 }
600
601 static int hex_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
602                        TDB_DATA *out)
603 {
604         int i;
605         TDB_DATA data;
606
607         if (len & 0x01) {
608                 fprintf(stderr, "Key (%s) contains odd number of hex digits\n",
609                         str);
610                 return EINVAL;
611         }
612
613         data.dsize = len / 2;
614         data.dptr = talloc_size(mem_ctx, data.dsize);
615         if (data.dptr == NULL) {
616                 return ENOMEM;
617         }
618
619         for (i=0; i<data.dsize; i++) {
620                 data.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
621         }
622
623         *out = data;
624         return 0;
625 }
626
627 static int str_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
628                        TDB_DATA *out)
629 {
630         TDB_DATA data;
631         int ret = 0;
632
633         if (strncmp(str, "0x", 2) == 0) {
634                 ret = hex_to_data(str+2, len-2, mem_ctx, &data);
635         } else {
636                 data.dptr = talloc_memdup(mem_ctx, str, len);
637                 if (data.dptr == NULL) {
638                         return ENOMEM;
639                 }
640                 data.dsize = len;
641         }
642
643         *out = data;
644         return ret;
645 }
646
647 static int run_helper(const char *command, const char *path, const char *arg1)
648 {
649         pid_t pid;
650         int save_errno, status, ret;
651
652         pid = fork();
653         if (pid < 0) {
654                 save_errno = errno;
655                 fprintf(stderr, "Failed to fork %s (%s) - %s\n",
656                         command, path, strerror(save_errno));
657                 return save_errno;
658         }
659
660         if (pid == 0) {
661                 ret = execl(path, path, arg1, NULL);
662                 if (ret == -1) {
663                         _exit(errno);
664                 }
665                 /* Should not happen */
666                 _exit(ENOEXEC);
667         }
668
669         ret = waitpid(pid, &status, 0);
670         if (ret == -1) {
671                 save_errno = errno;
672                 fprintf(stderr, "waitpid() failed for %s - %s\n",
673                         command, strerror(save_errno));
674                 return save_errno;
675         }
676
677         if (WIFEXITED(status)) {
678                 ret = WEXITSTATUS(status);
679                 return ret;
680         }
681
682         if (WIFSIGNALED(status)) {
683                 fprintf(stderr, "%s terminated with signal %d\n",
684                         command, WTERMSIG(status));
685                 return EINTR;
686         }
687
688         return 0;
689 }
690
691 /*
692  * Command Functions
693  */
694
695 static int control_version(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
696                            int argc, const char **argv)
697 {
698         printf("%s\n", CTDB_VERSION_STRING);
699         return 0;
700 }
701
702 static bool partially_online(TALLOC_CTX *mem_ctx,
703                              struct ctdb_context *ctdb,
704                              struct ctdb_node_and_flags *node)
705 {
706         struct ctdb_iface_list *iface_list;
707         int ret, i;
708         bool status = false;
709
710         if (node->flags != 0) {
711                 return false;
712         }
713
714         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
715                                    node->pnn, TIMEOUT(), &iface_list);
716         if (ret != 0) {
717                 return false;
718         }
719
720         status = false;
721         for (i=0; i < iface_list->num; i++) {
722                 if (iface_list->iface[i].link_state == 0) {
723                         status = true;
724                         break;
725                 }
726         }
727
728         return status;
729 }
730
731 static void print_nodemap_machine(TALLOC_CTX *mem_ctx,
732                                   struct ctdb_context *ctdb,
733                                   struct ctdb_node_map *nodemap,
734                                   uint32_t mypnn)
735 {
736         struct ctdb_node_and_flags *node;
737         int i;
738
739         printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
740                options.sep,
741                "Node", options.sep,
742                "IP", options.sep,
743                "Disconnected", options.sep,
744                "Banned", options.sep,
745                "Disabled", options.sep,
746                "Unhealthy", options.sep,
747                "Stopped", options.sep,
748                "Inactive", options.sep,
749                "PartiallyOnline", options.sep,
750                "ThisNode", options.sep);
751
752         for (i=0; i<nodemap->num; i++) {
753                 node = &nodemap->node[i];
754                 if (node->flags & NODE_FLAGS_DELETED) {
755                         continue;
756                 }
757
758                 printf("%s%u%s%s%s%d%s%d%s%d%s%d%s%d%s%d%s%d%s%c%s\n",
759                        options.sep,
760                        node->pnn, options.sep,
761                        ctdb_sock_addr_to_string(mem_ctx, &node->addr),
762                        options.sep,
763                        !! (node->flags & NODE_FLAGS_DISCONNECTED), options.sep,
764                        !! (node->flags & NODE_FLAGS_BANNED), options.sep,
765                        !! (node->flags & NODE_FLAGS_PERMANENTLY_DISABLED),
766                        options.sep,
767                        !! (node->flags & NODE_FLAGS_UNHEALTHY), options.sep,
768                        !! (node->flags & NODE_FLAGS_STOPPED), options.sep,
769                        !! (node->flags & NODE_FLAGS_INACTIVE), options.sep,
770                        partially_online(mem_ctx, ctdb, node), options.sep,
771                        (node->pnn == mypnn)?'Y':'N', options.sep);
772         }
773
774 }
775
776 static void print_nodemap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
777                           struct ctdb_node_map *nodemap, uint32_t mypnn)
778 {
779         struct ctdb_node_and_flags *node;
780         int num_deleted_nodes = 0;
781         int i;
782
783         for (i=0; i<nodemap->num; i++) {
784                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
785                         num_deleted_nodes++;
786                 }
787         }
788
789         if (num_deleted_nodes == 0) {
790                 printf("Number of nodes:%d\n", nodemap->num);
791         } else {
792                 printf("Number of nodes:%d (including %d deleted nodes)\n",
793                        nodemap->num, num_deleted_nodes);
794         }
795
796         for (i=0; i<nodemap->num; i++) {
797                 node = &nodemap->node[i];
798                 if (node->flags & NODE_FLAGS_DELETED) {
799                         continue;
800                 }
801
802                 printf("pnn:%u %-16s %s%s\n",
803                        node->pnn,
804                        ctdb_sock_addr_to_string(mem_ctx, &node->addr),
805                        partially_online(mem_ctx, ctdb, node) ?
806                                 "PARTIALLYONLINE" :
807                                 pretty_print_flags(mem_ctx, node->flags),
808                        node->pnn == mypnn ? " (THIS NODE)" : "");
809         }
810 }
811
812 static void print_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
813                          struct ctdb_node_map *nodemap, uint32_t mypnn,
814                          struct ctdb_vnn_map *vnnmap, int recmode,
815                          uint32_t recmaster)
816 {
817         int i;
818
819         print_nodemap(mem_ctx, ctdb, nodemap, mypnn);
820
821         if (vnnmap->generation == INVALID_GENERATION) {
822                 printf("Generation:INVALID\n");
823         } else {
824                 printf("Generation:%u\n", vnnmap->generation);
825         }
826         printf("Size:%d\n", vnnmap->size);
827         for (i=0; i<vnnmap->size; i++) {
828                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
829         }
830
831         printf("Recovery mode:%s (%d)\n",
832                recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "RECOVERY",
833                recmode);
834         printf("Recovery master:%d\n", recmaster);
835 }
836
837 static int control_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
838                           int argc, const char **argv)
839 {
840         struct ctdb_node_map *nodemap;
841         struct ctdb_vnn_map *vnnmap;
842         int recmode;
843         uint32_t recmaster;
844         int ret;
845
846         if (argc != 0) {
847                 usage("status");
848         }
849
850         nodemap = get_nodemap(ctdb, false);
851         if (nodemap == NULL) {
852                 return 1;
853         }
854
855         if (options.machinereadable == 1) {
856                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
857                 return 0;
858         }
859
860         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
861                                   ctdb->cmd_pnn, TIMEOUT(), &vnnmap);
862         if (ret != 0) {
863                 return ret;
864         }
865
866         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
867                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
868         if (ret != 0) {
869                 return ret;
870         }
871
872         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
873                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
874         if (ret != 0) {
875                 return ret;
876         }
877
878         print_status(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, vnnmap,
879                      recmode, recmaster);
880         return 0;
881 }
882
883 static int control_uptime(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
884                           int argc, const char **argv)
885 {
886         struct ctdb_uptime *uptime;
887         int ret, tmp, days, hours, minutes, seconds;
888
889         ret = ctdb_ctrl_uptime(mem_ctx, ctdb->ev, ctdb->client,
890                                ctdb->cmd_pnn, TIMEOUT(), &uptime);
891         if (ret != 0) {
892                 return ret;
893         }
894
895         printf("Current time of node %-4u     :                %s",
896                ctdb->cmd_pnn, ctime(&uptime->current_time.tv_sec));
897
898         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
899         seconds = tmp % 60; tmp /= 60;
900         minutes = tmp % 60; tmp /= 60;
901         hours = tmp % 24; tmp /= 24;
902         days = tmp;
903
904         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s",
905                days, hours, minutes, seconds,
906                ctime(&uptime->ctdbd_start_time.tv_sec));
907
908         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
909         seconds = tmp % 60; tmp /= 60;
910         minutes = tmp % 60; tmp /= 60;
911         hours = tmp % 24; tmp /= 24;
912         days = tmp;
913
914         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s",
915                days, hours, minutes, seconds,
916                ctime(&uptime->last_recovery_finished.tv_sec));
917
918         printf("Duration of last recovery/failover: %lf seconds\n",
919                timeval_delta(&uptime->last_recovery_finished,
920                              &uptime->last_recovery_started));
921
922         return 0;
923 }
924
925 static int control_ping(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
926                         int argc, const char **argv)
927 {
928         struct timeval tv;
929         int ret, num_clients;
930
931         tv = timeval_current();
932         ret = ctdb_ctrl_ping(mem_ctx, ctdb->ev, ctdb->client,
933                              ctdb->cmd_pnn, TIMEOUT(), &num_clients);
934         if (ret != 0) {
935                 return ret;
936         }
937
938         printf("response from %u time=%.6f sec  (%d clients)\n",
939                ctdb->cmd_pnn, timeval_elapsed(&tv), num_clients);
940         return 0;
941 }
942
943 const char *runstate_to_string(enum ctdb_runstate runstate);
944 enum ctdb_runstate runstate_from_string(const char *runstate_str);
945
946 static int control_runstate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
947                             int argc, const char **argv)
948 {
949         enum ctdb_runstate runstate;
950         bool found;
951         int ret, i;
952
953         ret = ctdb_ctrl_get_runstate(mem_ctx, ctdb->ev, ctdb->client,
954                                      ctdb->cmd_pnn, TIMEOUT(), &runstate);
955         if (ret != 0) {
956                 return ret;
957         }
958
959         found = true;
960         for (i=0; i<argc; i++) {
961                 enum ctdb_runstate t;
962
963                 found = false;
964                 t = ctdb_runstate_from_string(argv[i]);
965                 if (t == CTDB_RUNSTATE_UNKNOWN) {
966                         printf("Invalid run state (%s)\n", argv[i]);
967                         return 1;
968                 }
969
970                 if (t == runstate) {
971                         found = true;
972                         break;
973                 }
974         }
975
976         if (! found) {
977                 printf("CTDB not in required run state (got %s)\n",
978                        ctdb_runstate_to_string(runstate));
979                 return 1;
980         }
981
982         printf("%s\n", ctdb_runstate_to_string(runstate));
983         return 0;
984 }
985
986 static int control_getvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
987                           int argc, const char **argv)
988 {
989         struct ctdb_var_list *tun_var_list;
990         uint32_t value;
991         int ret, i;
992         bool found;
993
994         if (argc != 1) {
995                 usage("getvar");
996         }
997
998         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
999                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1000         if (ret != 0) {
1001                 fprintf(stderr,
1002                         "Failed to get list of variables from node %u\n",
1003                         ctdb->cmd_pnn);
1004                 return ret;
1005         }
1006
1007         found = false;
1008         for (i=0; i<tun_var_list->count; i++) {
1009                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1010                         found = true;
1011                         break;
1012                 }
1013         }
1014
1015         if (! found) {
1016                 printf("No such tunable %s\n", argv[0]);
1017                 return 1;
1018         }
1019
1020         ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
1021                                     ctdb->cmd_pnn, TIMEOUT(), argv[0], &value);
1022         if (ret != 0) {
1023                 return ret;
1024         }
1025
1026         printf("%-26s = %u\n", argv[0], value);
1027         return 0;
1028 }
1029
1030 static int control_setvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1031                           int argc, const char **argv)
1032 {
1033         struct ctdb_var_list *tun_var_list;
1034         struct ctdb_tunable tunable;
1035         int ret, i;
1036         bool found;
1037
1038         if (argc != 2) {
1039                 usage("setvar");
1040         }
1041
1042         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1043                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1044         if (ret != 0) {
1045                 fprintf(stderr,
1046                         "Failed to get list of variables from node %u\n",
1047                         ctdb->cmd_pnn);
1048                 return ret;
1049         }
1050
1051         found = false;
1052         for (i=0; i<tun_var_list->count; i++) {
1053                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1054                         found = true;
1055                         break;
1056                 }
1057         }
1058
1059         if (! found) {
1060                 printf("No such tunable %s\n", argv[0]);
1061                 return 1;
1062         }
1063
1064         tunable.name = argv[0];
1065         tunable.value = strtoul(argv[1], NULL, 0);
1066
1067         ret = ctdb_ctrl_set_tunable(mem_ctx, ctdb->ev, ctdb->client,
1068                                     ctdb->cmd_pnn, TIMEOUT(), &tunable);
1069         if (ret != 0) {
1070                 if (ret == 1) {
1071                         fprintf(stderr,
1072                                 "Setting obsolete tunable variable '%s'\n",
1073                                tunable.name);
1074                         return 0;
1075                 }
1076         }
1077
1078         return ret;
1079 }
1080
1081 static int control_listvars(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1082                             int argc, const char **argv)
1083 {
1084         struct ctdb_var_list *tun_var_list;
1085         int ret, i;
1086
1087         if (argc != 0) {
1088                 usage("listvars");
1089         }
1090
1091         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1092                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1093         if (ret != 0) {
1094                 return ret;
1095         }
1096
1097         for (i=0; i<tun_var_list->count; i++) {
1098                 control_getvar(mem_ctx, ctdb, 1, &tun_var_list->var[i]);
1099         }
1100
1101         return 0;
1102 }
1103
1104 const struct {
1105         const char *name;
1106         uint32_t offset;
1107 } stats_fields[] = {
1108 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
1109         STATISTICS_FIELD(num_clients),
1110         STATISTICS_FIELD(frozen),
1111         STATISTICS_FIELD(recovering),
1112         STATISTICS_FIELD(num_recoveries),
1113         STATISTICS_FIELD(client_packets_sent),
1114         STATISTICS_FIELD(client_packets_recv),
1115         STATISTICS_FIELD(node_packets_sent),
1116         STATISTICS_FIELD(node_packets_recv),
1117         STATISTICS_FIELD(keepalive_packets_sent),
1118         STATISTICS_FIELD(keepalive_packets_recv),
1119         STATISTICS_FIELD(node.req_call),
1120         STATISTICS_FIELD(node.reply_call),
1121         STATISTICS_FIELD(node.req_dmaster),
1122         STATISTICS_FIELD(node.reply_dmaster),
1123         STATISTICS_FIELD(node.reply_error),
1124         STATISTICS_FIELD(node.req_message),
1125         STATISTICS_FIELD(node.req_control),
1126         STATISTICS_FIELD(node.reply_control),
1127         STATISTICS_FIELD(client.req_call),
1128         STATISTICS_FIELD(client.req_message),
1129         STATISTICS_FIELD(client.req_control),
1130         STATISTICS_FIELD(timeouts.call),
1131         STATISTICS_FIELD(timeouts.control),
1132         STATISTICS_FIELD(timeouts.traverse),
1133         STATISTICS_FIELD(locks.num_calls),
1134         STATISTICS_FIELD(locks.num_current),
1135         STATISTICS_FIELD(locks.num_pending),
1136         STATISTICS_FIELD(locks.num_failed),
1137         STATISTICS_FIELD(total_calls),
1138         STATISTICS_FIELD(pending_calls),
1139         STATISTICS_FIELD(childwrite_calls),
1140         STATISTICS_FIELD(pending_childwrite_calls),
1141         STATISTICS_FIELD(memory_used),
1142         STATISTICS_FIELD(max_hop_count),
1143         STATISTICS_FIELD(total_ro_delegations),
1144         STATISTICS_FIELD(total_ro_revokes),
1145 };
1146
1147 #define LATENCY_AVG(v)  ((v).num ? (v).total / (v).num : 0.0 )
1148
1149 static void print_statistics_machine(struct ctdb_statistics *s,
1150                                      bool show_header)
1151 {
1152         int i;
1153
1154         if (show_header) {
1155                 printf("CTDB version%s", options.sep);
1156                 printf("Current time of statistics%s", options.sep);
1157                 printf("Statistics collected since%s", options.sep);
1158                 for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1159                         printf("%s%s", stats_fields[i].name, options.sep);
1160                 }
1161                 printf("num_reclock_ctdbd_latency%s", options.sep);
1162                 printf("min_reclock_ctdbd_latency%s", options.sep);
1163                 printf("avg_reclock_ctdbd_latency%s", options.sep);
1164                 printf("max_reclock_ctdbd_latency%s", options.sep);
1165
1166                 printf("num_reclock_recd_latency%s", options.sep);
1167                 printf("min_reclock_recd_latency%s", options.sep);
1168                 printf("avg_reclock_recd_latency%s", options.sep);
1169                 printf("max_reclock_recd_latency%s", options.sep);
1170
1171                 printf("num_call_latency%s", options.sep);
1172                 printf("min_call_latency%s", options.sep);
1173                 printf("avg_call_latency%s", options.sep);
1174                 printf("max_call_latency%s", options.sep);
1175
1176                 printf("num_lockwait_latency%s", options.sep);
1177                 printf("min_lockwait_latency%s", options.sep);
1178                 printf("avg_lockwait_latency%s", options.sep);
1179                 printf("max_lockwait_latency%s", options.sep);
1180
1181                 printf("num_childwrite_latency%s", options.sep);
1182                 printf("min_childwrite_latency%s", options.sep);
1183                 printf("avg_childwrite_latency%s", options.sep);
1184                 printf("max_childwrite_latency%s", options.sep);
1185                 printf("\n");
1186         }
1187
1188         printf("%u%s", CTDB_PROTOCOL, options.sep);
1189         printf("%u%s", (uint32_t)s->statistics_current_time.tv_sec, options.sep);
1190         printf("%u%s", (uint32_t)s->statistics_start_time.tv_sec, options.sep);
1191         for (i=0;i<ARRAY_SIZE(stats_fields);i++) {
1192                 printf("%u%s",
1193                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s),
1194                        options.sep);
1195         }
1196         printf("%u%s", s->reclock.ctdbd.num, options.sep);
1197         printf("%.6f%s", s->reclock.ctdbd.min, options.sep);
1198         printf("%.6f%s", LATENCY_AVG(s->reclock.ctdbd), options.sep);
1199         printf("%.6f%s", s->reclock.ctdbd.max, options.sep);
1200
1201         printf("%u%s", s->reclock.recd.num, options.sep);
1202         printf("%.6f%s", s->reclock.recd.min, options.sep);
1203         printf("%.6f%s", LATENCY_AVG(s->reclock.recd), options.sep);
1204         printf("%.6f%s", s->reclock.recd.max, options.sep);
1205
1206         printf("%d%s", s->call_latency.num, options.sep);
1207         printf("%.6f%s", s->call_latency.min, options.sep);
1208         printf("%.6f%s", LATENCY_AVG(s->call_latency), options.sep);
1209         printf("%.6f%s", s->call_latency.max, options.sep);
1210
1211         printf("%d%s", s->childwrite_latency.num, options.sep);
1212         printf("%.6f%s", s->childwrite_latency.min, options.sep);
1213         printf("%.6f%s", LATENCY_AVG(s->childwrite_latency), options.sep);
1214         printf("%.6f%s", s->childwrite_latency.max, options.sep);
1215         printf("\n");
1216 }
1217
1218 static void print_statistics(struct ctdb_statistics *s)
1219 {
1220         int tmp, days, hours, minutes, seconds;
1221         int i;
1222         const char *prefix = NULL;
1223         int preflen = 0;
1224
1225         tmp = s->statistics_current_time.tv_sec -
1226               s->statistics_start_time.tv_sec;
1227         seconds = tmp % 60; tmp /= 60;
1228         minutes = tmp % 60; tmp /= 60;
1229         hours   = tmp % 24; tmp /= 24;
1230         days    = tmp;
1231
1232         printf("CTDB version %u\n", CTDB_PROTOCOL);
1233         printf("Current time of statistics  :                %s",
1234                ctime(&s->statistics_current_time.tv_sec));
1235         printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s",
1236                days, hours, minutes, seconds,
1237                ctime(&s->statistics_start_time.tv_sec));
1238
1239         for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1240                 if (strchr(stats_fields[i].name, '.') != NULL) {
1241                         preflen = strcspn(stats_fields[i].name, ".") + 1;
1242                         if (! prefix ||
1243                             strncmp(prefix, stats_fields[i].name, preflen) != 0) {
1244                                 prefix = stats_fields[i].name;
1245                                 printf(" %*.*s\n", preflen-1, preflen-1,
1246                                        stats_fields[i].name);
1247                         }
1248                 } else {
1249                         preflen = 0;
1250                 }
1251                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
1252                        stats_fields[i].name+preflen, preflen ? 0 : 4, "",
1253                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s));
1254         }
1255
1256         printf(" hop_count_buckets:");
1257         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1258                 printf(" %d", s->hop_count_bucket[i]);
1259         }
1260         printf("\n");
1261         printf(" lock_buckets:");
1262         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1263                 printf(" %d", s->locks.buckets[i]);
1264         }
1265         printf("\n");
1266         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1267                "locks_latency      MIN/AVG/MAX",
1268                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
1269                s->locks.latency.max, s->locks.latency.num);
1270
1271         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1272                "reclock_ctdbd      MIN/AVG/MAX",
1273                s->reclock.ctdbd.min, LATENCY_AVG(s->reclock.ctdbd),
1274                s->reclock.ctdbd.max, s->reclock.ctdbd.num);
1275
1276         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1277                "reclock_recd       MIN/AVG/MAX",
1278                s->reclock.recd.min, LATENCY_AVG(s->reclock.recd),
1279                s->reclock.recd.max, s->reclock.recd.num);
1280
1281         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1282                "call_latency       MIN/AVG/MAX",
1283                s->call_latency.min, LATENCY_AVG(s->call_latency),
1284                s->call_latency.max, s->call_latency.num);
1285
1286         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1287                "childwrite_latency MIN/AVG/MAX",
1288                s->childwrite_latency.min,
1289                LATENCY_AVG(s->childwrite_latency),
1290                s->childwrite_latency.max, s->childwrite_latency.num);
1291 }
1292
1293 static int control_statistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1294                               int argc, const char **argv)
1295 {
1296         struct ctdb_statistics *stats;
1297         int ret;
1298
1299         if (argc != 0) {
1300                 usage("statistics");
1301         }
1302
1303         ret = ctdb_ctrl_statistics(mem_ctx, ctdb->ev, ctdb->client,
1304                                    ctdb->cmd_pnn, TIMEOUT(), &stats);
1305         if (ret != 0) {
1306                 return ret;
1307         }
1308
1309         if (options.machinereadable) {
1310                 print_statistics_machine(stats, true);
1311         } else {
1312                 print_statistics(stats);
1313         }
1314
1315         return 0;
1316 }
1317
1318 static int control_statistics_reset(TALLOC_CTX *mem_ctx,
1319                                     struct ctdb_context *ctdb,
1320                                     int argc, const char **argv)
1321 {
1322         int ret;
1323
1324         if (argc != 0) {
1325                 usage("statisticsreset");
1326         }
1327
1328         ret = ctdb_ctrl_statistics_reset(mem_ctx, ctdb->ev, ctdb->client,
1329                                          ctdb->cmd_pnn, TIMEOUT());
1330         if (ret != 0) {
1331                 return ret;
1332         }
1333
1334         return 0;
1335 }
1336
1337 static int control_stats(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1338                          int argc, const char **argv)
1339 {
1340         struct ctdb_statistics_list *slist;
1341         int ret, count = 0, i;
1342         bool show_header = true;
1343
1344         if (argc > 1) {
1345                 usage("stats");
1346         }
1347
1348         if (argc == 1) {
1349                 count = atoi(argv[0]);
1350         }
1351
1352         ret = ctdb_ctrl_get_stat_history(mem_ctx, ctdb->ev, ctdb->client,
1353                                          ctdb->cmd_pnn, TIMEOUT(), &slist);
1354         if (ret != 0) {
1355                 return ret;
1356         }
1357
1358         for (i=0; i<slist->num; i++) {
1359                 if (slist->stats[i].statistics_start_time.tv_sec == 0) {
1360                         continue;
1361                 }
1362                 if (options.machinereadable == 1) {
1363                         print_statistics_machine(&slist->stats[i],
1364                                                  show_header);
1365                         show_header = false;
1366                 } else {
1367                         print_statistics(&slist->stats[i]);
1368                 }
1369                 if (count > 0 && i == count) {
1370                         break;
1371                 }
1372         }
1373
1374         return 0;
1375 }
1376
1377 static void print_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1378                      struct ctdb_public_ip_list *ips,
1379                      struct ctdb_public_ip_info **ipinfo,
1380                      bool all_nodes)
1381 {
1382         int i, j;
1383         char *conf, *avail, *active;
1384
1385         if (options.machinereadable == 1) {
1386                 printf("%s%s%s%s%s", options.sep,
1387                        "Public IP", options.sep,
1388                        "Node", options.sep);
1389                 if (options.verbose == 1) {
1390                         printf("%s%s%s%s%s%s\n",
1391                                "ActiveInterfaces", options.sep,
1392                                "AvailableInterfaces", options.sep,
1393                                "ConfiguredInterfaces", options.sep);
1394                 } else {
1395                         printf("\n");
1396                 }
1397         } else {
1398                 if (all_nodes) {
1399                         printf("Public IPs on ALL nodes\n");
1400                 } else {
1401                         printf("Public IPs on node %u\n", ctdb->cmd_pnn);
1402                 }
1403         }
1404
1405         /* IPs are reverse sorted */
1406         for (i=ips->num-1; i>=0; i--) {
1407
1408                 if (options.machinereadable == 1) {
1409                         printf("%s%s%s%d%s", options.sep,
1410                                ctdb_sock_addr_to_string(
1411                                         mem_ctx, &ips->ip[i].addr),
1412                                options.sep,
1413                                (int)ips->ip[i].pnn, options.sep);
1414                 } else {
1415                         printf("%s", ctdb_sock_addr_to_string(
1416                                                 mem_ctx, &ips->ip[i].addr));
1417                 }
1418
1419                 if (options.verbose == 0) {
1420                         if (options.machinereadable == 1) {
1421                                 printf("\n");
1422                         } else {
1423                                 printf(" %d\n", (int)ips->ip[i].pnn);
1424                         }
1425                         continue;
1426                 }
1427
1428                 conf = NULL;
1429                 avail = NULL;
1430                 active = NULL;
1431
1432                 for (j=0; j<ipinfo[i]->ifaces->num; j++) {
1433                         struct ctdb_iface *iface;
1434
1435                         iface = &ipinfo[i]->ifaces->iface[j];
1436                         if (conf == NULL) {
1437                                 conf = talloc_strdup(mem_ctx, iface->name);
1438                         } else {
1439                                 conf = talloc_asprintf_append(
1440                                                 mem_ctx, ",%s", iface->name);
1441                         }
1442
1443                         if (ipinfo[i]->active_idx == j) {
1444                                 active = iface->name;
1445                         }
1446
1447                         if (iface->link_state == 0) {
1448                                 continue;
1449                         }
1450
1451                         if (avail == NULL) {
1452                                 avail = talloc_strdup(mem_ctx, iface->name);
1453                         } else {
1454                                 avail = talloc_asprintf_append(
1455                                                 mem_ctx, ",%s", iface->name);
1456                         }
1457                 }
1458
1459                 if (options.machinereadable == 1) {
1460                         printf("%s%s%s%s%s%s\n",
1461                                active ? active : "", options.sep,
1462                                avail ? avail : "", options.sep,
1463                                conf ? conf : "", options.sep);
1464                 } else {
1465                         printf(" node[%u] active[%s] available[%s] configured[%s]\n",
1466                                ips->ip[i].pnn, active ? active : "",
1467                                avail ? avail : "", conf ? conf : "");
1468                 }
1469         }
1470 }
1471
1472 static int collect_ips(uint8_t *keybuf, size_t keylen, uint8_t *databuf,
1473                        size_t datalen, void *private_data)
1474 {
1475         struct ctdb_public_ip_list *ips = talloc_get_type_abort(
1476                 private_data, struct ctdb_public_ip_list);
1477         struct ctdb_public_ip *ip;
1478
1479         ip = (struct ctdb_public_ip *)databuf;
1480         ips->ip[ips->num] = *ip;
1481         ips->num += 1;
1482
1483         return 0;
1484 }
1485
1486 static int get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1487                               struct ctdb_public_ip_list **out)
1488 {
1489         struct ctdb_node_map *nodemap;
1490         struct ctdb_public_ip_list *ips;
1491         struct db_hash_context *ipdb;
1492         uint32_t *pnn_list;
1493         int ret, count, i, j;
1494
1495         nodemap = get_nodemap(ctdb, false);
1496         if (nodemap == NULL) {
1497                 return 1;
1498         }
1499
1500         ret = db_hash_init(mem_ctx, "ips", 101, DB_HASH_COMPLEX, &ipdb);
1501         if (ret != 0) {
1502                 goto failed;
1503         }
1504
1505         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
1506                                      &pnn_list);
1507         if (count <= 0) {
1508                 goto failed;
1509         }
1510
1511         for (i=0; i<count; i++) {
1512                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1513                                                pnn_list[i], TIMEOUT(), &ips);
1514                 if (ret != 0) {
1515                         goto failed;
1516                 }
1517
1518                 for (j=0; j<ips->num; j++) {
1519                         struct ctdb_public_ip ip;
1520
1521                         ip.pnn = ips->ip[j].pnn;
1522                         ip.addr = ips->ip[j].addr;
1523
1524                         ret = db_hash_add(ipdb, (uint8_t *)&ip.addr,
1525                                           sizeof(ip.addr),
1526                                           (uint8_t *)&ip, sizeof(ip));
1527                         if (ret != 0) {
1528                                 goto failed;
1529                         }
1530                 }
1531
1532                 TALLOC_FREE(ips);
1533         }
1534
1535         talloc_free(pnn_list);
1536
1537         ret = db_hash_traverse(ipdb, NULL, NULL, &count);
1538         if (ret != 0) {
1539                 goto failed;
1540         }
1541
1542         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);
1543         if (ips == NULL) {
1544                 goto failed;
1545         }
1546
1547         ips->ip = talloc_array(ips, struct ctdb_public_ip, count);
1548         if (ips->ip == NULL) {
1549                 goto failed;
1550         }
1551
1552         ret = db_hash_traverse(ipdb, collect_ips, ips, &count);
1553         if (ret != 0) {
1554                 goto failed;
1555         }
1556
1557         if (count != ips->num) {
1558                 goto failed;
1559         }
1560
1561         talloc_free(ipdb);
1562
1563         *out = ips;
1564         return 0;
1565
1566 failed:
1567         talloc_free(ipdb);
1568         return 1;
1569 }
1570
1571 static int control_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1572                       int argc, const char **argv)
1573 {
1574         struct ctdb_public_ip_list *ips;
1575         struct ctdb_public_ip_info **ipinfo;
1576         int ret, i;
1577         bool do_all = false;
1578
1579         if (argc > 1) {
1580                 usage("ip");
1581         }
1582
1583         if (argc == 1) {
1584                 if (strcmp(argv[0], "all") == 0) {
1585                         do_all = true;
1586                 } else {
1587                         usage("ip");
1588                 }
1589         }
1590
1591         if (do_all) {
1592                 ret = get_all_public_ips(ctdb, mem_ctx, &ips);
1593         } else {
1594                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1595                                                ctdb->cmd_pnn, TIMEOUT(), &ips);
1596         }
1597         if (ret != 0) {
1598                 return ret;
1599         }
1600
1601         ipinfo = talloc_array(mem_ctx, struct ctdb_public_ip_info *, ips->num);
1602         if (ipinfo == NULL) {
1603                 return 1;
1604         }
1605
1606         for (i=0; i<ips->num; i++) {
1607                 uint32_t pnn;
1608                 if (do_all) {
1609                         pnn = ips->ip[i].pnn;
1610                 } else {
1611                         pnn = ctdb->cmd_pnn;
1612                 }
1613                 ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev,
1614                                                    ctdb->client, pnn,
1615                                                    TIMEOUT(), &ips->ip[i].addr,
1616                                                    &ipinfo[i]);
1617                 if (ret != 0) {
1618                         return ret;
1619                 }
1620         }
1621
1622         print_ip(mem_ctx, ctdb, ips, ipinfo, do_all);
1623         return 0;
1624 }
1625
1626 static int control_ipinfo(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1627                           int argc, const char **argv)
1628 {
1629         struct ctdb_public_ip_info *ipinfo;
1630         ctdb_sock_addr addr;
1631         int ret, i;
1632
1633         if (argc != 1) {
1634                 usage("ipinfo");
1635         }
1636
1637         if (! parse_ip(argv[0], NULL, 0, &addr)) {
1638                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
1639                 return 1;
1640         }
1641
1642         ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev, ctdb->client,
1643                                            ctdb->cmd_pnn, TIMEOUT(), &addr,
1644                                            &ipinfo);
1645         if (ret != 0) {
1646                 if (ret == -1) {
1647                         printf("Node %u does not know about IP %s\n",
1648                                ctdb->cmd_pnn, argv[0]);
1649                 }
1650                 return ret;
1651         }
1652
1653         printf("Public IP[%s] info on node %u\n",
1654                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr),
1655                                         ctdb->cmd_pnn);
1656
1657         printf("IP:%s\nCurrentNode:%u\nNumInterfaces:%u\n",
1658                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr),
1659                ipinfo->ip.pnn, ipinfo->ifaces->num);
1660
1661         for (i=0; i<ipinfo->ifaces->num; i++) {
1662                 struct ctdb_iface *iface;
1663
1664                 iface = &ipinfo->ifaces->iface[i];
1665                 iface->name[CTDB_IFACE_SIZE] = '\0';
1666                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1667                        i+1, iface->name,
1668                        iface->link_state == 0 ? "down" : "up",
1669                        iface->references,
1670                        (i == ipinfo->active_idx) ? " (active)" : "");
1671         }
1672
1673         return 0;
1674 }
1675
1676 static int control_ifaces(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1677                           int argc, const char **argv)
1678 {
1679         struct ctdb_iface_list *ifaces;
1680         int ret, i;
1681
1682         if (argc != 0) {
1683                 usage("ifaces");
1684         }
1685
1686         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1687                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1688         if (ret != 0) {
1689                 return ret;
1690         }
1691
1692         if (ifaces->num == 0) {
1693                 printf("No interfaces configured on node %u\n",
1694                        ctdb->cmd_pnn);
1695                 return 0;
1696         }
1697
1698         if (options.machinereadable) {
1699                 printf("%s%s%s%s%s%s%s\n", options.sep,
1700                        "Name", options.sep,
1701                        "LinkStatus", options.sep,
1702                        "References", options.sep);
1703         } else {
1704                 printf("Interfaces on node %u\n", ctdb->cmd_pnn);
1705         }
1706
1707         for (i=0; i<ifaces->num; i++) {
1708                 if (options.machinereadable) {
1709                         printf("%s%s%s%u%s%u%s\n", options.sep,
1710                                ifaces->iface[i].name, options.sep,
1711                                ifaces->iface[i].link_state, options.sep,
1712                                ifaces->iface[i].references, options.sep);
1713                 } else {
1714                         printf("name:%s link:%s references:%u\n",
1715                                ifaces->iface[i].name,
1716                                ifaces->iface[i].link_state ? "up" : "down",
1717                                ifaces->iface[i].references);
1718                 }
1719         }
1720
1721         return 0;
1722 }
1723
1724 static int control_setifacelink(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1725                                 int argc, const char **argv)
1726 {
1727         struct ctdb_iface_list *ifaces;
1728         struct ctdb_iface *iface;
1729         int ret, i;
1730
1731         if (argc != 2) {
1732                 usage("setifacelink");
1733         }
1734
1735         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1736                 fprintf(stderr, "Interface name '%s' too long\n", argv[0]);
1737                 return 1;
1738         }
1739
1740         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1741                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1742         if (ret != 0) {
1743                 fprintf(stderr,
1744                         "Failed to get interface information from node %u\n",
1745                         ctdb->cmd_pnn);
1746                 return ret;
1747         }
1748
1749         iface = NULL;
1750         for (i=0; i<ifaces->num; i++) {
1751                 if (strcmp(ifaces->iface[i].name, argv[0]) == 0) {
1752                         iface = &ifaces->iface[i];
1753                         break;
1754                 }
1755         }
1756
1757         if (iface == NULL) {
1758                 printf("Interface %s not configured on node %u\n",
1759                        argv[0], ctdb->cmd_pnn);
1760                 return 1;
1761         }
1762
1763         if (strcmp(argv[1], "up") == 0) {
1764                 iface->link_state = 1;
1765         } else if (strcmp(argv[1], "down") == 0) {
1766                 iface->link_state = 0;
1767         } else {
1768                 usage("setifacelink");
1769                 return 1;
1770         }
1771
1772         iface->references = 0;
1773
1774         ret = ctdb_ctrl_set_iface_link_state(mem_ctx, ctdb->ev, ctdb->client,
1775                                              ctdb->cmd_pnn, TIMEOUT(), iface);
1776         if (ret != 0) {
1777                 return ret;
1778         }
1779
1780         return 0;
1781 }
1782
1783 static int control_process_exists(TALLOC_CTX *mem_ctx,
1784                                   struct ctdb_context *ctdb,
1785                                   int argc, const char **argv)
1786 {
1787         pid_t pid;
1788         int ret, status;
1789
1790         if (argc != 1) {
1791                 usage("process-exists");
1792         }
1793
1794         pid = atoi(argv[0]);
1795         ret = ctdb_ctrl_process_exists(mem_ctx, ctdb->ev, ctdb->client,
1796                                        ctdb->cmd_pnn, TIMEOUT(), pid, &status);
1797         if (ret != 0) {
1798                 return ret;
1799         }
1800
1801         if (status == 0) {
1802                 printf("PID %u exists\n", pid);
1803         } else {
1804                 printf("PID %u does not exist\n", pid);
1805         }
1806         return status;
1807 }
1808
1809 static int control_getdbmap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1810                             int argc, const char **argv)
1811 {
1812         struct ctdb_dbid_map *dbmap;
1813         int ret, i;
1814
1815         if (argc != 0) {
1816                 usage("getdbmap");
1817         }
1818
1819         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
1820                                   ctdb->cmd_pnn, TIMEOUT(), &dbmap);
1821         if (ret != 0) {
1822                 return ret;
1823         }
1824
1825         if (options.machinereadable == 1) {
1826                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
1827                        options.sep,
1828                        "ID", options.sep,
1829                        "Name", options.sep,
1830                        "Path", options.sep,
1831                        "Persistent", options.sep,
1832                        "Sticky", options.sep,
1833                        "Unhealthy", options.sep,
1834                        "Readonly", options.sep);
1835         } else {
1836                 printf("Number of databases:%d\n", dbmap->num);
1837         }
1838
1839         for (i=0; i<dbmap->num; i++) {
1840                 const char *name;
1841                 const char *path;
1842                 const char *health;
1843                 bool persistent;
1844                 bool readonly;
1845                 bool sticky;
1846                 uint32_t db_id;
1847
1848                 db_id = dbmap->dbs[i].db_id;
1849
1850                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
1851                                            ctdb->cmd_pnn, TIMEOUT(), db_id,
1852                                            &name);
1853                 if (ret != 0) {
1854                         return ret;
1855                 }
1856
1857                 ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1858                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
1859                                           &path);
1860                 if (ret != 0) {
1861                         return ret;
1862                 }
1863
1864                 ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1865                                               ctdb->cmd_pnn, TIMEOUT(), db_id,
1866                                               &health);
1867                 if (ret != 0) {
1868                         return ret;
1869                 }
1870
1871                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1872                 readonly = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
1873                 sticky = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
1874
1875                 if (options.machinereadable == 1) {
1876                         printf("%s0x%08X%s%s%s%s%s%d%s%d%s%d%s%d%s\n",
1877                                options.sep,
1878                                db_id, options.sep,
1879                                name, options.sep,
1880                                path, options.sep,
1881                                !! (persistent), options.sep,
1882                                !! (sticky), options.sep,
1883                                !! (health), options.sep,
1884                                !! (readonly), options.sep);
1885                 } else {
1886                         printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
1887                                db_id, name, path,
1888                                persistent ? " PERSISTENT" : "",
1889                                sticky ? " STICKY" : "",
1890                                readonly ? " READONLY" : "",
1891                                health ? " UNHEALTHY" : "");
1892                 }
1893
1894                 talloc_free(discard_const(name));
1895                 talloc_free(discard_const(path));
1896                 talloc_free(discard_const(health));
1897         }
1898
1899         return 0;
1900 }
1901
1902 static int control_getdbstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1903                                int argc, const char **argv)
1904 {
1905         uint32_t db_id;
1906         const char *db_name, *db_path, *db_health;
1907         uint8_t db_flags;
1908         int ret;
1909
1910         if (argc != 1) {
1911                 usage("getdbstatus");
1912         }
1913
1914         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
1915                 return 1;
1916         }
1917
1918         ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1919                                   ctdb->cmd_pnn, TIMEOUT(), db_id,
1920                                   &db_path);
1921         if (ret != 0) {
1922                 return ret;
1923         }
1924
1925         ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1926                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
1927                                       &db_health);
1928         if (ret != 0) {
1929                 return ret;
1930         }
1931
1932         printf("dbid: 0x%08x\nname: %s\npath: %s\n", db_id, db_name, db_path);
1933         printf("PERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
1934                (db_flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
1935                (db_flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
1936                (db_flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
1937                (db_health ? db_health : "OK"));
1938         return 0;
1939 }
1940
1941 struct dump_record_state {
1942         uint32_t count;
1943 };
1944
1945 #define ISASCII(x) (isprint(x) && ! strchr("\"\\", (x)))
1946
1947 static void dump_tdb_data(const char *name, TDB_DATA val)
1948 {
1949         int i;
1950
1951         fprintf(stdout, "%s(%zu) = \"", name, val.dsize);
1952         for (i=0; i<val.dsize; i++) {
1953                 if (ISASCII(val.dptr[i])) {
1954                         fprintf(stdout, "%c", val.dptr[i]);
1955                 } else {
1956                         fprintf(stdout, "\\%02X", val.dptr[i]);
1957                 }
1958         }
1959         fprintf(stdout, "\"\n");
1960 }
1961
1962 static void dump_ltdb_header(struct ctdb_ltdb_header *header)
1963 {
1964         fprintf(stdout, "dmaster: %u\n", header->dmaster);
1965         fprintf(stdout, "rsn: %" PRIu64 "\n", header->rsn);
1966         fprintf(stdout, "flags: 0x%08x", header->flags);
1967         if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
1968                 fprintf(stdout, " MIGRATED_WITH_DATA");
1969         }
1970         if (header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) {
1971                 fprintf(stdout, " VACUUM_MIGRATED");
1972         }
1973         if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
1974                 fprintf(stdout, " AUTOMATIC");
1975         }
1976         if (header->flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1977                 fprintf(stdout, " RO_HAVE_DELEGATIONS");
1978         }
1979         if (header->flags & CTDB_REC_RO_HAVE_READONLY) {
1980                 fprintf(stdout, " RO_HAVE_READONLY");
1981         }
1982         if (header->flags & CTDB_REC_RO_REVOKING_READONLY) {
1983                 fprintf(stdout, " RO_REVOKING_READONLY");
1984         }
1985         if (header->flags & CTDB_REC_RO_REVOKE_COMPLETE) {
1986                 fprintf(stdout, " RO_REVOKE_COMPLETE");
1987         }
1988         fprintf(stdout, "\n");
1989
1990 }
1991
1992 static int dump_record(uint32_t reqid, struct ctdb_ltdb_header *header,
1993                        TDB_DATA key, TDB_DATA data, void *private_data)
1994 {
1995         struct dump_record_state *state =
1996                 (struct dump_record_state *)private_data;
1997
1998         state->count += 1;
1999
2000         dump_tdb_data("key", key);
2001         dump_ltdb_header(header);
2002         dump_tdb_data("data", data);
2003         fprintf(stdout, "\n");
2004
2005         return 0;
2006 }
2007
2008 struct traverse_state {
2009         TALLOC_CTX *mem_ctx;
2010         bool done;
2011         ctdb_rec_parser_func_t func;
2012         struct dump_record_state sub_state;
2013 };
2014
2015 static void traverse_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2016 {
2017         struct traverse_state *state = (struct traverse_state *)private_data;
2018         struct ctdb_rec_data *rec;
2019         struct ctdb_ltdb_header header;
2020         int ret;
2021
2022         ret = ctdb_rec_data_pull(data.dptr, data.dsize, state->mem_ctx, &rec);
2023         if (ret != 0) {
2024                 return;
2025         }
2026
2027         if (rec->key.dsize == 0 && rec->data.dsize == 0) {
2028                 talloc_free(rec);
2029                 /* end of traverse */
2030                 state->done = true;
2031                 return;
2032         }
2033
2034         ret = ctdb_ltdb_header_extract(&rec->data, &header);
2035         if (ret != 0) {
2036                 talloc_free(rec);
2037                 return;
2038         }
2039
2040         if (rec->data.dsize == 0) {
2041                 return;
2042         }
2043
2044         ret = state->func(rec->reqid, &header, rec->key, rec->data,
2045                           &state->sub_state);
2046         talloc_free(rec);
2047         if (ret != 0) {
2048                 state->done = true;
2049         }
2050 }
2051
2052 static int control_catdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2053                          int argc, const char **argv)
2054 {
2055         struct ctdb_db_context *db;
2056         const char *db_name;
2057         uint32_t db_id;
2058         uint8_t db_flags;
2059         struct ctdb_traverse_start_ext traverse;
2060         struct traverse_state state;
2061         int ret;
2062
2063         if (argc != 1) {
2064                 usage("catdb");
2065         }
2066
2067         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2068                 return 1;
2069         }
2070
2071         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2072                           db_flags, &db);
2073         if (ret != 0) {
2074                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2075                 return ret;
2076         }
2077
2078         /* Valgrind fix */
2079         ZERO_STRUCT(traverse);
2080
2081         traverse.db_id = db_id;
2082         traverse.reqid = 0;
2083         traverse.srvid = next_srvid(ctdb);
2084         traverse.withemptyrecords = false;
2085
2086         state.mem_ctx = mem_ctx;
2087         state.done = false;
2088         state.func = dump_record;
2089         state.sub_state.count = 0;
2090
2091         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2092                                               traverse.srvid,
2093                                               traverse_handler, &state);
2094         if (ret != 0) {
2095                 return ret;
2096         }
2097
2098         ret = ctdb_ctrl_traverse_start_ext(mem_ctx, ctdb->ev, ctdb->client,
2099                                            ctdb->cmd_pnn, TIMEOUT(),
2100                                            &traverse);
2101         if (ret != 0) {
2102                 return ret;
2103         }
2104
2105         ctdb_client_wait(ctdb->ev, &state.done);
2106
2107         printf("Dumped %u records\n", state.sub_state.count);
2108
2109         ret = ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2110                                                  traverse.srvid, &state);
2111         if (ret != 0) {
2112                 return ret;
2113         }
2114
2115         return 0;
2116 }
2117
2118 static int control_cattdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2119                           int argc, const char **argv)
2120 {
2121         struct ctdb_db_context *db;
2122         const char *db_name;
2123         uint32_t db_id;
2124         uint8_t db_flags;
2125         struct dump_record_state state;
2126         int ret;
2127
2128         if (argc != 1) {
2129                 usage("catdb");
2130         }
2131
2132         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2133                 return 1;
2134         }
2135
2136         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2137                           db_flags, &db);
2138         if (ret != 0) {
2139                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2140                 return ret;
2141         }
2142
2143         state.count = 0;
2144         ret = ctdb_db_traverse(db, true, true, dump_record, &state);
2145
2146         printf("Dumped %u record(s)\n", state.count);
2147
2148         return ret;
2149 }
2150
2151 static int control_getmonmode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2152                               int argc, const char **argv)
2153 {
2154         int mode, ret;
2155
2156         if (argc != 0) {
2157                 usage("getmonmode");
2158         }
2159
2160         ret = ctdb_ctrl_get_monmode(mem_ctx, ctdb->ev, ctdb->client,
2161                                     ctdb->cmd_pnn, TIMEOUT(), &mode);
2162         if (ret != 0) {
2163                 return ret;
2164         }
2165
2166         printf("%s\n",
2167                (mode == CTDB_MONITORING_ENABLED) ? "ENABLED" : "DISABLED");
2168         return 0;
2169 }
2170
2171 static int control_getcapabilities(TALLOC_CTX *mem_ctx,
2172                                    struct ctdb_context *ctdb,
2173                                    int argc, const char **argv)
2174 {
2175         uint32_t caps;
2176         int ret;
2177
2178         if (argc != 0) {
2179                 usage("getcapabilities");
2180         }
2181
2182         ret = ctdb_ctrl_get_capabilities(mem_ctx, ctdb->ev, ctdb->client,
2183                                          ctdb->cmd_pnn, TIMEOUT(), &caps);
2184         if (ret != 0) {
2185                 return ret;
2186         }
2187
2188         if (options.machinereadable == 1) {
2189                 printf("%s%s%s%s%s\n",
2190                        options.sep,
2191                        "RECMASTER", options.sep,
2192                        "LMASTER", options.sep);
2193                 printf("%s%d%s%d%s\n", options.sep,
2194                        !! (caps & CTDB_CAP_RECMASTER), options.sep,
2195                        !! (caps & CTDB_CAP_LMASTER), options.sep);
2196         } else {
2197                 printf("RECMASTER: %s\n",
2198                        (caps & CTDB_CAP_RECMASTER) ? "YES" : "NO");
2199                 printf("LMASTER: %s\n",
2200                        (caps & CTDB_CAP_LMASTER) ? "YES" : "NO");
2201         }
2202
2203         return 0;
2204 }
2205
2206 static int control_pnn(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2207                        int argc, const char **argv)
2208 {
2209         printf("%u\n", ctdb_client_pnn(ctdb->client));
2210         return 0;
2211 }
2212
2213 static int control_lvs(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2214                        int argc, const char **argv)
2215 {
2216         char *t, *lvs_helper = NULL;
2217
2218         if (argc != 1) {
2219                 usage("lvs");
2220         }
2221
2222         t = getenv("CTDB_LVS_HELPER");
2223         if (t != NULL) {
2224                 lvs_helper = talloc_strdup(mem_ctx, t);
2225         } else {
2226                 lvs_helper = talloc_asprintf(mem_ctx, "%s/ctdb_lvs",
2227                                              CTDB_HELPER_BINDIR);
2228         }
2229
2230         if (lvs_helper == NULL) {
2231                 fprintf(stderr, "Unable to set LVS helper\n");
2232                 return 1;
2233         }
2234
2235         return run_helper("LVS helper", lvs_helper, argv[0]);
2236 }
2237
2238 static int control_disable_monitor(TALLOC_CTX *mem_ctx,
2239                                    struct ctdb_context *ctdb,
2240                                    int argc, const char **argv)
2241 {
2242         int ret;
2243
2244         if (argc != 0) {
2245                 usage("disablemonitor");
2246         }
2247
2248         ret = ctdb_ctrl_disable_monitor(mem_ctx, ctdb->ev, ctdb->client,
2249                                         ctdb->cmd_pnn, TIMEOUT());
2250         if (ret != 0) {
2251                 return ret;
2252         }
2253
2254         return 0;
2255 }
2256
2257 static int control_enable_monitor(TALLOC_CTX *mem_ctx,
2258                                   struct ctdb_context *ctdb,
2259                                   int argc, const char **argv)
2260 {
2261         int ret;
2262
2263         if (argc != 0) {
2264                 usage("enablemonitor");
2265         }
2266
2267         ret = ctdb_ctrl_enable_monitor(mem_ctx, ctdb->ev, ctdb->client,
2268                                        ctdb->cmd_pnn, TIMEOUT());
2269         if (ret != 0) {
2270                 return ret;
2271         }
2272
2273         return 0;
2274 }
2275
2276 static int control_setdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2277                             int argc, const char **argv)
2278 {
2279         enum debug_level log_level;
2280         int ret;
2281         bool found;
2282
2283         if (argc != 1) {
2284                 usage("setdebug");
2285         }
2286
2287         found = debug_level_parse(argv[0], &log_level);
2288         if (! found) {
2289                 fprintf(stderr,
2290                         "Invalid debug level '%s'. Valid levels are:\n",
2291                         argv[0]);
2292                 fprintf(stderr, "\tERROR | WARNING | NOTICE | INFO | DEBUG\n");
2293                 return 1;
2294         }
2295
2296         ret = ctdb_ctrl_setdebug(mem_ctx, ctdb->ev, ctdb->client,
2297                                  ctdb->cmd_pnn, TIMEOUT(), log_level);
2298         if (ret != 0) {
2299                 return ret;
2300         }
2301
2302         return 0;
2303 }
2304
2305 static int control_getdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2306                             int argc, const char **argv)
2307 {
2308         enum debug_level loglevel;
2309         const char *log_str;
2310         int ret;
2311
2312         if (argc != 0) {
2313                 usage("getdebug");
2314         }
2315
2316         ret = ctdb_ctrl_getdebug(mem_ctx, ctdb->ev, ctdb->client,
2317                                  ctdb->cmd_pnn, TIMEOUT(), &loglevel);
2318         if (ret != 0) {
2319                 return ret;
2320         }
2321
2322         log_str = debug_level_to_string(loglevel);
2323         printf("%s\n", log_str);
2324
2325         return 0;
2326 }
2327
2328 static int control_attach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2329                           int argc, const char **argv)
2330 {
2331         const char *db_name;
2332         uint8_t db_flags = 0;
2333         int ret;
2334
2335         if (argc < 1 || argc > 2) {
2336                 usage("attach");
2337         }
2338
2339         db_name = argv[0];
2340         if (argc == 2) {
2341                 if (strcmp(argv[1], "persistent") == 0) {
2342                         db_flags = CTDB_DB_FLAGS_PERSISTENT;
2343                 } else if (strcmp(argv[1], "readonly") == 0) {
2344                         db_flags = CTDB_DB_FLAGS_READONLY;
2345                 } else if (strcmp(argv[1], "sticky") == 0) {
2346                         db_flags = CTDB_DB_FLAGS_STICKY;
2347                 } else {
2348                         usage("attach");
2349                 }
2350         }
2351
2352         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2353                           db_flags, NULL);
2354         if (ret != 0) {
2355                 return ret;
2356         }
2357
2358         return 0;
2359 }
2360
2361 static int control_detach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2362                           int argc, const char **argv)
2363 {
2364         const char *db_name;
2365         uint32_t db_id;
2366         uint8_t db_flags;
2367         struct ctdb_node_map *nodemap;
2368         int recmode;
2369         int ret, ret2, i;
2370
2371         if (argc < 1) {
2372                 usage("detach");
2373         }
2374
2375         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2376                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
2377         if (ret != 0) {
2378                 return ret;
2379         }
2380
2381         if (recmode == CTDB_RECOVERY_ACTIVE) {
2382                 fprintf(stderr, "Database cannot be detached"
2383                                 " when recovery is active\n");
2384                 return 1;
2385         }
2386
2387         nodemap = get_nodemap(ctdb, false);
2388         if (nodemap == NULL) {
2389                 return 1;
2390         }
2391
2392         for (i=0; i<nodemap->num; i++) {
2393                 uint32_t value;
2394
2395                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2396                         continue;
2397                 }
2398                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2399                         continue;
2400                 }
2401                 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2402                         fprintf(stderr, "Database cannot be detached on"
2403                                 " inactive (stopped or banned) node %u\n",
2404                                 nodemap->node[i].pnn);
2405                         return 1;
2406                 }
2407
2408                 ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
2409                                             nodemap->node[i].pnn, TIMEOUT(),
2410                                             "AllowClientDBAttach", &value);
2411                 if (ret != 0) {
2412                         fprintf(stderr,
2413                                 "Unable to get tunable AllowClientDBAttach"
2414                                 " from node %u\n", nodemap->node[i].pnn);
2415                         return ret;
2416                 }
2417
2418                 if (value == 1) {
2419                         fprintf(stderr,
2420                                 "Database access is still active on node %u."
2421                                 " Set AllowclientDBAttach=0 on all nodes.\n",
2422                                 nodemap->node[i].pnn);
2423                         return 1;
2424                 }
2425         }
2426
2427         ret2 = 0;
2428         for (i=0; i<argc; i++) {
2429                 if (! db_exists(mem_ctx, ctdb, argv[i], &db_id, &db_name,
2430                                 &db_flags)) {
2431                         continue;
2432                 }
2433
2434                 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
2435                         fprintf(stderr,
2436                                 "Persistent database %s cannot be detached\n",
2437                                 argv[0]);
2438                         return 1;
2439                 }
2440
2441                 ret = ctdb_detach(mem_ctx, ctdb->ev, ctdb->client,
2442                                   TIMEOUT(), db_id);
2443                 if (ret != 0) {
2444                         fprintf(stderr, "Database %s detach failed\n", db_name);
2445                         ret2 = ret;
2446                 }
2447         }
2448
2449         return ret2;
2450 }
2451
2452 static int control_dumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2453                               int argc, const char **argv)
2454 {
2455         const char *mem_str;
2456         ssize_t n;
2457         int ret;
2458
2459         ret = ctdb_ctrl_dump_memory(mem_ctx, ctdb->ev, ctdb->client,
2460                                     ctdb->cmd_pnn, TIMEOUT(), &mem_str);
2461         if (ret != 0) {
2462                 return ret;
2463         }
2464
2465         n = write(1, mem_str, strlen(mem_str)+1);
2466         if (n < 0 || n != strlen(mem_str)+1) {
2467                 fprintf(stderr, "Failed to write talloc summary\n");
2468                 return 1;
2469         }
2470
2471         return 0;
2472 }
2473
2474 static void dump_memory(uint64_t srvid, TDB_DATA data, void *private_data)
2475 {
2476         bool *done = (bool *)private_data;
2477         ssize_t n;
2478
2479         n = write(1, data.dptr, data.dsize);
2480         if (n < 0 || n != data.dsize) {
2481                 fprintf(stderr, "Failed to write talloc summary\n");
2482         }
2483
2484         *done = true;
2485 }
2486
2487 static int control_rddumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2488                                 int argc, const char **argv)
2489 {
2490         struct ctdb_srvid_message msg = { 0 };
2491         int ret;
2492         bool done = false;
2493
2494         msg.pnn = ctdb->pnn;
2495         msg.srvid = next_srvid(ctdb);
2496
2497         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2498                                               msg.srvid, dump_memory, &done);
2499         if (ret != 0) {
2500                 return ret;
2501         }
2502
2503         ret = ctdb_message_mem_dump(mem_ctx, ctdb->ev, ctdb->client,
2504                                     ctdb->cmd_pnn, &msg);
2505         if (ret != 0) {
2506                 return ret;
2507         }
2508
2509         ctdb_client_wait(ctdb->ev, &done);
2510         return 0;
2511 }
2512
2513 static int control_getpid(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2514                           int argc, const char **argv)
2515 {
2516         pid_t pid;
2517         int ret;
2518
2519         ret = ctdb_ctrl_get_pid(mem_ctx, ctdb->ev, ctdb->client,
2520                                 ctdb->cmd_pnn, TIMEOUT(), &pid);
2521         if (ret != 0) {
2522                 return ret;
2523         }
2524
2525         printf("%u\n", pid);
2526         return 0;
2527 }
2528
2529 static int check_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2530                        const char *desc, uint32_t flag, bool set_flag)
2531 {
2532         struct ctdb_node_map *nodemap;
2533         bool flag_is_set;
2534
2535         nodemap = get_nodemap(ctdb, false);
2536         if (nodemap == NULL) {
2537                 return 1;
2538         }
2539
2540         flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2541         if (set_flag == flag_is_set) {
2542                 if (set_flag) {
2543                         fprintf(stderr, "Node %u is already %s\n",
2544                                 ctdb->cmd_pnn, desc);
2545                 } else {
2546                         fprintf(stderr, "Node %u is not %s\n",
2547                                 ctdb->cmd_pnn, desc);
2548                 }
2549                 return 0;
2550         }
2551
2552         return 1;
2553 }
2554
2555 static void wait_for_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2556                            uint32_t flag, bool set_flag)
2557 {
2558         struct ctdb_node_map *nodemap;
2559         bool flag_is_set;
2560
2561         while (1) {
2562                 nodemap = get_nodemap(ctdb, true);
2563                 if (nodemap == NULL) {
2564                         fprintf(stderr,
2565                                 "Failed to get nodemap, trying again\n");
2566                         sleep(1);
2567                         continue;
2568                 }
2569
2570                 flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2571                 if (flag_is_set == set_flag) {
2572                         break;
2573                 }
2574
2575                 sleep(1);
2576         }
2577 }
2578
2579 struct ipreallocate_state {
2580         int status;
2581         bool done;
2582 };
2583
2584 static void ipreallocate_handler(uint64_t srvid, TDB_DATA data,
2585                                  void *private_data)
2586 {
2587         struct ipreallocate_state *state =
2588                 (struct ipreallocate_state *)private_data;
2589
2590         if (data.dsize != sizeof(int)) {
2591                 /* Ignore packet */
2592                 return;
2593         }
2594
2595         state->status = *(int *)data.dptr;
2596         state->done = true;
2597 }
2598
2599 static int ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb)
2600 {
2601         struct ctdb_srvid_message msg = { 0 };
2602         struct ipreallocate_state state;
2603         int ret;
2604
2605         msg.pnn = ctdb->pnn;
2606         msg.srvid = next_srvid(ctdb);
2607
2608         state.done = false;
2609         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2610                                               msg.srvid,
2611                                               ipreallocate_handler, &state);
2612         if (ret != 0) {
2613                 return ret;
2614         }
2615
2616         while (true) {
2617                 ret = ctdb_message_takeover_run(mem_ctx, ctdb->ev,
2618                                                 ctdb->client,
2619                                                 CTDB_BROADCAST_CONNECTED,
2620                                                 &msg);
2621                 if (ret != 0) {
2622                         goto fail;
2623                 }
2624
2625                 ret = ctdb_client_wait_timeout(ctdb->ev, &state.done,
2626                                                TIMEOUT());
2627                 if (ret != 0) {
2628                         continue;
2629                 }
2630
2631                 if (state.status >= 0) {
2632                         ret = 0;
2633                 } else {
2634                         ret = state.status;
2635                 }
2636                 break;
2637         }
2638
2639 fail:
2640         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2641                                            msg.srvid, &state);
2642         return ret;
2643 }
2644
2645 static int control_disable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2646                            int argc, const char **argv)
2647 {
2648         int ret;
2649
2650         if (argc != 0) {
2651                 usage("disable");
2652         }
2653
2654         ret = check_flags(mem_ctx, ctdb, "disabled",
2655                           NODE_FLAGS_PERMANENTLY_DISABLED, true);
2656         if (ret == 0) {
2657                 return 0;
2658         }
2659
2660         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2661                                  ctdb->cmd_pnn, TIMEOUT(),
2662                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2663         if (ret != 0) {
2664                 fprintf(stderr,
2665                         "Failed to set DISABLED flag on node %u\n",
2666                         ctdb->cmd_pnn);
2667                 return ret;
2668         }
2669
2670         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, true);
2671         return ipreallocate(mem_ctx, ctdb);
2672 }
2673
2674 static int control_enable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2675                           int argc, const char **argv)
2676 {
2677         int ret;
2678
2679         if (argc != 0) {
2680                 usage("enable");
2681         }
2682
2683         ret = check_flags(mem_ctx, ctdb, "disabled",
2684                           NODE_FLAGS_PERMANENTLY_DISABLED, false);
2685         if (ret == 0) {
2686                 return 0;
2687         }
2688
2689         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2690                                  ctdb->cmd_pnn, TIMEOUT(),
2691                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
2692         if (ret != 0) {
2693                 fprintf(stderr, "Failed to reset DISABLED flag on node %u\n",
2694                         ctdb->cmd_pnn);
2695                 return ret;
2696         }
2697
2698         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, false);
2699         return ipreallocate(mem_ctx, ctdb);
2700 }
2701
2702 static int control_stop(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2703                         int argc, const char **argv)
2704 {
2705         int ret;
2706
2707         if (argc != 0) {
2708                 usage("stop");
2709         }
2710
2711         ret = check_flags(mem_ctx, ctdb, "stopped",
2712                           NODE_FLAGS_STOPPED, true);
2713         if (ret == 0) {
2714                 return 0;
2715         }
2716
2717         ret = ctdb_ctrl_stop_node(mem_ctx, ctdb->ev, ctdb->client,
2718                                   ctdb->cmd_pnn, TIMEOUT());
2719         if (ret != 0) {
2720                 fprintf(stderr, "Failed to stop node %u\n", ctdb->cmd_pnn);
2721                 return ret;
2722         }
2723
2724         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, true);
2725         return ipreallocate(mem_ctx, ctdb);
2726 }
2727
2728 static int control_continue(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2729                             int argc, const char **argv)
2730 {
2731         int ret;
2732
2733         if (argc != 0) {
2734                 usage("continue");
2735         }
2736
2737         ret = check_flags(mem_ctx, ctdb, "stopped",
2738                           NODE_FLAGS_STOPPED, false);
2739         if (ret == 0) {
2740                 return 0;
2741         }
2742
2743         ret = ctdb_ctrl_continue_node(mem_ctx, ctdb->ev, ctdb->client,
2744                                       ctdb->cmd_pnn, TIMEOUT());
2745         if (ret != 0) {
2746                 fprintf(stderr, "Failed to continue stopped node %u\n",
2747                         ctdb->cmd_pnn);
2748                 return ret;
2749         }
2750
2751         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, false);
2752         return ipreallocate(mem_ctx, ctdb);
2753 }
2754
2755 static int control_ban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2756                        int argc, const char **argv)
2757 {
2758         struct ctdb_ban_state ban_state;
2759         int ret;
2760
2761         if (argc != 1) {
2762                 usage("ban");
2763         }
2764
2765         ret = check_flags(mem_ctx, ctdb, "banned",
2766                           NODE_FLAGS_BANNED, true);
2767         if (ret == 0) {
2768                 return 0;
2769         }
2770
2771         ban_state.pnn = ctdb->cmd_pnn;
2772         ban_state.time = strtoul(argv[0], NULL, 0);
2773
2774         if (ban_state.time == 0) {
2775                 fprintf(stderr, "Ban time cannot be zero\n");
2776                 return EINVAL;
2777         }
2778
2779         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2780                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2781         if (ret != 0) {
2782                 fprintf(stderr, "Failed to ban node %u\n", ctdb->cmd_pnn);
2783                 return ret;
2784         }
2785
2786         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, true);
2787         return ipreallocate(mem_ctx, ctdb);
2788
2789 }
2790
2791 static int control_unban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2792                          int argc, const char **argv)
2793 {
2794         struct ctdb_ban_state ban_state;
2795         int ret;
2796
2797         if (argc != 0) {
2798                 usage("unban");
2799         }
2800
2801         ret = check_flags(mem_ctx, ctdb, "banned",
2802                           NODE_FLAGS_BANNED, false);
2803         if (ret == 0) {
2804                 return 0;
2805         }
2806
2807         ban_state.pnn = ctdb->cmd_pnn;
2808         ban_state.time = 0;
2809
2810         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2811                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2812         if (ret != 0) {
2813                 fprintf(stderr, "Failed to unban node %u\n", ctdb->cmd_pnn);
2814                 return ret;
2815         }
2816
2817         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, false);
2818         return ipreallocate(mem_ctx, ctdb);
2819
2820 }
2821
2822 static int control_shutdown(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2823                             int argc, const char **argv)
2824 {
2825         int ret;
2826
2827         if (argc != 0) {
2828                 usage("shutdown");
2829         }
2830
2831         ret = ctdb_ctrl_shutdown(mem_ctx, ctdb->ev, ctdb->client,
2832                                  ctdb->cmd_pnn, TIMEOUT());
2833         if (ret != 0) {
2834                 fprintf(stderr, "Unable to shutdown node %u\n", ctdb->cmd_pnn);
2835                 return ret;
2836         }
2837
2838         return 0;
2839 }
2840
2841 static int get_generation(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2842                           uint32_t *generation)
2843 {
2844         uint32_t recmaster;
2845         int recmode;
2846         struct ctdb_vnn_map *vnnmap;
2847         int ret;
2848
2849 again:
2850         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2851                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
2852         if (ret != 0) {
2853                 fprintf(stderr, "Failed to find recovery master\n");
2854                 return ret;
2855         }
2856
2857         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2858                                     recmaster, TIMEOUT(), &recmode);
2859         if (ret != 0) {
2860                 fprintf(stderr, "Failed to get recovery mode from node %u\n",
2861                         recmaster);
2862                 return ret;
2863         }
2864
2865         if (recmode == CTDB_RECOVERY_ACTIVE) {
2866                 sleep(1);
2867                 goto again;
2868         }
2869
2870         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
2871                                   recmaster, TIMEOUT(), &vnnmap);
2872         if (ret != 0) {
2873                 fprintf(stderr, "Failed to get generation from node %u\n",
2874                         recmaster);
2875                 return ret;
2876         }
2877
2878         if (vnnmap->generation == 1) {
2879                 talloc_free(vnnmap);
2880                 sleep(1);
2881                 goto again;
2882         }
2883
2884         *generation = vnnmap->generation;
2885         talloc_free(vnnmap);
2886         return 0;
2887 }
2888
2889
2890 static int control_recover(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2891                            int argc, const char **argv)
2892 {
2893         uint32_t generation, next_generation;
2894         int ret;
2895
2896         if (argc != 0) {
2897                 usage("recover");
2898         }
2899
2900         ret = get_generation(mem_ctx, ctdb, &generation);
2901         if (ret != 0) {
2902                 return ret;
2903         }
2904
2905         ret = ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
2906                                     ctdb->cmd_pnn, TIMEOUT(),
2907                                     CTDB_RECOVERY_ACTIVE);
2908         if (ret != 0) {
2909                 fprintf(stderr, "Failed to set recovery mode active\n");
2910                 return ret;
2911         }
2912
2913         while (1) {
2914                 ret = get_generation(mem_ctx, ctdb, &next_generation);
2915                 if (ret != 0) {
2916                         fprintf(stderr,
2917                                 "Failed to confirm end of recovery\n");
2918                         return ret;
2919                 }
2920
2921                 if (next_generation != generation) {
2922                         break;
2923                 }
2924
2925                 sleep (1);
2926         }
2927
2928         return 0;
2929 }
2930
2931 static int control_ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2932                                 int argc, const char **argv)
2933 {
2934         if (argc != 0) {
2935                 usage("ipreallocate");
2936         }
2937
2938         return ipreallocate(mem_ctx, ctdb);
2939 }
2940
2941 static int control_isnotrecmaster(TALLOC_CTX *mem_ctx,
2942                                   struct ctdb_context *ctdb,
2943                                   int argc, const char **argv)
2944 {
2945         uint32_t recmaster;
2946         int ret;
2947
2948         if (argc != 0) {
2949                 usage("isnotrecmaster");
2950         }
2951
2952         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2953                                       ctdb->pnn, TIMEOUT(), &recmaster);
2954         if (ret != 0) {
2955                 fprintf(stderr, "Failed to get recmaster\n");
2956                 return ret;
2957         }
2958
2959         if (recmaster != ctdb->pnn) {
2960                 printf("this node is not the recmaster\n");
2961                 return 1;
2962         }
2963
2964         printf("this node is the recmaster\n");
2965         return 0;
2966 }
2967
2968 static int control_gratarp(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2969                            int argc, const char **argv)
2970 {
2971         struct ctdb_addr_info addr_info;
2972         int ret;
2973
2974         if (argc != 2) {
2975                 usage("gratarp");
2976         }
2977
2978         if (! parse_ip(argv[0], NULL, 0, &addr_info.addr)) {
2979                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
2980                 return 1;
2981         }
2982         addr_info.iface = argv[1];
2983
2984         ret = ctdb_ctrl_send_gratuitous_arp(mem_ctx, ctdb->ev, ctdb->client,
2985                                             ctdb->cmd_pnn, TIMEOUT(),
2986                                             &addr_info);
2987         if (ret != 0) {
2988                 fprintf(stderr, "Unable to send gratuitous arp from node %u\n",
2989                         ctdb->cmd_pnn);
2990                 return ret;
2991         }
2992
2993         return 0;
2994 }
2995
2996 static int control_tickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2997                            int argc, const char **argv)
2998 {
2999         ctdb_sock_addr src, dst;
3000         int ret;
3001
3002         if (argc != 0 && argc != 2) {
3003                 usage("tickle");
3004         }
3005
3006         if (argc == 0) {
3007                 struct ctdb_connection *clist;
3008                 int count;
3009                 int i, num_failed;
3010
3011                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3012                 if (ret != 0) {
3013                         return ret;
3014                 }
3015
3016                 num_failed = 0;
3017                 for (i=0; i<count; i++) {
3018                         ret = ctdb_sys_send_tcp(&clist[i].src,
3019                                                 &clist[i].dst,
3020                                                 0, 0, 0);
3021                         if (ret != 0) {
3022                                 num_failed += 1;
3023                         }
3024                 }
3025
3026                 if (num_failed > 0) {
3027                         fprintf(stderr, "Failed to send %d tickles\n",
3028                                 num_failed);
3029                         return 1;
3030                 }
3031
3032                 return 0;
3033         }
3034
3035
3036         if (! parse_ip_port(argv[0], &src)) {
3037                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3038                 return 1;
3039         }
3040
3041         if (! parse_ip_port(argv[1], &dst)) {
3042                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3043                 return 1;
3044         }
3045
3046         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
3047         if (ret != 0) {
3048                 fprintf(stderr, "Failed to send tickle ack\n");
3049                 return ret;
3050         }
3051
3052         return 0;
3053 }
3054
3055 static int control_gettickles(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3056                               int argc, const char **argv)
3057 {
3058         ctdb_sock_addr addr;
3059         struct ctdb_tickle_list *tickles;
3060         unsigned port = 0;
3061         int ret, i;
3062
3063         if (argc < 1 || argc > 2) {
3064                 usage("gettickles");
3065         }
3066
3067         if (argc == 2) {
3068                 port = strtoul(argv[1], NULL, 10);
3069         }
3070
3071         if (! parse_ip(argv[0], NULL, port, &addr)) {
3072                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3073                 return 1;
3074         }
3075
3076         ret = ctdb_ctrl_get_tcp_tickle_list(mem_ctx, ctdb->ev, ctdb->client,
3077                                             ctdb->cmd_pnn, TIMEOUT(), &addr,
3078                                             &tickles);
3079         if (ret != 0) {
3080                 fprintf(stderr, "Failed to get list of connections\n");
3081                 return ret;
3082         }
3083
3084         if (options.machinereadable) {
3085                 printf("%s%s%s%s%s%s%s%s%s\n",
3086                        options.sep,
3087                        "Source IP", options.sep,
3088                        "Port", options.sep,
3089                        "Destiation IP", options.sep,
3090                        "Port", options.sep);
3091                 for (i=0; i<tickles->num; i++) {
3092                         printf("%s%s%s%u%s%s%s%u%s\n", options.sep,
3093                                ctdb_sock_addr_to_string(
3094                                        mem_ctx, &tickles->conn[i].src),
3095                                options.sep,
3096                                ntohs(tickles->conn[i].src.ip.sin_port),
3097                                options.sep,
3098                                ctdb_sock_addr_to_string(
3099                                        mem_ctx, &tickles->conn[i].dst),
3100                                options.sep,
3101                                ntohs(tickles->conn[i].dst.ip.sin_port),
3102                                options.sep);
3103                 }
3104         } else {
3105                 printf("Connections for IP: %s\n",
3106                        ctdb_sock_addr_to_string(mem_ctx, &tickles->addr));
3107                 printf("Num connections: %u\n", tickles->num);
3108                 for (i=0; i<tickles->num; i++) {
3109                         printf("SRC: %s:%u   DST: %s:%u\n",
3110                                ctdb_sock_addr_to_string(
3111                                        mem_ctx, &tickles->conn[i].src),
3112                                ntohs(tickles->conn[i].src.ip.sin_port),
3113                                ctdb_sock_addr_to_string(
3114                                        mem_ctx, &tickles->conn[i].dst),
3115                                ntohs(tickles->conn[i].dst.ip.sin_port));
3116                 }
3117         }
3118
3119         return 0;
3120 }
3121
3122 typedef void (*clist_request_func)(struct ctdb_req_control *request,
3123                                    struct ctdb_connection *conn);
3124
3125 typedef int (*clist_reply_func)(struct ctdb_reply_control *reply);
3126
3127 struct process_clist_state {
3128         struct ctdb_connection *clist;
3129         int count;
3130         int num_failed, num_total;
3131         clist_reply_func reply_func;
3132 };
3133
3134 static void process_clist_done(struct tevent_req *subreq);
3135
3136 static struct tevent_req *process_clist_send(
3137                                         TALLOC_CTX *mem_ctx,
3138                                         struct ctdb_context *ctdb,
3139                                         struct ctdb_connection *clist,
3140                                         int count,
3141                                         clist_request_func request_func,
3142                                         clist_reply_func reply_func)
3143 {
3144         struct tevent_req *req, *subreq;
3145         struct process_clist_state *state;
3146         struct ctdb_req_control request;
3147         int i;
3148
3149         req = tevent_req_create(mem_ctx, &state, struct process_clist_state);
3150         if (req == NULL) {
3151                 return NULL;
3152         }
3153
3154         state->clist = clist;
3155         state->count = count;
3156         state->reply_func = reply_func;
3157
3158         for (i=0; i<count; i++) {
3159                 request_func(&request, &clist[i]);
3160                 subreq = ctdb_client_control_send(state, ctdb->ev,
3161                                                   ctdb->client, ctdb->cmd_pnn,
3162                                                   TIMEOUT(), &request);
3163                 if (tevent_req_nomem(subreq, req)) {
3164                         return tevent_req_post(req, ctdb->ev);
3165                 }
3166                 tevent_req_set_callback(subreq, process_clist_done, req);
3167         }
3168
3169         return req;
3170 }
3171
3172 static void process_clist_done(struct tevent_req *subreq)
3173 {
3174         struct tevent_req *req = tevent_req_callback_data(
3175                 subreq, struct tevent_req);
3176         struct process_clist_state *state = tevent_req_data(
3177                 req, struct process_clist_state);
3178         struct ctdb_reply_control *reply;
3179         int ret;
3180         bool status;
3181
3182         status = ctdb_client_control_recv(subreq, NULL, state, &reply);
3183         TALLOC_FREE(subreq);
3184         if (! status) {
3185                 state->num_failed += 1;
3186                 goto done;
3187         }
3188
3189         ret = state->reply_func(reply);
3190         if (ret != 0) {
3191                 state->num_failed += 1;
3192                 goto done;
3193         }
3194
3195 done:
3196         state->num_total += 1;
3197         if (state->num_total == state->count) {
3198                 tevent_req_done(req);
3199         }
3200 }
3201
3202 static int process_clist_recv(struct tevent_req *req)
3203 {
3204         struct process_clist_state *state = tevent_req_data(
3205                 req, struct process_clist_state);
3206
3207         return state->num_failed;
3208 }
3209
3210 static int control_addtickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3211                              int argc, const char **argv)
3212 {
3213         struct ctdb_connection conn;
3214         int ret;
3215
3216         if (argc != 0 && argc != 2) {
3217                 usage("addtickle");
3218         }
3219
3220         if (argc == 0) {
3221                 struct ctdb_connection *clist;
3222                 struct tevent_req *req;
3223                 int count;
3224
3225                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3226                 if (ret != 0) {
3227                         return ret;
3228                 }
3229
3230                 req = process_clist_send(mem_ctx, ctdb, clist, count,
3231                                  ctdb_req_control_tcp_add_delayed_update,
3232                                  ctdb_reply_control_tcp_add_delayed_update);
3233                 if (req == NULL) {
3234                         return ENOMEM;
3235                 }
3236
3237                 tevent_req_poll(req, ctdb->ev);
3238
3239                 ret = process_clist_recv(req);
3240                 if (ret != 0) {
3241                         fprintf(stderr, "Failed to add %d tickles\n", ret);
3242                         return 1;
3243                 }
3244
3245                 return 0;
3246         }
3247
3248         if (! parse_ip_port(argv[0], &conn.src)) {
3249                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3250                 return 1;
3251         }
3252         if (! parse_ip_port(argv[1], &conn.dst)) {
3253                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3254                 return 1;
3255         }
3256
3257         ret = ctdb_ctrl_tcp_add_delayed_update(mem_ctx, ctdb->ev,
3258                                                ctdb->client, ctdb->cmd_pnn,
3259                                                TIMEOUT(), &conn);
3260         if (ret != 0) {
3261                 fprintf(stderr, "Failed to register connection\n");
3262                 return ret;
3263         }
3264
3265         return 0;
3266 }
3267
3268 static int control_deltickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3269                              int argc, const char **argv)
3270 {
3271         struct ctdb_connection conn;
3272         int ret;
3273
3274         if (argc != 0 && argc != 2) {
3275                 usage("deltickle");
3276         }
3277
3278         if (argc == 0) {
3279                 struct ctdb_connection *clist;
3280                 struct tevent_req *req;
3281                 int count;
3282
3283                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3284                 if (ret != 0) {
3285                         return ret;
3286                 }
3287
3288                 req = process_clist_send(mem_ctx, ctdb, clist, count,
3289                                          ctdb_req_control_tcp_remove,
3290                                          ctdb_reply_control_tcp_remove);
3291                 if (req == NULL) {
3292                         return ENOMEM;
3293                 }
3294
3295                 tevent_req_poll(req, ctdb->ev);
3296
3297                 ret = process_clist_recv(req);
3298                 if (ret != 0) {
3299                         fprintf(stderr, "Failed to remove %d tickles\n", ret);
3300                         return 1;
3301                 }
3302
3303                 return 0;
3304         }
3305
3306         if (! parse_ip_port(argv[0], &conn.src)) {
3307                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3308                 return 1;
3309         }
3310         if (! parse_ip_port(argv[1], &conn.dst)) {
3311                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3312                 return 1;
3313         }
3314
3315         ret = ctdb_ctrl_tcp_remove(mem_ctx, ctdb->ev, ctdb->client,
3316                                    ctdb->cmd_pnn, TIMEOUT(), &conn);
3317         if (ret != 0) {
3318                 fprintf(stderr, "Failed to unregister connection\n");
3319                 return ret;
3320         }
3321
3322         return 0;
3323 }
3324
3325 static int control_check_srvids(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3326                                 int argc, const char **argv)
3327 {
3328         uint64_t *srvid;
3329         uint8_t *result;
3330         int ret, i;
3331
3332         if (argc == 0) {
3333                 usage("check_srvids");
3334         }
3335
3336         srvid = talloc_array(mem_ctx, uint64_t, argc);
3337         if (srvid == NULL) {
3338                 fprintf(stderr, "Memory allocation error\n");
3339                 return 1;
3340         }
3341
3342         for (i=0; i<argc; i++) {
3343                 srvid[i] = strtoull(argv[i], NULL, 0);
3344         }
3345
3346         ret = ctdb_ctrl_check_srvids(mem_ctx, ctdb->ev, ctdb->client,
3347                                      ctdb->cmd_pnn, TIMEOUT(), srvid, argc,
3348                                      &result);
3349         if (ret != 0) {
3350                 fprintf(stderr, "Failed to check srvids on node %u\n",
3351                         ctdb->cmd_pnn);
3352                 return ret;
3353         }
3354
3355         for (i=0; i<argc; i++) {
3356                 printf("SRVID 0x%" PRIx64 " %s\n", srvid[i],
3357                        (result[i] ? "exists" : "does not exist"));
3358         }
3359
3360         return 0;
3361 }
3362
3363 static int control_listnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3364                              int argc, const char **argv)
3365 {
3366         struct ctdb_node_map *nodemap;
3367         int i;
3368
3369         if (argc != 0) {
3370                 usage("listnodes");
3371         }
3372
3373         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
3374         if (nodemap == NULL) {
3375                 return 1;
3376         }
3377
3378         for (i=0; i<nodemap->num; i++) {
3379                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
3380                         continue;
3381                 }
3382
3383                 if (options.machinereadable) {
3384                         printf("%s%u%s%s%s\n", options.sep,
3385                                nodemap->node[i].pnn, options.sep,
3386                                ctdb_sock_addr_to_string(
3387                                         mem_ctx, &nodemap->node[i].addr),
3388                                options.sep);
3389                 } else {
3390                         printf("%s\n",
3391                                ctdb_sock_addr_to_string(
3392                                         mem_ctx, &nodemap->node[i].addr));
3393                 }
3394         }
3395
3396         return 0;
3397 }
3398
3399 static bool nodemap_identical(struct ctdb_node_map *nodemap1,
3400                               struct ctdb_node_map *nodemap2)
3401 {
3402         int i;
3403
3404         if (nodemap1->num != nodemap2->num) {
3405                 return false;
3406         }
3407
3408         for (i=0; i<nodemap1->num; i++) {
3409                 struct ctdb_node_and_flags *n1, *n2;
3410
3411                 n1 = &nodemap1->node[i];
3412                 n2 = &nodemap2->node[i];
3413
3414                 if ((n1->pnn != n2->pnn) ||
3415                     (n1->flags != n2->flags) ||
3416                     ! ctdb_sock_addr_same_ip(&n1->addr, &n2->addr)) {
3417                         return false;
3418                 }
3419         }
3420
3421         return true;
3422 }
3423
3424 static int check_node_file_changes(TALLOC_CTX *mem_ctx,
3425                                    struct ctdb_node_map *nm,
3426                                    struct ctdb_node_map *fnm,
3427                                    bool *reload)
3428 {
3429         int i;
3430         bool check_failed = false;
3431
3432         *reload = false;
3433
3434         for (i=0; i<nm->num; i++) {
3435                 if (i >= fnm->num) {
3436                         fprintf(stderr,
3437                                 "Node %u (%s) missing from nodes file\n",
3438                                 nm->node[i].pnn,
3439                                 ctdb_sock_addr_to_string(
3440                                         mem_ctx, &nm->node[i].addr));
3441                         check_failed = true;
3442                         continue;
3443                 }
3444                 if (nm->node[i].flags & NODE_FLAGS_DELETED &&
3445                     fnm->node[i].flags & NODE_FLAGS_DELETED) {
3446                         /* Node remains deleted */
3447                         continue;
3448                 }
3449
3450                 if (! (nm->node[i].flags & NODE_FLAGS_DELETED) &&
3451                     ! (fnm->node[i].flags & NODE_FLAGS_DELETED)) {
3452                         /* Node not newly nor previously deleted */
3453                         if (! ctdb_same_ip(&nm->node[i].addr,
3454                                            &fnm->node[i].addr)) {
3455                                 fprintf(stderr,
3456                                         "Node %u has changed IP address"
3457                                         " (was %s, now %s)\n",
3458                                         nm->node[i].pnn,
3459                                         ctdb_sock_addr_to_string(
3460                                                 mem_ctx, &nm->node[i].addr),
3461                                         ctdb_sock_addr_to_string(
3462                                                 mem_ctx, &fnm->node[i].addr));
3463                                 check_failed = true;
3464                         } else {
3465                                 if (nm->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3466                                         fprintf(stderr,
3467                                                 "WARNING: Node %u is disconnected."
3468                                                 " You MUST fix this node manually!\n",
3469                                                 nm->node[i].pnn);
3470                                 }
3471                         }
3472                         continue;
3473                 }
3474
3475                 if (fnm->node[i].flags & NODE_FLAGS_DELETED) {
3476                         /* Node is being deleted */
3477                         printf("Node %u is DELETED\n", nm->node[i].pnn);
3478                         *reload = true;
3479                         if (! (nm->node[i].flags & NODE_FLAGS_DISCONNECTED)) {
3480                                 fprintf(stderr,
3481                                         "ERROR: Node %u is still connected\n",
3482                                         nm->node[i].pnn);
3483                                 check_failed = true;
3484                         }
3485                         continue;
3486                 }
3487
3488                 if (nm->node[i].flags & NODE_FLAGS_DELETED) {
3489                         /* Node was previously deleted */
3490                         printf("Node %u is UNDELETED\n", nm->node[i].pnn);
3491                         *reload = true;
3492                 }
3493         }
3494
3495         if (check_failed) {
3496                 fprintf(stderr,
3497                         "ERROR: Nodes will not be reloaded due to previous error\n");
3498                 return 1;
3499         }
3500
3501         /* Leftover nodes in file are NEW */
3502         for (; i < fnm->num; i++) {
3503                 printf("Node %u is NEW\n", fnm->node[i].pnn);
3504                 *reload = true;
3505         }
3506
3507         return 0;
3508 }
3509
3510 struct disable_recoveries_state {
3511         uint32_t *pnn_list;
3512         int node_count;
3513         bool *reply;
3514         int status;
3515         bool done;
3516 };
3517
3518 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
3519                                        void *private_data)
3520 {
3521         struct disable_recoveries_state *state =
3522                 (struct disable_recoveries_state *)private_data;
3523         int ret, i;
3524
3525         if (data.dsize != sizeof(int)) {
3526                 /* Ignore packet */
3527                 return;
3528         }
3529
3530         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
3531         ret = *(int *)data.dptr;
3532         if (ret < 0) {
3533                 state->status = ret;
3534                 state->done = true;
3535                 return;
3536         }
3537         for (i=0; i<state->node_count; i++) {
3538                 if (state->pnn_list[i] == ret) {
3539                         state->reply[i] = true;
3540                         break;
3541                 }
3542         }
3543
3544         state->done = true;
3545         for (i=0; i<state->node_count; i++) {
3546                 if (! state->reply[i]) {
3547                         state->done = false;
3548                         break;
3549                 }
3550         }
3551 }
3552
3553 static int disable_recoveries(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3554                               uint32_t timeout, uint32_t *pnn_list, int count)
3555 {
3556         struct ctdb_disable_message disable = { 0 };
3557         struct disable_recoveries_state state;
3558         int ret, i;
3559
3560         disable.pnn = ctdb->pnn;
3561         disable.srvid = next_srvid(ctdb);
3562         disable.timeout = timeout;
3563
3564         state.pnn_list = pnn_list;
3565         state.node_count = count;
3566         state.done = false;
3567         state.status = 0;
3568         state.reply = talloc_zero_array(mem_ctx, bool, count);
3569         if (state.reply == NULL) {
3570                 return ENOMEM;
3571         }
3572
3573         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
3574                                               disable.srvid,
3575                                               disable_recoveries_handler,
3576                                               &state);
3577         if (ret != 0) {
3578                 return ret;
3579         }
3580
3581         for (i=0; i<count; i++) {
3582                 ret = ctdb_message_disable_recoveries(mem_ctx, ctdb->ev,
3583                                                       ctdb->client,
3584                                                       pnn_list[i],
3585                                                       &disable);
3586                 if (ret != 0) {
3587                         goto fail;
3588                 }
3589         }
3590
3591         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
3592         if (ret == ETIME) {
3593                 fprintf(stderr, "Timed out waiting to disable recoveries\n");
3594         } else {
3595                 ret = (state.status >= 0 ? 0 : 1);
3596         }
3597
3598 fail:
3599         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
3600                                            disable.srvid, &state);
3601         return ret;
3602 }
3603
3604 static int control_reloadnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3605                                int argc, const char **argv)
3606 {
3607         struct ctdb_node_map *nodemap = NULL;
3608         struct ctdb_node_map *file_nodemap;
3609         struct ctdb_node_map *remote_nodemap;
3610         struct ctdb_req_control request;
3611         struct ctdb_reply_control **reply;
3612         bool reload;
3613         int ret, i;
3614         uint32_t *pnn_list;
3615         int count;
3616
3617         nodemap = get_nodemap(ctdb, false);
3618         if (nodemap == NULL) {
3619                 return 1;
3620         }
3621
3622         file_nodemap = read_nodes_file(mem_ctx, ctdb->pnn);
3623         if (file_nodemap == NULL) {
3624                 return 1;
3625         }
3626
3627         for (i=0; i<nodemap->num; i++) {
3628                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3629                         continue;
3630                 }
3631
3632                 ret = ctdb_ctrl_get_nodes_file(mem_ctx, ctdb->ev, ctdb->client,
3633                                                nodemap->node[i].pnn, TIMEOUT(),
3634                                                &remote_nodemap);
3635                 if (ret != 0) {
3636                         fprintf(stderr,
3637                                 "ERROR: Failed to get nodes file from node %u\n",
3638                                 nodemap->node[i].pnn);
3639                         return ret;
3640                 }
3641
3642                 if (! nodemap_identical(file_nodemap, remote_nodemap)) {
3643                         fprintf(stderr,
3644                                 "ERROR: Nodes file on node %u differs"
3645                                  " from current node (%u)\n",
3646                                  nodemap->node[i].pnn, ctdb->pnn);
3647                         return 1;
3648                 }
3649         }
3650
3651         ret = check_node_file_changes(mem_ctx, nodemap, file_nodemap, &reload);
3652         if (ret != 0) {
3653                 return ret;
3654         }
3655
3656         if (! reload) {
3657                 fprintf(stderr, "No change in nodes file,"
3658                                 " skipping unnecessary reload\n");
3659                 return 0;
3660         }
3661
3662         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
3663                                         mem_ctx, &pnn_list);
3664         if (count <= 0) {
3665                 fprintf(stderr, "Memory allocation error\n");
3666                 return 1;
3667         }
3668
3669         ret = disable_recoveries(mem_ctx, ctdb, 2*options.timelimit,
3670                                  pnn_list, count);
3671         if (ret != 0) {
3672                 fprintf(stderr, "Failed to disable recoveries\n");
3673                 return ret;
3674         }
3675
3676         ctdb_req_control_reload_nodes_file(&request);
3677         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3678                                         pnn_list, count, TIMEOUT(),
3679                                         &request, NULL, &reply);
3680         if (ret != 0) {
3681                 bool failed = false;
3682
3683                 for (i=0; i<count; i++) {
3684                         ret = ctdb_reply_control_reload_nodes_file(reply[i]);
3685                         if (ret != 0) {
3686                                 fprintf(stderr,
3687                                         "Node %u failed to reload nodes\n",
3688                                         pnn_list[i]);
3689                                 failed = true;
3690                         }
3691                 }
3692                 if (failed) {
3693                         fprintf(stderr,
3694                                 "You MUST fix failed nodes manually!\n");
3695                 }
3696         }
3697
3698         ret = disable_recoveries(mem_ctx, ctdb, 0, pnn_list, count);
3699         if (ret != 0) {
3700                 fprintf(stderr, "Failed to enable recoveries\n");
3701                 return ret;
3702         }
3703
3704         return 0;
3705 }
3706
3707 static int moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3708                   ctdb_sock_addr *addr, uint32_t pnn)
3709 {
3710         struct ctdb_public_ip_list *pubip_list;
3711         struct ctdb_public_ip pubip;
3712         struct ctdb_node_map *nodemap;
3713         struct ctdb_req_control request;
3714         uint32_t *pnn_list;
3715         int ret, i, count;
3716
3717         ret = ctdb_message_disable_ip_check(mem_ctx, ctdb->ev, ctdb->client,
3718                                             CTDB_BROADCAST_CONNECTED,
3719                                             2*options.timelimit);
3720         if (ret != 0) {
3721                 fprintf(stderr, "Failed to disable IP check\n");
3722                 return ret;
3723         }
3724
3725         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3726                                        pnn, TIMEOUT(), &pubip_list);
3727         if (ret != 0) {
3728                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3729                         pnn);
3730                 return ret;
3731         }
3732
3733         for (i=0; i<pubip_list->num; i++) {
3734                 if (ctdb_same_ip(addr, &pubip_list->ip[i].addr)) {
3735                         break;
3736                 }
3737         }
3738
3739         if (i == pubip_list->num) {
3740                 fprintf(stderr, "Node %u CANNOT host IP address %s\n",
3741                         pnn, ctdb_sock_addr_to_string(mem_ctx, addr));
3742                 return 1;
3743         }
3744
3745         nodemap = get_nodemap(ctdb, false);
3746         if (nodemap == NULL) {
3747                 return 1;
3748         }
3749
3750         count = list_of_active_nodes(nodemap, pnn, mem_ctx, &pnn_list);
3751         if (count <= 0) {
3752                 fprintf(stderr, "Memory allocation error\n");
3753                 return 1;
3754         }
3755
3756         pubip.pnn = pnn;
3757         pubip.addr = *addr;
3758         ctdb_req_control_release_ip(&request, &pubip);
3759
3760         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3761                                         pnn_list, count, TIMEOUT(),
3762                                         &request, NULL, NULL);
3763         if (ret != 0) {
3764                 fprintf(stderr, "Failed to release IP on nodes\n");
3765                 return ret;
3766         }
3767
3768         ret = ctdb_ctrl_takeover_ip(mem_ctx, ctdb->ev, ctdb->client,
3769                                     pnn, TIMEOUT(), &pubip);
3770         if (ret != 0) {
3771                 fprintf(stderr, "Failed to takeover IP on node %u\n", pnn);
3772                 return ret;
3773         }
3774
3775         return 0;
3776 }
3777
3778 static int control_moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3779                           int argc, const char **argv)
3780 {
3781         ctdb_sock_addr addr;
3782         uint32_t pnn;
3783         int ret, retries = 0;
3784
3785         if (argc != 2) {
3786                 usage("moveip");
3787         }
3788
3789         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3790                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3791                 return 1;
3792         }
3793
3794         pnn = strtoul(argv[1], NULL, 10);
3795         if (pnn == CTDB_UNKNOWN_PNN) {
3796                 fprintf(stderr, "Invalid PNN %s\n", argv[1]);
3797                 return 1;
3798         }
3799
3800         while (retries < 5) {
3801                 ret = moveip(mem_ctx, ctdb, &addr, pnn);
3802                 if (ret == 0) {
3803                         break;
3804                 }
3805
3806                 sleep(3);
3807                 retries++;
3808         }
3809
3810         if (ret != 0) {
3811                 fprintf(stderr, "Failed to move IP %s to node %u\n",
3812                         argv[0], pnn);
3813                 return ret;
3814         }
3815
3816         return 0;
3817 }
3818
3819 static int control_rebalanceip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3820                                int argc, const char **argv)
3821 {
3822         ctdb_sock_addr addr;
3823         struct ctdb_node_map *nodemap;
3824         struct ctdb_public_ip pubip;
3825         struct ctdb_req_control request;
3826         uint32_t *pnn_list;
3827         int ret, count;
3828
3829         if (argc != 1) {
3830                 usage("rebalanceip");
3831         }
3832
3833         if (parse_ip(argv[0], NULL, 0, &addr)) {
3834                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3835                 return 1;
3836         }
3837
3838         ret = ctdb_message_disable_ip_check(mem_ctx, ctdb->ev, ctdb->client,
3839                                             CTDB_BROADCAST_CONNECTED,
3840                                             2*options.timelimit);
3841         if (ret != 0) {
3842                 fprintf(stderr, "Failed to disable IP check\n");
3843                 return 1;
3844         }
3845
3846         nodemap = get_nodemap(ctdb, false);
3847         if (nodemap == NULL) {
3848                 return 1;
3849         }
3850
3851         count = list_of_active_nodes(nodemap, -1, mem_ctx, &pnn_list);
3852         if (count <= 0) {
3853                 fprintf(stderr, "Memory allocation error\n");
3854                 return 1;
3855         }
3856
3857         pubip.pnn = CTDB_UNKNOWN_PNN;
3858         pubip.addr = addr;
3859
3860         ctdb_req_control_release_ip(&request, &pubip);
3861         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3862                                         pnn_list, count, TIMEOUT(),
3863                                         &request, NULL, NULL);
3864         if (ret != 0) {
3865                 fprintf(stderr, "Failed to release IP from nodes\n");
3866                 return 1;
3867         }
3868
3869         return ipreallocate(mem_ctx, ctdb);
3870 }
3871
3872 static int rebalancenode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3873                          uint32_t pnn)
3874 {
3875         int ret;
3876
3877         ret = ctdb_message_rebalance_node(mem_ctx, ctdb->ev, ctdb->client,
3878                                           CTDB_BROADCAST_CONNECTED, pnn);
3879         if (ret != 0) {
3880                 fprintf(stderr,
3881                         "Failed to ask recovery master to distribute IPs\n");
3882                 return ret;
3883         }
3884
3885         return 0;
3886 }
3887
3888 static int control_addip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3889                          int argc, const char **argv)
3890 {
3891         ctdb_sock_addr addr;
3892         struct ctdb_public_ip_list *pubip_list;
3893         struct ctdb_addr_info addr_info;
3894         unsigned int mask;
3895         int ret, i, retries = 0;
3896
3897         if (argc != 2) {
3898                 usage("addip");
3899         }
3900
3901         if (! parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
3902                 fprintf(stderr, "Invalid IP/Mask %s\n", argv[0]);
3903                 return 1;
3904         }
3905
3906         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3907                                        ctdb->cmd_pnn, TIMEOUT(), &pubip_list);
3908         if (ret != 0) {
3909                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3910                         ctdb->cmd_pnn);
3911                 return 1;
3912         }
3913
3914         for (i=0; i<pubip_list->num; i++) {
3915                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3916                         fprintf(stderr, "Node already knows about IP %s\n",
3917                                 ctdb_sock_addr_to_string(mem_ctx, &addr));
3918                         return 0;
3919                 }
3920         }
3921
3922         addr_info.addr = addr;
3923         addr_info.mask = mask;
3924         addr_info.iface = argv[1];
3925
3926         while (retries < 5) {
3927                 ret = ctdb_ctrl_add_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3928                                               ctdb->cmd_pnn, TIMEOUT(),
3929                                               &addr_info);
3930                 if (ret == 0) {
3931                         break;
3932                 }
3933
3934                 sleep(3);
3935                 retries++;
3936         }
3937
3938         if (ret != 0) {
3939                 fprintf(stderr, "Failed to add public IP to node %u."
3940                                 " Giving up\n", ctdb->cmd_pnn);
3941                 return ret;
3942         }
3943
3944         ret = rebalancenode(mem_ctx, ctdb, ctdb->cmd_pnn);
3945         if (ret != 0) {
3946                 return ret;
3947         }
3948
3949         return ipreallocate(mem_ctx, ctdb);
3950 }
3951
3952 static int control_delip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3953                          int argc, const char **argv)
3954 {
3955         ctdb_sock_addr addr;
3956         struct ctdb_public_ip_list *pubip_list;
3957         struct ctdb_addr_info addr_info;
3958         int ret, i;
3959
3960         if (argc != 1) {
3961                 usage("delip");
3962         }
3963
3964         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3965                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3966                 return 1;
3967         }
3968
3969         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3970                                        ctdb->cmd_pnn, TIMEOUT(), &pubip_list);
3971         if (ret != 0) {
3972                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3973                         ctdb->cmd_pnn);
3974                 return 1;
3975         }
3976
3977         for (i=0; i<pubip_list->num; i++) {
3978                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3979                         break;
3980                 }
3981         }
3982
3983         if (i == pubip_list->num) {
3984                 fprintf(stderr, "Node does not know about IP address %s\n",
3985                         ctdb_sock_addr_to_string(mem_ctx, &addr));
3986                 return 0;
3987         }
3988
3989         addr_info.addr = addr;
3990         addr_info.mask = 0;
3991         addr_info.iface = NULL;
3992
3993         ret = ctdb_ctrl_del_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3994                                       ctdb->cmd_pnn, TIMEOUT(), &addr_info);
3995         if (ret != 0) {
3996                 fprintf(stderr, "Failed to delete public IP from node %u\n",
3997                         ctdb->cmd_pnn);
3998                 return ret;
3999         }
4000
4001         return 0;
4002 }
4003
4004 static int control_eventscript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4005                                int argc, const char **argv)
4006 {
4007         int ret;
4008
4009         if (argc != 1) {
4010                 usage("eventscript");
4011         }
4012
4013         if (strcmp(argv[0], "monitor") != 0) {
4014                 fprintf(stderr, "Only monitor event can be run\n");
4015                 return 1;
4016         }
4017
4018         ret = ctdb_ctrl_run_eventscripts(mem_ctx, ctdb->ev, ctdb->client,
4019                                          ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4020         if (ret != 0) {
4021                 fprintf(stderr, "Failed to run monitor event on node %u\n",
4022                         ctdb->cmd_pnn);
4023                 return ret;
4024         }
4025
4026         return 0;
4027 }
4028
4029 #define DB_VERSION      3
4030 #define MAX_DB_NAME     64
4031 #define MAX_REC_BUFFER_SIZE     (100*1000)
4032
4033 struct db_header {
4034         unsigned long version;
4035         time_t timestamp;
4036         unsigned long flags;
4037         unsigned long nbuf;
4038         unsigned long nrec;
4039         char name[MAX_DB_NAME];
4040 };
4041
4042 struct backup_state {
4043         TALLOC_CTX *mem_ctx;
4044         struct ctdb_rec_buffer *recbuf;
4045         uint32_t db_id;
4046         int fd;
4047         unsigned int nbuf, nrec;
4048 };
4049
4050 static int backup_handler(uint32_t reqid, struct ctdb_ltdb_header *header,
4051                           TDB_DATA key, TDB_DATA data, void *private_data)
4052 {
4053         struct backup_state *state = (struct backup_state *)private_data;
4054         size_t len;
4055         int ret;
4056
4057         if (state->recbuf == NULL) {
4058                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
4059                                                      state->db_id);
4060                 if (state->recbuf == NULL) {
4061                         return ENOMEM;
4062                 }
4063         }
4064
4065         ret = ctdb_rec_buffer_add(state->recbuf, state->recbuf, reqid,
4066                                   header, key, data);
4067         if (ret != 0) {
4068                 return ret;
4069         }
4070
4071         len = ctdb_rec_buffer_len(state->recbuf);
4072         if (len < MAX_REC_BUFFER_SIZE) {
4073                 return 0;
4074         }
4075
4076         ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
4077         if (ret != 0) {
4078                 fprintf(stderr, "Failed to write records to backup file\n");
4079                 return ret;
4080         }
4081
4082         state->nbuf += 1;
4083         state->nrec += state->recbuf->count;
4084         TALLOC_FREE(state->recbuf);
4085
4086         return 0;
4087 }
4088
4089 static int control_backupdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4090                             int argc, const char **argv)
4091 {
4092         const char *db_name;
4093         struct ctdb_db_context *db;
4094         uint32_t db_id;
4095         uint8_t db_flags;
4096         struct backup_state state;
4097         struct db_header db_hdr;
4098         int fd, ret;
4099
4100         if (argc != 2) {
4101                 usage("backupdb");
4102         }
4103
4104         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4105                 return 1;
4106         }
4107
4108         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4109                           db_flags, &db);
4110         if (ret != 0) {
4111                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4112                 return ret;
4113         }
4114
4115         fd = open(argv[1], O_RDWR|O_CREAT, 0600);
4116         if (fd == -1) {
4117                 ret = errno;
4118                 fprintf(stderr, "Failed to open file %s for writing\n",
4119                         argv[1]);
4120                 return ret;
4121         }
4122
4123         /* Write empty header first */
4124         ZERO_STRUCT(db_hdr);
4125         ret = write(fd, &db_hdr, sizeof(struct db_header));
4126         if (ret == -1) {
4127                 ret = errno;
4128                 close(fd);
4129                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4130                 return ret;
4131         }
4132
4133         state.mem_ctx = mem_ctx;
4134         state.recbuf = NULL;
4135         state.fd = fd;
4136         state.nbuf = 0;
4137         state.nrec = 0;
4138
4139         ret = ctdb_db_traverse(db, true, false, backup_handler, &state);
4140         if (ret != 0) {
4141                 fprintf(stderr, "Failed to collect records from DB %s\n",
4142                         db_name);
4143                 close(fd);
4144                 return ret;
4145         }
4146
4147         if (state.recbuf != NULL) {
4148                 ret = ctdb_rec_buffer_write(state.recbuf, state.fd);
4149                 if (ret != 0) {
4150                         fprintf(stderr,
4151                                 "Failed to write records to backup file\n");
4152                         close(fd);
4153                         return ret;
4154                 }
4155
4156                 state.nbuf += 1;
4157                 state.nrec += state.recbuf->count;
4158                 TALLOC_FREE(state.recbuf);
4159         }
4160
4161         db_hdr.version = DB_VERSION;
4162         db_hdr.timestamp = time(NULL);
4163         db_hdr.flags = db_flags;
4164         db_hdr.nbuf = state.nbuf;
4165         db_hdr.nrec = state.nrec;
4166         strncpy(db_hdr.name, db_name, MAX_DB_NAME-1);
4167
4168         lseek(fd, 0, SEEK_SET);
4169         ret = write(fd, &db_hdr, sizeof(struct db_header));
4170         if (ret == -1) {
4171                 ret = errno;
4172                 close(fd);
4173                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4174                 return ret;
4175         }
4176
4177         close(fd);
4178         printf("Database backed up to %s\n", argv[1]);
4179         return 0;
4180 }
4181
4182 static int control_restoredb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4183                              int argc, const char **argv)
4184 {
4185         const char *db_name = NULL;
4186         struct ctdb_db_context *db;
4187         struct db_header db_hdr;
4188         struct ctdb_node_map *nodemap;
4189         struct ctdb_req_control request;
4190         struct ctdb_reply_control **reply;
4191         struct ctdb_transdb wipedb;
4192         struct ctdb_pulldb_ext pulldb;
4193         struct ctdb_rec_buffer *recbuf;
4194         uint32_t generation;
4195         uint32_t *pnn_list;
4196         char timebuf[128];
4197         int fd, i;
4198         int count, ret;
4199
4200         if (argc < 1 || argc > 2) {
4201                 usage("restoredb");
4202         }
4203
4204         fd = open(argv[0], O_RDONLY, 0600);
4205         if (fd == -1) {
4206                 ret = errno;
4207                 fprintf(stderr, "Failed to open file %s for reading\n",
4208                         argv[0]);
4209                 return ret;
4210         }
4211
4212         if (argc == 2) {
4213                 db_name = argv[1];
4214         }
4215
4216         ret = read(fd, &db_hdr, sizeof(struct db_header));
4217         if (ret == -1) {
4218                 ret = errno;
4219                 close(fd);
4220                 fprintf(stderr, "Failed to read db header from file %s\n",
4221                         argv[0]);
4222                 return ret;
4223         }
4224
4225         if (db_hdr.version != DB_VERSION) {
4226                 fprintf(stderr,
4227                         "Wrong version of backup file, expected %u, got %lu\n",
4228                         DB_VERSION, db_hdr.version);
4229                 close(fd);
4230                 return EINVAL;
4231         }
4232
4233         if (db_name == NULL) {
4234                 db_name = db_hdr.name;
4235         }
4236
4237         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4238                  localtime(&db_hdr.timestamp));
4239         printf("Restoring database %s from backup @ %s\n", db_name, timebuf);
4240
4241         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4242                           db_hdr.flags, &db);
4243         if (ret != 0) {
4244                 fprintf(stderr, "Failed to attach to DB %s\n", db_hdr.name);
4245                 return ret;
4246         }
4247
4248         nodemap = get_nodemap(ctdb, false);
4249         if (nodemap == NULL) {
4250                 fprintf(stderr, "Failed to get nodemap\n");
4251                 close(fd);
4252                 return ENOMEM;
4253         }
4254
4255         ret = get_generation(mem_ctx, ctdb, &generation);
4256         if (ret != 0) {
4257                 fprintf(stderr, "Failed to get current generation\n");
4258                 close(fd);
4259                 return ret;
4260         }
4261
4262         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4263                                      &pnn_list);
4264         if (count <= 0) {
4265                 close(fd);
4266                 return ENOMEM;
4267         }
4268
4269         wipedb.db_id = ctdb_db_id(db);
4270         wipedb.tid = generation;
4271
4272         ctdb_req_control_db_freeze(&request, wipedb.db_id);
4273         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4274                                         ctdb->client, pnn_list, count,
4275                                         TIMEOUT(), &request, NULL, NULL);
4276         if (ret != 0) {
4277                 goto failed;
4278         }
4279
4280
4281         ctdb_req_control_db_transaction_start(&request, &wipedb);
4282         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4283                                         pnn_list, count, TIMEOUT(),
4284                                         &request, NULL, NULL);
4285         if (ret != 0) {
4286                 goto failed;
4287         }
4288
4289         ctdb_req_control_wipe_database(&request, &wipedb);
4290         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4291                                         pnn_list, count, TIMEOUT(),
4292                                         &request, NULL, NULL);
4293         if (ret != 0) {
4294                 goto failed;
4295         }
4296
4297         pulldb.db_id = ctdb_db_id(db);
4298         pulldb.lmaster = 0;
4299         pulldb.srvid = SRVID_CTDB_PUSHDB;
4300
4301         ctdb_req_control_db_push_start(&request, &pulldb);
4302         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4303                                         pnn_list, count, TIMEOUT(),
4304                                         &request, NULL, NULL);
4305         if (ret != 0) {
4306                 goto failed;
4307         }
4308
4309         for (i=0; i<db_hdr.nbuf; i++) {
4310                 struct ctdb_req_message message;
4311                 TDB_DATA data;
4312
4313                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4314                 if (ret != 0) {
4315                         goto failed;
4316                 }
4317
4318                 data.dsize = ctdb_rec_buffer_len(recbuf);
4319                 data.dptr = talloc_size(mem_ctx, data.dsize);
4320                 if (data.dptr == NULL) {
4321                         goto failed;
4322                 }
4323
4324                 ctdb_rec_buffer_push(recbuf, data.dptr);
4325
4326                 message.srvid = pulldb.srvid;
4327                 message.data.data = data;
4328
4329                 ret = ctdb_client_message_multi(mem_ctx, ctdb->ev,
4330                                                 ctdb->client,
4331                                                 pnn_list, count,
4332                                                 &message, NULL);
4333                 if (ret != 0) {
4334                         goto failed;
4335                 }
4336
4337                 talloc_free(recbuf);
4338                 talloc_free(data.dptr);
4339         }
4340
4341         ctdb_req_control_db_push_confirm(&request, pulldb.db_id);
4342         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4343                                         pnn_list, count, TIMEOUT(),
4344                                         &request, NULL, &reply);
4345         if (ret != 0) {
4346                 goto failed;
4347         }
4348
4349         for (i=0; i<count; i++) {
4350                 uint32_t num_records;
4351
4352                 ret = ctdb_reply_control_db_push_confirm(reply[i],
4353                                                          &num_records);
4354                 if (ret != 0) {
4355                         fprintf(stderr, "Invalid response from node %u\n",
4356                                 pnn_list[i]);
4357                         goto failed;
4358                 }
4359
4360                 if (num_records != db_hdr.nrec) {
4361                         fprintf(stderr, "Node %u received %u of %lu records\n",
4362                                 pnn_list[i], num_records, db_hdr.nrec);
4363                         goto failed;
4364                 }
4365         }
4366
4367         ctdb_req_control_db_set_healthy(&request, wipedb.db_id);
4368         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4369                                         pnn_list, count, TIMEOUT(),
4370                                         &request, NULL, NULL);
4371         if (ret != 0) {
4372                 goto failed;
4373         }
4374
4375         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4376         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4377                                         pnn_list, count, TIMEOUT(),
4378                                         &request, NULL, NULL);
4379         if (ret != 0) {
4380                 goto failed;
4381         }
4382
4383         ctdb_req_control_db_thaw(&request, wipedb.db_id);
4384         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4385                                         ctdb->client, pnn_list, count,
4386                                         TIMEOUT(), &request, NULL, NULL);
4387         if (ret != 0) {
4388                 goto failed;
4389         }
4390
4391         printf("Database %s restored\n", db_name);
4392         return 0;
4393
4394
4395 failed:
4396         close(fd);
4397         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4398                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4399         return ret;
4400 }
4401
4402 struct dumpdbbackup_state {
4403         ctdb_rec_parser_func_t parser;
4404         struct dump_record_state sub_state;
4405 };
4406
4407 static int dumpdbbackup_handler(uint32_t reqid,
4408                                 struct ctdb_ltdb_header *header,
4409                                 TDB_DATA key, TDB_DATA data,
4410                                 void *private_data)
4411 {
4412         struct dumpdbbackup_state *state =
4413                 (struct dumpdbbackup_state *)private_data;
4414         struct ctdb_ltdb_header hdr;
4415         int ret;
4416
4417         ret = ctdb_ltdb_header_extract(&data, &hdr);
4418         if (ret != 0) {
4419                 return ret;
4420         }
4421
4422         return state->parser(reqid, &hdr, key, data, &state->sub_state);
4423 }
4424
4425 static int control_dumpdbbackup(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4426                                 int argc, const char **argv)
4427 {
4428         struct db_header db_hdr;
4429         char timebuf[128];
4430         struct dumpdbbackup_state state;
4431         int fd, ret, i;
4432
4433         if (argc != 1) {
4434                 usage("dumpbackup");
4435         }
4436
4437         fd = open(argv[0], O_RDONLY, 0600);
4438         if (fd == -1) {
4439                 ret = errno;
4440                 fprintf(stderr, "Failed to open file %s for reading\n",
4441                         argv[0]);
4442                 return ret;
4443         }
4444
4445         ret = read(fd, &db_hdr, sizeof(struct db_header));
4446         if (ret == -1) {
4447                 ret = errno;
4448                 close(fd);
4449                 fprintf(stderr, "Failed to read db header from file %s\n",
4450                         argv[0]);
4451                 return ret;
4452         }
4453
4454         if (db_hdr.version != DB_VERSION) {
4455                 fprintf(stderr,
4456                         "Wrong version of backup file, expected %u, got %lu\n",
4457                         DB_VERSION, db_hdr.version);
4458                 return EINVAL;
4459         }
4460
4461         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4462                  localtime(&db_hdr.timestamp));
4463         printf("Dumping database %s from backup @ %s\n",
4464                db_hdr.name, timebuf);
4465
4466         state.parser = dump_record;
4467         state.sub_state.count = 0;
4468
4469         for (i=0; i<db_hdr.nbuf; i++) {
4470                 struct ctdb_rec_buffer *recbuf;
4471
4472                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4473                 if (ret != 0) {
4474                         fprintf(stderr, "Failed to read records\n");
4475                         return ret;
4476                 }
4477
4478                 ret = ctdb_rec_buffer_traverse(recbuf, dumpdbbackup_handler,
4479                                                &state);
4480                 if (ret != 0) {
4481                         fprintf(stderr, "Failed to dump records\n");
4482                         return ret;
4483                 }
4484         }
4485
4486         printf("Dumped %u record(s)\n", state.sub_state.count);
4487         return 0;
4488 }
4489
4490 static int control_wipedb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4491                           int argc, const char **argv)
4492 {
4493         const char *db_name;
4494         struct ctdb_db_context *db;
4495         uint32_t db_id;
4496         uint8_t db_flags;
4497         struct ctdb_node_map *nodemap;
4498         struct ctdb_req_control request;
4499         struct ctdb_transdb wipedb;
4500         uint32_t generation;
4501         uint32_t *pnn_list;
4502         int count, ret;
4503
4504         if (argc != 1) {
4505                 usage("wipedb");
4506         }
4507
4508         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4509                 return 1;
4510         }
4511
4512         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4513                           db_flags, &db);
4514         if (ret != 0) {
4515                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4516                 return ret;
4517         }
4518
4519         nodemap = get_nodemap(ctdb, false);
4520         if (nodemap == NULL) {
4521                 fprintf(stderr, "Failed to get nodemap\n");
4522                 return ENOMEM;
4523         }
4524
4525         ret = get_generation(mem_ctx, ctdb, &generation);
4526         if (ret != 0) {
4527                 fprintf(stderr, "Failed to get current generation\n");
4528                 return ret;
4529         }
4530
4531         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4532                                      &pnn_list);
4533         if (count <= 0) {
4534                 return ENOMEM;
4535         }
4536
4537         ctdb_req_control_db_freeze(&request, db_id);
4538         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4539                                         ctdb->client, pnn_list, count,
4540                                         TIMEOUT(), &request, NULL, NULL);
4541         if (ret != 0) {
4542                 goto failed;
4543         }
4544
4545         wipedb.db_id = db_id;
4546         wipedb.tid = generation;
4547
4548         ctdb_req_control_db_transaction_start(&request, &wipedb);
4549         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4550                                         pnn_list, count, TIMEOUT(),
4551                                         &request, NULL, NULL);
4552         if (ret != 0) {
4553                 goto failed;
4554         }
4555
4556         ctdb_req_control_wipe_database(&request, &wipedb);
4557         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4558                                         pnn_list, count, TIMEOUT(),
4559                                         &request, NULL, NULL);
4560         if (ret != 0) {
4561                 goto failed;
4562         }
4563
4564         ctdb_req_control_db_set_healthy(&request, db_id);
4565         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4566                                         pnn_list, count, TIMEOUT(),
4567                                         &request, NULL, NULL);
4568         if (ret != 0) {
4569                 goto failed;
4570         }
4571
4572         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4573         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4574                                         pnn_list, count, TIMEOUT(),
4575                                         &request, NULL, NULL);
4576         if (ret != 0) {
4577                 goto failed;
4578         }
4579
4580         ctdb_req_control_db_thaw(&request, db_id);
4581         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4582                                         ctdb->client, pnn_list, count,
4583                                         TIMEOUT(), &request, NULL, NULL);
4584         if (ret != 0) {
4585                 goto failed;
4586         }
4587
4588         printf("Database %s wiped\n", db_name);
4589         return 0;
4590
4591
4592 failed:
4593         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4594                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4595         return ret;
4596 }
4597
4598 static int control_recmaster(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4599                              int argc, const char **argv)
4600 {
4601         uint32_t recmaster;
4602         int ret;
4603
4604         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
4605                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
4606         if (ret != 0) {
4607                 return ret;
4608         }
4609
4610         printf("%u\n", recmaster);
4611         return 0;
4612 }
4613
4614 static void print_scriptstatus_one(struct ctdb_script_list *slist,
4615                                    const char *event_str)
4616 {
4617         int i;
4618         int num_run = 0;
4619
4620         if (slist == NULL) {
4621                 if (! options.machinereadable) {
4622                         printf("%s cycle never run\n", event_str);
4623                 }
4624                 return;
4625         }
4626
4627         for (i=0; i<slist->num_scripts; i++) {
4628                 if (slist->script[i].status != -ENOEXEC) {
4629                         num_run++;
4630                 }
4631         }
4632
4633         if (! options.machinereadable) {
4634                 printf("%d scripts were executed last %s cycle\n",
4635                        num_run, event_str);
4636         }
4637
4638         for (i=0; i<slist->num_scripts; i++) {
4639                 const char *status = NULL;
4640
4641                 switch (slist->script[i].status) {
4642                 case -ETIME:
4643                         status = "TIMEDOUT";
4644                         break;
4645                 case -ENOEXEC:
4646                         status = "DISABLED";
4647                         break;
4648                 case 0:
4649                         status = "OK";
4650                         break;
4651                 default:
4652                         if (slist->script[i].status > 0) {
4653                                 status = "ERROR";
4654                         }
4655                         break;
4656                 }
4657
4658                 if (options.machinereadable) {
4659                         printf("%s%s%s%s%s%i%s%s%s%lu.%06lu%s%lu.%06lu%s%s%s\n",
4660                                options.sep,
4661                                event_str, options.sep,
4662                                slist->script[i].name, options.sep,
4663                                slist->script[i].status, options.sep,
4664                                status, options.sep,
4665                                slist->script[i].start.tv_sec,
4666                                slist->script[i].start.tv_usec, options.sep,
4667                                slist->script[i].finished.tv_sec,
4668                                slist->script[i].finished.tv_usec, options.sep,
4669                                slist->script[i].output, options.sep);
4670                         continue;
4671                 }
4672
4673                 if (status) {
4674                         printf("%-20s Status:%s    ",
4675                                slist->script[i].name, status);
4676                 } else {
4677                         /* Some other error, eg from stat. */
4678                         printf("%-20s Status:CANNOT RUN (%s)",
4679                                slist->script[i].name,
4680                                strerror(-slist->script[i].status));
4681                 }
4682
4683                 if (slist->script[i].status >= 0) {
4684                         printf("Duration:%.3lf ",
4685                                timeval_delta(&slist->script[i].finished,
4686                                              &slist->script[i].start));
4687                 }
4688                 if (slist->script[i].status != -ENOEXEC) {
4689                         printf("%s", ctime(&slist->script[i].start.tv_sec));
4690                         if (slist->script[i].status != 0) {
4691                                 printf("   OUTPUT:%s\n",
4692                                        slist->script[i].output);
4693                         }
4694                 } else {
4695                         printf("\n");
4696                 }
4697         }
4698 }
4699
4700 static void print_scriptstatus(struct ctdb_script_list **slist,
4701                                int count, const char **event_str)
4702 {
4703         int i;
4704
4705         if (options.machinereadable) {
4706                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
4707                        options.sep,
4708                        "Type", options.sep,
4709                        "Name", options.sep,
4710                        "Code", options.sep,
4711                        "Status", options.sep,
4712                        "Start", options.sep,
4713                        "End", options.sep,
4714                        "Error Output", options.sep);
4715         }
4716
4717         for (i=0; i<count; i++) {
4718                 print_scriptstatus_one(slist[i], event_str[i]);
4719         }
4720 }
4721
4722 static int control_scriptstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4723                                 int argc, const char **argv)
4724 {
4725         struct ctdb_script_list **slist;
4726         const char *event_str;
4727         enum ctdb_event event;
4728         const char *all_events[] = {
4729                 "init", "setup", "startup", "monitor",
4730                 "takeip", "releaseip", "updateip", "ipreallocated" };
4731         bool valid;
4732         int ret, i, j;
4733         int count, start, end, num;
4734
4735         if (argc > 1) {
4736                 usage("scriptstatus");
4737         }
4738
4739         if (argc == 0) {
4740                 event_str = "monitor";
4741         } else {
4742                 event_str = argv[0];
4743         }
4744
4745         valid = false;
4746         for (i=0; i<ARRAY_SIZE(all_events); i++) {
4747                 if (strcmp(event_str, all_events[i]) == 0) {
4748                         valid = true;
4749                         count = 1;
4750                         start = i;
4751                         end = i+1;
4752                         break;
4753                 }
4754         }
4755
4756         if (strcmp(event_str, "all") == 0) {
4757                 valid = true;
4758                 count = ARRAY_SIZE(all_events);
4759                 start = 0;
4760                 end = count-1;
4761         }
4762
4763         if (! valid) {
4764                 fprintf(stderr, "Unknown event name %s\n", argv[0]);
4765                 usage("scriptstatus");
4766         }
4767
4768         slist = talloc_array(mem_ctx, struct ctdb_script_list *, count);
4769         if (slist == NULL) {
4770                 fprintf(stderr, "Memory allocation error\n");
4771                 return 1;
4772         }
4773
4774         num = 0;
4775         for (i=start; i<end; i++) {
4776                 event = ctdb_event_from_string(all_events[i]);
4777
4778                 ret = ctdb_ctrl_get_event_script_status(mem_ctx, ctdb->ev,
4779                                                         ctdb->client,
4780                                                         ctdb->cmd_pnn,
4781                                                         TIMEOUT(), event,
4782                                                         &slist[num]);
4783                 if (ret != 0) {
4784                         fprintf(stderr,
4785                                 "failed to get script status for %s event\n",
4786                                 all_events[i]);
4787                         return 1;
4788                 }
4789
4790                 if (slist[num] == NULL) {
4791                         num++;
4792                         continue;
4793                 }
4794
4795                 /* The ETIME status is ignored for certain events.
4796                  * In that case the status is 0, but endtime is not set.
4797                  */
4798                 for (j=0; j<slist[num]->num_scripts; j++) {
4799                         if (slist[num]->script[j].status == 0 &&
4800                             timeval_is_zero(&slist[num]->script[j].finished)) {
4801                                 slist[num]->script[j].status = -ETIME;
4802                         }
4803                 }
4804
4805                 num++;
4806         }
4807
4808         print_scriptstatus(slist, count, &all_events[start]);
4809         return 0;
4810 }
4811
4812 static int control_enablescript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4813                                 int argc, const char **argv)
4814 {
4815         int ret;
4816
4817         if (argc != 1) {
4818                 usage("enablescript");
4819         }
4820
4821         ret = ctdb_ctrl_enable_script(mem_ctx, ctdb->ev, ctdb->client,
4822                                       ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4823         if (ret != 0) {
4824                 fprintf(stderr, "Failed to enable script %s\n", argv[0]);
4825                 return ret;
4826         }
4827
4828         return 0;
4829 }
4830
4831 static int control_disablescript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4832                                  int argc, const char **argv)
4833 {
4834         int ret;
4835
4836         if (argc != 1) {
4837                 usage("disablescript");
4838         }
4839
4840         ret = ctdb_ctrl_disable_script(mem_ctx, ctdb->ev, ctdb->client,
4841                                        ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4842         if (ret != 0) {
4843                 fprintf(stderr, "Failed to disable script %s\n", argv[0]);
4844                 return ret;
4845         }
4846
4847         return 0;
4848 }
4849
4850 static int control_natgw(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4851                          int argc, const char **argv)
4852 {
4853         char *t, *natgw_helper = NULL;
4854
4855         if (argc != 1) {
4856                 usage("natgw");
4857         }
4858
4859         t = getenv("CTDB_NATGW_HELPER");
4860         if (t != NULL) {
4861                 natgw_helper = talloc_strdup(mem_ctx, t);
4862         } else {
4863                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4864                                                CTDB_HELPER_BINDIR);
4865         }
4866
4867         if (natgw_helper == NULL) {
4868                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4869                 return 1;
4870         }
4871
4872         return run_helper("NAT gateway helper", natgw_helper, argv[0]);
4873 }
4874
4875 static int control_natgwlist(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4876                              int argc, const char **argv)
4877 {
4878         char *t, *natgw_helper = NULL;
4879
4880         if (argc != 0) {
4881                 usage("natgwlist");
4882         }
4883
4884         t = getenv("CTDB_NATGW_HELPER");
4885         if (t != NULL) {
4886                 natgw_helper = talloc_strdup(mem_ctx, t);
4887         } else {
4888                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4889                                                CTDB_HELPER_BINDIR);
4890         }
4891
4892         if (natgw_helper == NULL) {
4893                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4894                 return 1;
4895         }
4896
4897         return run_helper("NAT gateway helper", natgw_helper, "natgwlist");
4898 }
4899
4900 /*
4901  * Find the PNN of the current node
4902  * discover the pnn by loading the nodes file and try to bind
4903  * to all addresses one at a time until the ip address is found.
4904  */
4905 static bool find_node_xpnn(TALLOC_CTX *mem_ctx, uint32_t *pnn)
4906 {
4907         struct ctdb_node_map *nodemap;
4908         int i;
4909
4910         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
4911         if (nodemap == NULL) {
4912                 return false;
4913         }
4914
4915         for (i=0; i<nodemap->num; i++) {
4916                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
4917                         continue;
4918                 }
4919                 if (ctdb_sys_have_ip(&nodemap->node[i].addr)) {
4920                         if (pnn != NULL) {
4921                                 *pnn = nodemap->node[i].pnn;
4922                         }
4923                         talloc_free(nodemap);
4924                         return true;
4925                 }
4926         }
4927
4928         fprintf(stderr, "Failed to detect PNN of the current node.\n");
4929         talloc_free(nodemap);
4930         return false;
4931 }
4932
4933 static int control_getreclock(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4934                               int argc, const char **argv)
4935 {
4936         const char *reclock;
4937         int ret;
4938
4939         if (argc != 0) {
4940                 usage("getreclock");
4941         }
4942
4943         ret = ctdb_ctrl_get_reclock_file(mem_ctx, ctdb->ev, ctdb->client,
4944                                          ctdb->cmd_pnn, TIMEOUT(), &reclock);
4945         if (ret != 0) {
4946                 return ret;
4947         }
4948
4949         if (reclock != NULL) {
4950                 printf("%s\n", reclock);
4951         }
4952
4953         return 0;
4954 }
4955
4956 static int control_setlmasterrole(TALLOC_CTX *mem_ctx,
4957                                   struct ctdb_context *ctdb,
4958                                   int argc, const char **argv)
4959 {
4960         uint32_t lmasterrole = 0;
4961         int ret;
4962
4963         if (argc != 1) {
4964                 usage("setlmasterrole");
4965         }
4966
4967         if (strcmp(argv[0], "on") == 0) {
4968                 lmasterrole = 1;
4969         } else if (strcmp(argv[0], "off") == 0) {
4970                 lmasterrole = 0;
4971         } else {
4972                 usage("setlmasterrole");
4973         }
4974
4975         ret = ctdb_ctrl_set_lmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4976                                         ctdb->cmd_pnn, TIMEOUT(), lmasterrole);
4977         if (ret != 0) {
4978                 return ret;
4979         }
4980
4981         return 0;
4982 }
4983
4984 static int control_setrecmasterrole(TALLOC_CTX *mem_ctx,
4985                                     struct ctdb_context *ctdb,
4986                                     int argc, const char **argv)
4987 {
4988         uint32_t recmasterrole = 0;
4989         int ret;
4990
4991         if (argc != 1) {
4992                 usage("setrecmasterrole");
4993         }
4994
4995         if (strcmp(argv[0], "on") == 0) {
4996                 recmasterrole = 1;
4997         } else if (strcmp(argv[0], "off") == 0) {
4998                 recmasterrole = 0;
4999         } else {
5000                 usage("setrecmasterrole");
5001         }
5002
5003         ret = ctdb_ctrl_set_recmasterrole(mem_ctx, ctdb->ev, ctdb->client,
5004                                           ctdb->cmd_pnn, TIMEOUT(),
5005                                           recmasterrole);
5006         if (ret != 0) {
5007                 return ret;
5008         }
5009
5010         return 0;
5011 }
5012
5013 static int control_setdbreadonly(TALLOC_CTX *mem_ctx,
5014                                  struct ctdb_context *ctdb,
5015                                  int argc, const char **argv)
5016 {
5017         uint32_t db_id;
5018         uint8_t db_flags;
5019         int ret;
5020
5021         if (argc != 1) {
5022                 usage("setdbreadonly");
5023         }
5024
5025         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
5026                 return 1;
5027         }
5028
5029         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5030                 fprintf(stderr, "Cannot set READONLY on persistent DB\n");
5031                 return 1;
5032         }
5033
5034         ret = ctdb_ctrl_set_db_readonly(mem_ctx, ctdb->ev, ctdb->client,
5035                                         ctdb->cmd_pnn, TIMEOUT(), db_id);
5036         if (ret != 0) {
5037                 return ret;
5038         }
5039
5040         return 0;
5041 }
5042
5043 static int control_setdbsticky(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5044                                int argc, const char **argv)
5045 {
5046         uint32_t db_id;
5047         uint8_t db_flags;
5048         int ret;
5049
5050         if (argc != 1) {
5051                 usage("setdbsticky");
5052         }
5053
5054         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
5055                 return 1;
5056         }
5057
5058         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5059                 fprintf(stderr, "Cannot set STICKY on persistent DB\n");
5060                 return 1;
5061         }
5062
5063         ret = ctdb_ctrl_set_db_sticky(mem_ctx, ctdb->ev, ctdb->client,
5064                                       ctdb->cmd_pnn, TIMEOUT(), db_id);
5065         if (ret != 0) {
5066                 return ret;
5067         }
5068
5069         return 0;
5070 }
5071
5072 static int control_pfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5073                           int argc, const char **argv)
5074 {
5075         const char *db_name;
5076         struct ctdb_db_context *db;
5077         struct ctdb_transaction_handle *h;
5078         uint8_t db_flags;
5079         TDB_DATA key, data;
5080         int ret;
5081
5082         if (argc < 2 || argc > 3) {
5083                 usage("pfetch");
5084         }
5085
5086         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5087                 return 1;
5088         }
5089
5090         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5091                 fprintf(stderr, "DB %s is not a persistent database\n",
5092                         db_name);
5093                 return 1;
5094         }
5095
5096         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5097                           db_flags, &db);
5098         if (ret != 0) {
5099                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5100                 return ret;
5101         }
5102
5103         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5104         if (ret != 0) {
5105                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5106                 return ret;
5107         }
5108
5109         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5110                                      TIMEOUT(), db, true, &h);
5111         if (ret != 0) {
5112                 fprintf(stderr, "Failed to start transaction on db %s\n",
5113                         db_name);
5114                 return ret;
5115         }
5116
5117         ret = ctdb_transaction_fetch_record(h, key, mem_ctx, &data);
5118         if (ret != 0) {
5119                 fprintf(stderr, "Failed to read record for key %s\n",
5120                         argv[1]);
5121                 return ret;
5122         }
5123
5124         printf("%.*s\n", (int)data.dsize, data.dptr);
5125
5126         ctdb_transaction_cancel(h);
5127         return 0;
5128 }
5129
5130 static int control_pstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5131                           int argc, const char **argv)
5132 {
5133         const char *db_name;
5134         struct ctdb_db_context *db;
5135         struct ctdb_transaction_handle *h;
5136         uint8_t db_flags;
5137         TDB_DATA key, data;
5138         int ret;
5139
5140         if (argc != 3) {
5141                 usage("pstore");
5142         }
5143
5144         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5145                 return 1;
5146         }
5147
5148         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5149                 fprintf(stderr, "DB %s is not a persistent database\n",
5150                         db_name);
5151                 return 1;
5152         }
5153
5154         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5155                           db_flags, &db);
5156         if (ret != 0) {
5157                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5158                 return ret;
5159         }
5160
5161         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5162         if (ret != 0) {
5163                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5164                 return ret;
5165         }
5166
5167         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5168         if (ret != 0) {
5169                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5170                 return ret;
5171         }
5172
5173         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5174                                      TIMEOUT(), db, false, &h);
5175         if (ret != 0) {
5176                 fprintf(stderr, "Failed to start transaction on db %s\n",
5177                         db_name);
5178                 return ret;
5179         }
5180
5181         ret = ctdb_transaction_store_record(h, key, data);
5182         if (ret != 0) {
5183                 fprintf(stderr, "Failed to store record for key %s\n",
5184                         argv[1]);
5185                 return ret;
5186         }
5187
5188         ret = ctdb_transaction_commit(h);
5189         if (ret != 0) {
5190                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5191                         db_name);
5192                 return ret;
5193         }
5194
5195         return 0;
5196 }
5197
5198 static int control_pdelete(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5199                            int argc, const char **argv)
5200 {
5201         const char *db_name;
5202         struct ctdb_db_context *db;
5203         struct ctdb_transaction_handle *h;
5204         uint8_t db_flags;
5205         TDB_DATA key;
5206         int ret;
5207
5208         if (argc != 2) {
5209                 usage("pdelete");
5210         }
5211
5212         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5213                 return 1;
5214         }
5215
5216         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5217                 fprintf(stderr, "DB %s is not a persistent database\n",
5218                         db_name);
5219                 return 1;
5220         }
5221
5222         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5223                           db_flags, &db);
5224         if (ret != 0) {
5225                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5226                 return ret;
5227         }
5228
5229         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5230         if (ret != 0) {
5231                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5232                 return ret;
5233         }
5234
5235         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5236                                      TIMEOUT(), db, false, &h);
5237         if (ret != 0) {
5238                 fprintf(stderr, "Failed to start transaction on db %s\n",
5239                         db_name);
5240                 return ret;
5241         }
5242
5243         ret = ctdb_transaction_delete_record(h, key);
5244         if (ret != 0) {
5245                 fprintf(stderr, "Failed to delete record for key %s\n",
5246                         argv[1]);
5247                 return ret;
5248         }
5249
5250         ret = ctdb_transaction_commit(h);
5251         if (ret != 0) {
5252                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5253                         db_name);
5254                 return ret;
5255         }
5256
5257         return 0;
5258 }
5259
5260 static int ptrans_parse_string(TALLOC_CTX *mem_ctx, const char **ptr, TDB_DATA *data)
5261 {
5262         const char *t;
5263         size_t n;
5264         int ret;
5265
5266         *data = tdb_null;
5267
5268         /* Skip whitespace */
5269         n = strspn(*ptr, " \t");
5270         t = *ptr + n;
5271
5272         if (t[0] == '"') {
5273                 /* Quoted ASCII string - no wide characters! */
5274                 t++;
5275                 n = strcspn(t, "\"");
5276                 if (t[n] == '"') {
5277                         if (n > 0) {
5278                                 ret = str_to_data(t, n, mem_ctx, data);
5279                                 if (ret != 0) {
5280                                         return ret;
5281                                 }
5282                         }
5283                         *ptr = t + n + 1;
5284                 } else {
5285                         fprintf(stderr, "Unmatched \" in input %s\n", *ptr);
5286                         return 1;
5287                 }
5288         } else {
5289                 fprintf(stderr, "Unsupported input format in %s\n", *ptr);
5290                 return 1;
5291         }
5292
5293         return 0;
5294 }
5295
5296 #define MAX_LINE_SIZE   1024
5297
5298 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
5299                                  TDB_DATA *key, TDB_DATA *value)
5300 {
5301         char line [MAX_LINE_SIZE]; /* FIXME: make this more flexible? */
5302         const char *ptr;
5303         int ret;
5304
5305         ptr = fgets(line, MAX_LINE_SIZE, file);
5306         if (ptr == NULL) {
5307                 return false;
5308         }
5309
5310         /* Get key */
5311         ret = ptrans_parse_string(mem_ctx, &ptr, key);
5312         if (ret != 0 || ptr == NULL || key->dptr == NULL) {
5313                 /* Line Ignored but not EOF */
5314                 *key = tdb_null;
5315                 return true;
5316         }
5317
5318         /* Get value */
5319         ret = ptrans_parse_string(mem_ctx, &ptr, value);
5320         if (ret != 0) {
5321                 /* Line Ignored but not EOF */
5322                 talloc_free(key->dptr);
5323                 *key = tdb_null;
5324                 return true;
5325         }
5326
5327         return true;
5328 }
5329
5330 static int control_ptrans(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5331                           int argc, const char **argv)
5332 {
5333         const char *db_name;
5334         struct ctdb_db_context *db;
5335         struct ctdb_transaction_handle *h;
5336         uint8_t db_flags;
5337         FILE *file;
5338         TDB_DATA key, value;
5339         int ret;
5340
5341         if (argc < 1 || argc > 2) {
5342                 usage("ptrans");
5343         }
5344
5345         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5346                 return 1;
5347         }
5348
5349         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5350                 fprintf(stderr, "DB %s is not a persistent database\n",
5351                         db_name);
5352                 return 1;
5353         }
5354
5355         if (argc == 2) {
5356                 file = fopen(argv[1], "r");
5357                 if (file == NULL) {
5358                         fprintf(stderr, "Failed to open file %s\n", argv[1]);
5359                         return 1;
5360                 }
5361         } else {
5362                 file = stdin;
5363         }
5364
5365         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5366                           db_flags, &db);
5367         if (ret != 0) {
5368                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5369                 goto done;
5370         }
5371
5372         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5373                                      TIMEOUT(), db, false, &h);
5374         if (ret != 0) {
5375                 fprintf(stderr, "Failed to start transaction on db %s\n",
5376                         db_name);
5377                 goto done;
5378         }
5379
5380         while (ptrans_get_key_value(mem_ctx, file, &key, &value)) {
5381                 if (key.dsize != 0) {
5382                         ret = ctdb_transaction_store_record(h, key, value);
5383                         if (ret != 0) {
5384                                 fprintf(stderr, "Failed to store record\n");
5385                                 ctdb_transaction_cancel(h);
5386                                 goto done;
5387                         }
5388                         talloc_free(key.dptr);
5389                         talloc_free(value.dptr);
5390                 }
5391         }
5392
5393         ret = ctdb_transaction_commit(h);
5394         if (ret != 0) {
5395                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5396                         db_name);
5397         }
5398
5399 done:
5400         if (file != stdin) {
5401                 fclose(file);
5402         }
5403         return ret;
5404 }
5405
5406 static int control_tfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5407                           int argc, const char **argv)
5408 {
5409         struct tdb_context *tdb;
5410         TDB_DATA key, data;
5411         struct ctdb_ltdb_header header;
5412         int ret;
5413
5414         if (argc < 2 || argc > 3) {
5415                 usage("tfetch");
5416         }
5417
5418         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5419         if (tdb == NULL) {
5420                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5421                 return 1;
5422         }
5423
5424         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5425         if (ret != 0) {
5426                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5427                 return ret;
5428         }
5429
5430         data = tdb_fetch(tdb, key);
5431         if (data.dptr == NULL) {
5432                 fprintf(stderr, "No record for key %s\n", argv[1]);
5433                 return 1;
5434         }
5435
5436         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
5437                 fprintf(stderr, "Invalid record for key %s\n", argv[1]);
5438                 return 1;
5439         }
5440
5441         tdb_close(tdb);
5442
5443         if (argc == 3) {
5444                 int fd;
5445                 ssize_t nwritten;
5446
5447                 fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
5448                 if (fd == -1) {
5449                         fprintf(stderr, "Failed to open output file %s\n",
5450                                 argv[2]);
5451                         goto fail;
5452                 }
5453
5454                 nwritten = sys_write(fd, data.dptr, data.dsize);
5455                 if (nwritten != data.dsize) {
5456                         fprintf(stderr, "Failed to write record to file\n");
5457                         close(fd);
5458                         goto fail;
5459                 }
5460
5461                 close(fd);
5462         }
5463
5464 fail:
5465         ret = ctdb_ltdb_header_extract(&data, &header);
5466         if (ret != 0) {
5467                 fprintf(stderr, "Failed to parse header from data\n");
5468                 return 1;
5469         }
5470
5471         dump_ltdb_header(&header);
5472         dump_tdb_data("data", data);
5473
5474         return 0;
5475 }
5476
5477 static int control_tstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5478                           int argc, const char **argv)
5479 {
5480         struct tdb_context *tdb;
5481         TDB_DATA key, data, value;
5482         struct ctdb_ltdb_header header;
5483         size_t offset;
5484         int ret;
5485
5486         if (argc < 3 || argc > 5) {
5487                 usage("tstore");
5488         }
5489
5490         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5491         if (tdb == NULL) {
5492                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5493                 return 1;
5494         }
5495
5496         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5497         if (ret != 0) {
5498                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5499                 return ret;
5500         }
5501
5502         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &value);
5503         if (ret != 0) {
5504                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5505                 return ret;
5506         }
5507
5508         ZERO_STRUCT(header);
5509
5510         if (argc > 3) {
5511                 header.rsn = (uint64_t)strtoull(argv[3], NULL, 0);
5512         }
5513         if (argc > 4) {
5514                 header.dmaster = (uint32_t)atol(argv[4]);
5515         }
5516         if (argc > 5) {
5517                 header.flags = (uint32_t)atol(argv[5]);
5518         }
5519
5520         offset = ctdb_ltdb_header_len(&header);
5521         data.dsize = offset + value.dsize;
5522         data.dptr = talloc_size(mem_ctx, data.dsize);
5523         if (data.dptr == NULL) {
5524                 fprintf(stderr, "Memory allocation error\n");
5525                 return 1;
5526         }
5527
5528         ctdb_ltdb_header_push(&header, data.dptr);
5529         memcpy(data.dptr + offset, value.dptr, value.dsize);
5530
5531         ret = tdb_store(tdb, key, data, TDB_REPLACE);
5532         if (ret != 0) {
5533                 fprintf(stderr, "Failed to write record %s to file %s\n",
5534                         argv[1], argv[0]);
5535         }
5536
5537         tdb_close(tdb);
5538
5539         return ret;
5540 }
5541
5542 static int control_readkey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5543                            int argc, const char **argv)
5544 {
5545         const char *db_name;
5546         struct ctdb_db_context *db;
5547         struct ctdb_record_handle *h;
5548         uint8_t db_flags;
5549         TDB_DATA key, data;
5550         bool readonly = false;
5551         int ret;
5552
5553         if (argc < 2 || argc > 3) {
5554                 usage("readkey");
5555         }
5556
5557         if (argc == 3) {
5558                 if (strcmp(argv[2], "readonly") == 0) {
5559                         readonly = true;
5560                 } else {
5561                         usage("readkey");
5562                 }
5563         }
5564
5565         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5566                 return 1;
5567         }
5568
5569         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5570                 fprintf(stderr, "DB %s is not a volatile database\n",
5571                         db_name);
5572                 return 1;
5573         }
5574
5575         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5576                           db_flags, &db);
5577         if (ret != 0) {
5578                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5579                 return ret;
5580         }
5581
5582         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5583         if (ret != 0) {
5584                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5585                 return ret;
5586         }
5587
5588         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5589                               db, key, readonly, &h, NULL, &data);
5590         if (ret != 0) {
5591                 fprintf(stderr, "Failed to read record for key %s\n",
5592                         argv[1]);
5593         } else {
5594                 printf("Data: size:%zu ptr:[%.*s]\n", data.dsize,
5595                        (int)data.dsize, data.dptr);
5596         }
5597
5598         talloc_free(h);
5599         return ret;
5600 }
5601
5602 static int control_writekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5603                             int argc, const char **argv)
5604 {
5605         const char *db_name;
5606         struct ctdb_db_context *db;
5607         struct ctdb_record_handle *h;
5608         uint8_t db_flags;
5609         TDB_DATA key, data;
5610         int ret;
5611
5612         if (argc != 3) {
5613                 usage("writekey");
5614         }
5615
5616         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5617                 return 1;
5618         }
5619
5620         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5621                 fprintf(stderr, "DB %s is not a volatile database\n",
5622                         db_name);
5623                 return 1;
5624         }
5625
5626         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5627                           db_flags, &db);
5628         if (ret != 0) {
5629                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5630                 return ret;
5631         }
5632
5633         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5634         if (ret != 0) {
5635                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5636                 return ret;
5637         }
5638
5639         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5640         if (ret != 0) {
5641                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5642                 return ret;
5643         }
5644
5645         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5646                               db, key, false, &h, NULL, NULL);
5647         if (ret != 0) {
5648                 fprintf(stderr, "Failed to lock record for key %s\n", argv[0]);
5649                 return ret;
5650         }
5651
5652         ret = ctdb_store_record(h, data);
5653         if (ret != 0) {
5654                 fprintf(stderr, "Failed to store record for key %s\n",
5655                         argv[1]);
5656         }
5657
5658         talloc_free(h);
5659         return ret;
5660 }
5661
5662 static int control_deletekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5663                              int argc, const char **argv)
5664 {
5665         const char *db_name;
5666         struct ctdb_db_context *db;
5667         struct ctdb_record_handle *h;
5668         uint8_t db_flags;
5669         TDB_DATA key, data;
5670         int ret;
5671
5672         if (argc != 2) {
5673                 usage("deletekey");
5674         }
5675
5676         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5677                 return 1;
5678         }
5679
5680         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5681                 fprintf(stderr, "DB %s is not a volatile database\n",
5682                         db_name);
5683                 return 1;
5684         }
5685
5686         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5687                           db_flags, &db);
5688         if (ret != 0) {
5689                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5690                 return ret;
5691         }
5692
5693         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5694         if (ret != 0) {
5695                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5696                 return ret;
5697         }
5698
5699         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5700                               db, key, false, &h, NULL, &data);
5701         if (ret != 0) {
5702                 fprintf(stderr, "Failed to fetch record for key %s\n",
5703                         argv[1]);
5704                 return ret;
5705         }
5706
5707         ret = ctdb_delete_record(h);
5708         if (ret != 0) {
5709                 fprintf(stderr, "Failed to delete record for key %s\n",
5710                         argv[1]);
5711         }
5712
5713         talloc_free(h);
5714         return ret;
5715 }
5716
5717 static int control_checktcpport(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5718                                 int argc, const char **argv)
5719 {
5720         struct sockaddr_in sin;
5721         unsigned int port;
5722         int s, v;
5723         int ret;
5724
5725         if (argc != 1) {
5726                 usage("chktcpport");
5727         }
5728
5729         port = atoi(argv[0]);
5730
5731         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
5732         if (s == -1) {
5733                 fprintf(stderr, "Failed to open local socket\n");
5734                 return errno;
5735         }
5736
5737         v = fcntl(s, F_GETFL, 0);
5738         if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK)) {
5739                 fprintf(stderr, "Unable to set socket non-blocking\n");
5740                 close(s);
5741                 return errno;
5742         }
5743
5744         bzero(&sin, sizeof(sin));
5745         sin.sin_family = AF_INET;
5746         sin.sin_port = htons(port);
5747         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
5748         close(s);
5749         if (ret == -1) {
5750                 fprintf(stderr, "Failed to bind to TCP port %u\n", port);
5751                 return errno;
5752         }
5753
5754         return 0;
5755 }
5756
5757 static int control_rebalancenode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5758                                  int argc, const char **argv)
5759 {
5760         if (argc != 0) {
5761                 usage("rebalancenode");
5762         }
5763
5764         if (! rebalancenode(mem_ctx, ctdb, ctdb->cmd_pnn)) {
5765                 fprintf(stderr, "Failed to rebalance IPs on node %u\n",
5766                         ctdb->cmd_pnn);
5767                 return 1;
5768         }
5769
5770         return 0;
5771 }
5772
5773 static int control_getdbseqnum(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5774                                int argc, const char **argv)
5775 {
5776         uint32_t db_id;
5777         const char *db_name;
5778         uint64_t seqnum;
5779         int ret;
5780
5781         if (argc != 1) {
5782                 usage("getdbseqnum");
5783         }
5784
5785         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5786                 return 1;
5787         }
5788
5789         ret = ctdb_ctrl_get_db_seqnum(mem_ctx, ctdb->ev, ctdb->client,
5790                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
5791                                       &seqnum);
5792         if (ret != 0) {
5793                 fprintf(stderr, "Failed to get sequence number for DB %s\n",
5794                         db_name);
5795                 return ret;
5796         }
5797
5798         printf("0x%"PRIx64"\n", seqnum);
5799         return 0;
5800 }
5801
5802 static int control_nodestatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5803                               int argc, const char **argv)
5804 {
5805         const char *nodestring = NULL;
5806         struct ctdb_node_map *nodemap;
5807         int ret, i;
5808
5809         if (argc > 1) {
5810                 usage("nodestatus");
5811         }
5812
5813         if (argc == 1) {
5814                 nodestring = argv[0];
5815         }
5816
5817         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap)) {
5818                 return 1;
5819         }
5820
5821         nodemap = get_nodemap(ctdb, false);
5822         if (nodemap == NULL) {
5823                 return 1;
5824         }
5825
5826         if (options.machinereadable) {
5827                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5828         } else {
5829                 print_nodemap(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5830         }
5831
5832         ret = 0;
5833         for (i=0; i<nodemap->num; i++) {
5834                 ret |= nodemap->node[i].flags;
5835         }
5836
5837         return ret;
5838 }
5839
5840 const struct {
5841         const char *name;
5842         uint32_t offset;
5843 } db_stats_fields[] = {
5844 #define DBSTATISTICS_FIELD(n) { #n, offsetof(struct ctdb_db_statistics, n) }
5845         DBSTATISTICS_FIELD(db_ro_delegations),
5846         DBSTATISTICS_FIELD(db_ro_revokes),
5847         DBSTATISTICS_FIELD(locks.num_calls),
5848         DBSTATISTICS_FIELD(locks.num_current),
5849         DBSTATISTICS_FIELD(locks.num_pending),
5850         DBSTATISTICS_FIELD(locks.num_failed),
5851         DBSTATISTICS_FIELD(db_ro_delegations),
5852 };
5853
5854 static void print_dbstatistics(const char *db_name,
5855                                struct ctdb_db_statistics *s)
5856 {
5857         int i;
5858         const char *prefix = NULL;
5859         int preflen = 0;
5860
5861         printf("DB Statistics %s\n", db_name);
5862
5863         for (i=0; i<ARRAY_SIZE(db_stats_fields); i++) {
5864                 if (strchr(db_stats_fields[i].name, '.') != NULL) {
5865                         preflen = strcspn(db_stats_fields[i].name, ".") + 1;
5866                         if (! prefix ||
5867                             strncmp(prefix, db_stats_fields[i].name, preflen) != 0) {
5868                                 prefix = db_stats_fields[i].name;
5869                                 printf(" %*.*s\n", preflen-1, preflen-1,
5870                                        db_stats_fields[i].name);
5871                         }
5872                 } else {
5873                         preflen = 0;
5874                 }
5875                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
5876                        db_stats_fields[i].name+preflen, preflen ? 0 : 4, "",
5877                        *(uint32_t *)(db_stats_fields[i].offset+(uint8_t *)s));
5878         }
5879
5880         printf(" hop_count_buckets:");
5881         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5882                 printf(" %d", s->hop_count_bucket[i]);
5883         }
5884         printf("\n");
5885
5886         printf(" lock_buckets:");
5887         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5888                 printf(" %d", s->locks.buckets[i]);
5889         }
5890         printf("\n");
5891
5892         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5893                "locks_latency      MIN/AVG/MAX",
5894                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
5895                s->locks.latency.max, s->locks.latency.num);
5896
5897         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5898                "vacuum_latency     MIN/AVG/MAX",
5899                s->vacuum.latency.min, LATENCY_AVG(s->vacuum.latency),
5900                s->vacuum.latency.max, s->vacuum.latency.num);
5901
5902         printf(" Num Hot Keys:     %d\n", s->num_hot_keys);
5903         for (i=0; i<s->num_hot_keys; i++) {
5904                 int j;
5905                 printf("     Count:%d Key:", s->hot_keys[i].count);
5906                 for (j=0; j<s->hot_keys[i].key.dsize; j++) {
5907                         printf("%02x", s->hot_keys[i].key.dptr[j] & 0xff);
5908                 }
5909                 printf("\n");
5910         }
5911 }
5912
5913 static int control_dbstatistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5914                                 int argc, const char **argv)
5915 {
5916         uint32_t db_id;
5917         const char *db_name;
5918         struct ctdb_db_statistics *dbstats;
5919         int ret;
5920
5921         if (argc != 1) {
5922                 usage("dbstatistics");
5923         }
5924
5925         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5926                 return 1;
5927         }
5928
5929         ret = ctdb_ctrl_get_db_statistics(mem_ctx, ctdb->ev, ctdb->client,
5930                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
5931                                           &dbstats);
5932         if (ret != 0) {
5933                 fprintf(stderr, "Failed to get statistics for DB %s\n",
5934                         db_name);
5935                 return ret;
5936         }
5937
5938         print_dbstatistics(db_name, dbstats);
5939         return 0;
5940 }
5941
5942 struct disable_takeover_runs_state {
5943         uint32_t *pnn_list;
5944         int node_count;
5945         bool *reply;
5946         int status;
5947         bool done;
5948 };
5949
5950 static void disable_takeover_run_handler(uint64_t srvid, TDB_DATA data,
5951                                          void *private_data)
5952 {
5953         struct disable_takeover_runs_state *state =
5954                 (struct disable_takeover_runs_state *)private_data;
5955         int ret, i;
5956
5957         if (data.dsize != sizeof(int)) {
5958                 /* Ignore packet */
5959                 return;
5960         }
5961
5962         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
5963         ret = *(int *)data.dptr;
5964         if (ret < 0) {
5965                 state->status = ret;
5966                 state->done = true;
5967                 return;
5968         }
5969         for (i=0; i<state->node_count; i++) {
5970                 if (state->pnn_list[i] == ret) {
5971                         state->reply[i] = true;
5972                         break;
5973                 }
5974         }
5975
5976         state->done = true;
5977         for (i=0; i<state->node_count; i++) {
5978                 if (! state->reply[i]) {
5979                         state->done = false;
5980                         break;
5981                 }
5982         }
5983 }
5984
5985 static int disable_takeover_runs(TALLOC_CTX *mem_ctx,
5986                                  struct ctdb_context *ctdb, uint32_t timeout,
5987                                  uint32_t *pnn_list, int count)
5988 {
5989         struct ctdb_disable_message disable = { 0 };
5990         struct disable_takeover_runs_state state;
5991         int ret, i;
5992
5993         disable.pnn = ctdb->pnn;
5994         disable.srvid = next_srvid(ctdb);
5995         disable.timeout = timeout;
5996
5997         state.pnn_list = pnn_list;
5998         state.node_count = count;
5999         state.done = false;
6000         state.status = 0;
6001         state.reply = talloc_zero_array(mem_ctx, bool, count);
6002         if (state.reply == NULL) {
6003                 return ENOMEM;
6004         }
6005
6006         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
6007                                               disable.srvid,
6008                                               disable_takeover_run_handler,
6009                                               &state);
6010         if (ret != 0) {
6011                 return ret;
6012         }
6013
6014         for (i=0; i<count; i++) {
6015                 ret = ctdb_message_disable_takeover_runs(mem_ctx, ctdb->ev,
6016                                                          ctdb->client,
6017                                                          pnn_list[i],
6018                                                          &disable);
6019                 if (ret != 0) {
6020                         goto fail;
6021                 }
6022         }
6023
6024         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
6025         if (ret == ETIME) {
6026                 fprintf(stderr, "Timed out waiting to disable takeover runs\n");
6027         } else {
6028                 ret = (state.status >= 0 ? 0 : 1);
6029         }
6030
6031 fail:
6032         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
6033                                            disable.srvid, &state);
6034         return ret;
6035 }
6036
6037 static int control_reloadips(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
6038                              int argc, const char **argv)
6039 {
6040         const char *nodestring = NULL;
6041         struct ctdb_node_map *nodemap, *nodemap2;
6042         struct ctdb_req_control request;
6043         uint32_t *pnn_list, *pnn_list2;
6044         int ret, count, count2;
6045
6046         if (argc > 1) {
6047                 usage("reloadips");
6048         }
6049
6050         if (argc == 1) {
6051                 nodestring = argv[0];
6052         }
6053
6054         nodemap = get_nodemap(ctdb, false);
6055         if (nodemap == NULL) {
6056                 return 1;
6057         }
6058
6059         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap2)) {
6060                 return 1;
6061         }
6062
6063         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
6064                                         mem_ctx, &pnn_list);
6065         if (count <= 0) {
6066                 fprintf(stderr, "Memory allocation error\n");
6067                 return 1;
6068         }
6069
6070         count2 = list_of_active_nodes(nodemap2, CTDB_UNKNOWN_PNN,
6071                                       mem_ctx, &pnn_list2);
6072         if (count2 <= 0) {
6073                 fprintf(stderr, "Memory allocation error\n");
6074                 return 1;
6075         }
6076
6077         /* Disable takeover runs on all connected nodes.  A reply
6078          * indicating success is needed from each node so all nodes
6079          * will need to be active.
6080          *
6081          * A check could be added to not allow reloading of IPs when
6082          * there are disconnected nodes.  However, this should
6083          * probably be left up to the administrator.
6084          */
6085         ret = disable_takeover_runs(mem_ctx, ctdb, 2*options.timelimit,
6086                                     pnn_list, count);
6087         if (ret != 0) {
6088                 fprintf(stderr, "Failed to disable takeover runs\n");
6089                 return ret;
6090         }
6091
6092         /* Now tell all the desired nodes to reload their public IPs.
6093          * Keep trying this until it succeeds.  This assumes all
6094          * failures are transient, which might not be true...
6095          */
6096         ctdb_req_control_reload_public_ips(&request);
6097         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
6098                                         pnn_list2, count2, TIMEOUT(),
6099                                         &request, NULL, NULL);
6100         if (ret != 0) {
6101                 fprintf(stderr, "Failed to reload IPs on some nodes.\n");
6102         }
6103
6104         /* It isn't strictly necessary to wait until takeover runs are
6105          * re-enabled but doing so can't hurt.
6106          */
6107         ret = disable_takeover_runs(mem_ctx, ctdb, 0, pnn_list, count);
6108         if (ret != 0) {
6109                 fprintf(stderr, "Failed to enable takeover runs\n");
6110                 return ret;
6111         }
6112
6113         return ipreallocate(mem_ctx, ctdb);
6114 }
6115
6116 static int control_ipiface(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
6117                            int argc, const char **argv)
6118 {
6119         ctdb_sock_addr addr;
6120         char *iface;
6121
6122         if (argc != 1) {
6123                 usage("ipiface");
6124         }
6125
6126         if (! parse_ip(argv[0], NULL, 0, &addr)) {
6127                 fprintf(stderr, "Failed to Parse IP %s\n", argv[0]);
6128                 return 1;
6129         }
6130
6131         iface = ctdb_sys_find_ifname(&addr);
6132         if (iface == NULL) {
6133                 fprintf(stderr, "Failed to find interface for IP %s\n",
6134                         argv[0]);
6135                 return 1;
6136         }
6137         free(iface);
6138
6139         return 0;
6140 }
6141
6142
6143 static const struct ctdb_cmd {
6144         const char *name;
6145         int (*fn)(TALLOC_CTX *, struct ctdb_context *, int, const char **);
6146         bool without_daemon; /* can be run without daemon running ? */
6147         bool remote; /* can be run on remote nodes */
6148         const char *msg;
6149         const char *args;
6150 } ctdb_commands[] = {
6151         { "version", control_version, true, false,
6152                 "show version of ctdb", NULL },
6153         { "status", control_status, false, true,
6154                 "show node status", NULL },
6155         { "uptime", control_uptime, false, true,
6156                 "show node uptime", NULL },
6157         { "ping", control_ping, false, true,
6158                 "ping all nodes", NULL },
6159         { "runstate", control_runstate, false, true,
6160                 "get/check runstate of a node",
6161                 "[setup|first_recovery|startup|running]" },
6162         { "getvar", control_getvar, false, true,
6163                 "get a tunable variable", "<name>" },
6164         { "setvar", control_setvar, false, true,
6165                 "set a tunable variable", "<name> <value>" },
6166         { "listvars", control_listvars, false, true,
6167                 "list tunable variables", NULL },
6168         { "statistics", control_statistics, false, true,
6169                 "show ctdb statistics", NULL },
6170         { "statisticsreset", control_statistics_reset, false, true,
6171                 "reset ctdb statistics", NULL },
6172         { "stats", control_stats, false, true,
6173                 "show rolling statistics", "[count]" },
6174         { "ip", control_ip, false, true,
6175                 "show public ips", "[all]" },
6176         { "ipinfo", control_ipinfo, false, true,
6177                 "show public ip details", "<ip>" },
6178         { "ifaces", control_ifaces, false, true,
6179                 "show interfaces", NULL },
6180         { "setifacelink", control_setifacelink, false, true,
6181                 "set interface link status", "<iface> up|down" },
6182         { "process-exists", control_process_exists, false, true,
6183                 "check if a process exists on a node",  "<pid>" },
6184         { "getdbmap", control_getdbmap, false, true,
6185                 "show attached databases", NULL },
6186         { "getdbstatus", control_getdbstatus, false, true,
6187                 "show database status", "<dbname|dbid>" },
6188         { "catdb", control_catdb, false, false,
6189                 "dump cluster-wide ctdb database", "<dbname|dbid>" },
6190         { "cattdb", control_cattdb, false, false,
6191                 "dump local ctdb database", "<dbname|dbid>" },
6192         { "getmonmode", control_getmonmode, false, true,
6193                 "show monitoring mode", NULL },
6194         { "getcapabilities", control_getcapabilities, false, true,
6195                 "show node capabilities", NULL },
6196         { "pnn", control_pnn, false, false,
6197                 "show the pnn of the currnet node", NULL },
6198         { "lvs", control_lvs, false, false,
6199                 "show lvs configuration", "master|list|status" },
6200         { "disablemonitor", control_disable_monitor, false, true,
6201                 "disable monitoring", NULL },
6202         { "enablemonitor", control_enable_monitor, false, true,
6203                 "enable monitoring", NULL },
6204         { "setdebug", control_setdebug, false, true,
6205                 "set debug level", "ERROR|WARNING|NOTICE|INFO|DEBUG" },
6206         { "getdebug", control_getdebug, false, true,
6207                 "get debug level", NULL },
6208         { "attach", control_attach, false, false,
6209                 "attach a database", "<dbname> [persistent]" },
6210         { "detach", control_detach, false, false,
6211                 "detach database(s)", "<dbname|dbid> ..." },
6212         { "dumpmemory", control_dumpmemory, false, true,
6213                 "dump ctdbd memory map", NULL },
6214         { "rddumpmemory", control_rddumpmemory, false, true,
6215                 "dump recoverd memory map", NULL },
6216         { "getpid", control_getpid, false, true,
6217                 "get ctdbd process ID", NULL },
6218         { "disable", control_disable, false, true,
6219                 "disable a node", NULL },
6220         { "enable", control_enable, false, true,
6221                 "enable a node", NULL },
6222         { "stop", control_stop, false, true,
6223                 "stop a node", NULL },
6224         { "continue", control_continue, false, true,
6225                 "continue a stopped node", NULL },
6226         { "ban", control_ban, false, true,
6227                 "ban a node", "<bantime>"},
6228         { "unban", control_unban, false, true,
6229                 "unban a node", NULL },
6230         { "shutdown", control_shutdown, false, true,
6231                 "shutdown ctdb daemon", NULL },
6232         { "recover", control_recover, false, true,
6233                 "force recovery", NULL },
6234         { "sync", control_ipreallocate, false, true,
6235                 "run ip reallocation (deprecated)", NULL },
6236         { "ipreallocate", control_ipreallocate, false, true,
6237                 "run ip reallocation", NULL },
6238         { "isnotrecmaster", control_isnotrecmaster, false, false,
6239                 "check if local node is the recmaster", NULL },
6240         { "gratarp", control_gratarp, false, true,
6241                 "send a gratuitous arp", "<ip> <interface>" },
6242         { "tickle", control_tickle, false, false,
6243                 "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
6244         { "gettickles", control_gettickles, false, true,
6245                 "get the list of tickles", "<ip> [<port>]" },
6246         { "addtickle", control_addtickle, false, true,
6247                 "add a tickle", "<ip>:<port> <ip>:<port>" },
6248         { "deltickle", control_deltickle, false, true,
6249                 "delete a tickle", "<ip>:<port> <ip>:<port>" },
6250         { "check_srvids", control_check_srvids, false, true,
6251                 "check if srvid is registered", "<id> [<id> ...]" },
6252         { "listnodes", control_listnodes, true, true,
6253                 "list nodes in the cluster", NULL },
6254         { "reloadnodes", control_reloadnodes, false, false,
6255                 "reload the nodes file all nodes", NULL },
6256         { "moveip", control_moveip, false, false,
6257                 "move an ip address to another node", "<ip> <node>" },
6258         { "rebalanceip", control_rebalanceip, false, false,
6259                 "move an ip address optimally to another node", "<ip>" },
6260         { "addip", control_addip, false, true,
6261                 "add an ip address to a node", "<ip/mask> <iface>" },
6262         { "delip", control_delip, false, true,
6263                 "delete an ip address from a node", "<ip>" },
6264         { "eventscript", control_eventscript, false, true,
6265                 "run an event", "monitor" },
6266         { "backupdb", control_backupdb, false, false,
6267                 "backup a database into a file", "<dbname|dbid> <file>" },
6268         { "restoredb", control_restoredb, false, false,
6269                 "restore a database from a file", "<file> [dbname]" },
6270         { "dumpdbbackup", control_dumpdbbackup, true, false,
6271                 "dump database from a backup file", "<file>" },
6272         { "wipedb", control_wipedb, false, false,
6273                 "wipe the contents of a database.", "<dbname|dbid>"},
6274         { "recmaster", control_recmaster, false, true,
6275                 "show the pnn for the recovery master", NULL },
6276         { "scriptstatus", control_scriptstatus, false, true,
6277                 "show event script status",
6278                 "[init|setup|startup|monitor|takeip|releaseip|ipreallocated]" },
6279         { "enablescript", control_enablescript, false, true,
6280                 "enable an eventscript", "<script>"},
6281         { "disablescript", control_disablescript, false, true,
6282                 "disable an eventscript", "<script>"},
6283         { "natgw", control_natgw, false, false,
6284                 "show natgw configuration", "master|list|status" },
6285         { "natgwlist", control_natgwlist, false, false,
6286                 "show the nodes belonging to this natgw configuration", NULL },
6287         { "getreclock", control_getreclock, false, true,
6288                 "get recovery lock file", NULL },
6289         { "setlmasterrole", control_setlmasterrole, false, true,
6290                 "set LMASTER role", "on|off" },
6291         { "setrecmasterrole", control_setrecmasterrole, false, true,
6292                 "set RECMASTER role", "on|off"},
6293         { "setdbreadonly", control_setdbreadonly, false, true,
6294                 "enable readonly records", "<dbname|dbid>" },
6295         { "setdbsticky", control_setdbsticky, false, true,
6296                 "enable sticky records", "<dbname|dbid>"},
6297         { "pfetch", control_pfetch, false, false,
6298                 "fetch record from persistent database", "<dbname|dbid> <key> [<file>]" },
6299         { "pstore", control_pstore, false, false,
6300                 "write record to persistent database", "<dbname|dbid> <key> <value>" },
6301         { "pdelete", control_pdelete, false, false,
6302                 "delete record from persistent database", "<dbname|dbid> <key>" },
6303         { "ptrans", control_ptrans, false, false,
6304                 "update a persistent database (from file or stdin)", "<dbname|dbid> [<file>]" },
6305         { "tfetch", control_tfetch, false, true,
6306                 "fetch a record", "<tdb-file> <key> [<file>]" },
6307         { "tstore", control_tstore, false, true,
6308                 "store a record", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6309         { "readkey", control_readkey, false, false,
6310                 "read value of a database key", "<dbname|dbid> <key> [readonly]" },
6311         { "writekey", control_writekey, false, false,
6312                 "write value for a database key", "<dbname|dbid> <key> <value>" },
6313         { "deletekey", control_deletekey, false, false,
6314                 "delete a database key", "<dbname|dbid> <key>" },
6315         { "checktcpport", control_checktcpport, true, false,
6316                 "check if a service is bound to a specific tcp port or not", "<port>" },
6317         { "rebalancenode", control_rebalancenode, false, true,
6318                 "mark nodes as forced IP rebalancing targets", NULL },
6319         { "getdbseqnum", control_getdbseqnum, false, false,
6320                 "get database sequence number", "<dbname|dbid>" },
6321         { "nodestatus", control_nodestatus, false, true,
6322                 "show and return node status", "[all|<pnn-list>]" },
6323         { "dbstatistics", control_dbstatistics, false, true,
6324                 "show database statistics", "<dbname|dbid>" },
6325         { "reloadips", control_reloadips, false, false,
6326                 "reload the public addresses file", "[all|<pnn-list>]" },
6327         { "ipiface", control_ipiface, true, false,
6328                 "Find the interface an ip address is hosted on", "<ip>" },
6329 };
6330
6331 static const struct ctdb_cmd *match_command(const char *command)
6332 {
6333         const struct ctdb_cmd *cmd;
6334         int i;
6335
6336         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6337                 cmd = &ctdb_commands[i];
6338                 if (strlen(command) == strlen(cmd->name) &&
6339                     strncmp(command, cmd->name, strlen(command)) == 0) {
6340                         return cmd;
6341                 }
6342         }
6343
6344         return NULL;
6345 }
6346
6347
6348 /**
6349  * Show usage message
6350  */
6351 static void usage_full(void)
6352 {
6353         int i;
6354
6355         poptPrintHelp(pc, stdout, 0);
6356         printf("\nCommands:\n");
6357         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6358                 printf("  %-15s %-27s  %s\n",
6359                        ctdb_commands[i].name,
6360                        ctdb_commands[i].args ? ctdb_commands[i].args : "",
6361                        ctdb_commands[i].msg);
6362         }
6363 }
6364
6365 static void usage(const char *command)
6366 {
6367         const struct ctdb_cmd *cmd;
6368
6369         if (command == NULL) {
6370                 usage_full();
6371                 exit(1);
6372         }
6373
6374         cmd = match_command(command);
6375         if (cmd == NULL) {
6376                 usage_full();
6377         } else {
6378                 poptPrintUsage(pc, stdout, 0);
6379                 printf("\nCommands:\n");
6380                 printf("  %-15s %-27s  %s\n",
6381                        cmd->name, cmd->args ? cmd->args : "", cmd->msg);
6382         }
6383
6384         exit(1);
6385 }
6386
6387 struct poptOption cmdline_options[] = {
6388         POPT_AUTOHELP
6389         { "socket", 's', POPT_ARG_STRING, &options.socket, 0,
6390                 "CTDB socket path", "filename" },
6391         { "debug", 'd', POPT_ARG_STRING, &options.debuglevelstr, 0,
6392                 "debug level"},
6393         { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0,
6394                 "timelimit (in seconds)" },
6395         { "node", 'n', POPT_ARG_INT, &options.pnn, 0,
6396                 "node specification - integer" },
6397         { NULL, 'Y', POPT_ARG_NONE, &options.machinereadable, 0,
6398                 "enable machine readable output", NULL },
6399         { "separator", 'x', POPT_ARG_STRING, &options.sep, 0,
6400                 "specify separator for machine readable output", "CHAR" },
6401         { NULL, 'X', POPT_ARG_NONE, &options.machineparsable, 0,
6402                 "enable machine parsable output with separator |", NULL },
6403         { "verbose", 'v', POPT_ARG_NONE, &options.verbose, 0,
6404                 "enable verbose output", NULL },
6405         { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0,
6406                 "die if runtime exceeds this limit (in seconds)" },
6407         POPT_TABLEEND
6408 };
6409
6410 static int process_command(const struct ctdb_cmd *cmd, int argc,
6411                            const char **argv)
6412 {
6413         TALLOC_CTX *tmp_ctx;
6414         struct ctdb_context *ctdb;
6415         int ret;
6416         bool status;
6417         uint64_t srvid_offset;
6418
6419         tmp_ctx = talloc_new(NULL);
6420         if (tmp_ctx == NULL) {
6421                 fprintf(stderr, "Memory allocation error\n");
6422                 goto fail;
6423         }
6424
6425         if (cmd->without_daemon) {
6426                 if (options.pnn != -1) {
6427                         fprintf(stderr,
6428                                 "Cannot specify node for command %s\n",
6429                                 cmd->name);
6430                         goto fail;
6431                 }
6432
6433                 return cmd->fn(tmp_ctx, NULL, argc-1, argv+1);
6434         }
6435
6436         ctdb = talloc_zero(tmp_ctx, struct ctdb_context);
6437         if (ctdb == NULL) {
6438                 fprintf(stderr, "Memory allocation error\n");
6439                 goto fail;
6440         }
6441
6442         ctdb->ev = tevent_context_init(ctdb);
6443         if (ctdb->ev == NULL) {
6444                 fprintf(stderr, "Failed to initialize tevent\n");
6445                 goto fail;
6446         }
6447
6448         ret = ctdb_client_init(ctdb, ctdb->ev, options.socket, &ctdb->client);
6449         if (ret != 0) {
6450                 fprintf(stderr, "Failed to connect to CTDB daemon (%s)\n",
6451                         options.socket);
6452
6453                 if (!find_node_xpnn(ctdb, NULL)) {
6454                         fprintf(stderr, "Is this node part of CTDB cluster?\n");
6455                 }
6456                 goto fail;
6457         }
6458
6459         ctdb->pnn = ctdb_client_pnn(ctdb->client);
6460         srvid_offset = getpid() & 0xFFFF;
6461         ctdb->srvid = SRVID_CTDB_TOOL | (srvid_offset << 16);
6462
6463         if (options.pnn != -1) {
6464                 status = verify_pnn(ctdb, options.pnn);
6465                 if (! status) {
6466                         goto fail;
6467                 }
6468
6469                 ctdb->cmd_pnn = options.pnn;
6470         } else {
6471                 ctdb->cmd_pnn = ctdb->pnn;
6472         }
6473
6474         if (! cmd->remote && ctdb->pnn != ctdb->cmd_pnn) {
6475                 fprintf(stderr, "Node cannot be specified for command %s\n",
6476                         cmd->name);
6477                 goto fail;
6478         }
6479
6480         ret = cmd->fn(tmp_ctx, ctdb, argc-1, argv+1);
6481         talloc_free(tmp_ctx);
6482         return ret;
6483
6484 fail:
6485         talloc_free(tmp_ctx);
6486         return 1;
6487 }
6488
6489 static void signal_handler(int sig)
6490 {
6491         fprintf(stderr, "Maximum runtime exceeded - exiting\n");
6492 }
6493
6494 static void alarm_handler(int sig)
6495 {
6496         /* Kill any child processes */
6497         signal(SIGTERM, signal_handler);
6498         kill(0, SIGTERM);
6499
6500         _exit(1);
6501 }
6502
6503 int main(int argc, const char *argv[])
6504 {
6505         int opt;
6506         const char **extra_argv;
6507         int extra_argc;
6508         const struct ctdb_cmd *cmd;
6509         const char *ctdb_socket;
6510         enum debug_level loglevel;
6511         int ret;
6512
6513         setlinebuf(stdout);
6514
6515         /* Set default options */
6516         options.socket = CTDB_SOCKET;
6517         options.debuglevelstr = NULL;
6518         options.timelimit = 10;
6519         options.sep = "|";
6520         options.maxruntime = 0;
6521         options.pnn = -1;
6522
6523         ctdb_socket = getenv("CTDB_SOCKET");
6524         if (ctdb_socket != NULL) {
6525                 options.socket = ctdb_socket;
6526         }
6527
6528         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
6529                             POPT_CONTEXT_KEEP_FIRST);
6530         while ((opt = poptGetNextOpt(pc)) != -1) {
6531                 fprintf(stderr, "Invalid option %s: %s\n",
6532                         poptBadOption(pc, 0), poptStrerror(opt));
6533                 exit(1);
6534         }
6535
6536         if (options.maxruntime == 0) {
6537                 const char *ctdb_timeout;
6538
6539                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6540                 if (ctdb_timeout != NULL) {
6541                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6542                 } else {
6543                         options.maxruntime = 120;
6544                 }
6545         }
6546         if (options.maxruntime <= 120) {
6547                 /* default timeout is 120 seconds */
6548                 options.maxruntime = 120;
6549         }
6550
6551         if (options.machineparsable) {
6552                 options.machinereadable = 1;
6553         }
6554
6555         /* setup the remaining options for the commands */
6556         extra_argc = 0;
6557         extra_argv = poptGetArgs(pc);
6558         if (extra_argv) {
6559                 extra_argv++;
6560                 while (extra_argv[extra_argc]) extra_argc++;
6561         }
6562
6563         if (extra_argc < 1) {
6564                 usage(NULL);
6565         }
6566
6567         cmd = match_command(extra_argv[0]);
6568         if (cmd == NULL) {
6569                 fprintf(stderr, "Unknown command '%s'\n", extra_argv[0]);
6570                 exit(1);
6571         }
6572
6573         /* Enable logging */
6574         setup_logging("ctdb", DEBUG_STDERR);
6575         if (debug_level_parse(options.debuglevelstr, &loglevel)) {
6576                 DEBUGLEVEL = loglevel;
6577         } else {
6578                 DEBUGLEVEL = DEBUG_ERR;
6579         }
6580
6581         signal(SIGALRM, alarm_handler);
6582         alarm(options.maxruntime);
6583
6584         ret = process_command(cmd, extra_argc, extra_argv);
6585         if (ret == -1) {
6586                 ret = 1;
6587         }
6588
6589         (void)poptFreeContext(pc);
6590
6591         return ret;
6592 }