ctdb-tools: Fix CID 1364701 - resource leak
[amitay/samba.git] / ctdb / tools / ctdb.c
1 /*
2    CTDB control tool
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/wait.h"
25
26 #include <ctype.h>
27 #include <popt.h>
28 #include <talloc.h>
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "ctdb_version.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35
36 #include "common/db_hash.h"
37 #include "common/logging.h"
38 #include "protocol/protocol.h"
39 #include "protocol/protocol_api.h"
40 #include "common/system.h"
41 #include "client/client.h"
42
43 #define TIMEOUT()       timeval_current_ofs(options.timelimit, 0)
44
45 #define SRVID_CTDB_TOOL    (CTDB_SRVID_TOOL_RANGE | 0x0001000000000000LL)
46 #define SRVID_CTDB_PUSHDB  (CTDB_SRVID_TOOL_RANGE | 0x0002000000000000LL)
47
48 static struct {
49         const char *socket;
50         const char *debuglevelstr;
51         int timelimit;
52         int pnn;
53         int machinereadable;
54         const char *sep;
55         int machineparsable;
56         int verbose;
57         int maxruntime;
58         int printemptyrecords;
59         int printdatasize;
60         int printlmaster;
61         int printhash;
62         int printrecordflags;
63 } options;
64
65 static poptContext pc;
66
67 struct ctdb_context {
68         struct tevent_context *ev;
69         struct ctdb_client_context *client;
70         struct ctdb_node_map *nodemap;
71         uint32_t pnn, cmd_pnn;
72         uint64_t srvid;
73 };
74
75 static void usage(const char *command);
76
77 /*
78  * Utility Functions
79  */
80
81 static double timeval_delta(struct timeval *tv2, struct timeval *tv)
82 {
83         return (tv2->tv_sec - tv->tv_sec) +
84                (tv2->tv_usec - tv->tv_usec) * 1.0e-6;
85 }
86
87 static struct ctdb_node_and_flags *get_node_by_pnn(
88                                         struct ctdb_node_map *nodemap,
89                                         uint32_t pnn)
90 {
91         int i;
92
93         for (i=0; i<nodemap->num; i++) {
94                 if (nodemap->node[i].pnn == pnn) {
95                         return &nodemap->node[i];
96                 }
97         }
98         return NULL;
99 }
100
101 static const char *pretty_print_flags(TALLOC_CTX *mem_ctx, uint32_t flags)
102 {
103         static const struct {
104                 uint32_t flag;
105                 const char *name;
106         } flag_names[] = {
107                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
108                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
109                 { NODE_FLAGS_BANNED,                "BANNED" },
110                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
111                 { NODE_FLAGS_DELETED,               "DELETED" },
112                 { NODE_FLAGS_STOPPED,               "STOPPED" },
113                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
114         };
115         char *flags_str = NULL;
116         int i;
117
118         for (i=0; i<ARRAY_SIZE(flag_names); i++) {
119                 if (flags & flag_names[i].flag) {
120                         if (flags_str == NULL) {
121                                 flags_str = talloc_asprintf(mem_ctx,
122                                                 "%s", flag_names[i].name);
123                         } else {
124                                 flags_str = talloc_asprintf_append(flags_str,
125                                                 "|%s", flag_names[i].name);
126                         }
127                         if (flags_str == NULL) {
128                                 return "OUT-OF-MEMORY";
129                         }
130                 }
131         }
132         if (flags_str == NULL) {
133                 return "OK";
134         }
135
136         return flags_str;
137 }
138
139 static uint64_t next_srvid(struct ctdb_context *ctdb)
140 {
141         ctdb->srvid += 1;
142         return ctdb->srvid;
143 }
144
145 /*
146  * Get consistent nodemap information.
147  *
148  * If nodemap is already cached, use that. If not get it.
149  * If the current node is BANNED, then get nodemap from "better" node.
150  */
151 static struct ctdb_node_map *get_nodemap(struct ctdb_context *ctdb, bool force)
152 {
153         TALLOC_CTX *tmp_ctx;
154         struct ctdb_node_map *nodemap;
155         struct ctdb_node_and_flags *node;
156         uint32_t current_node;
157         int ret;
158
159         if (force) {
160                 TALLOC_FREE(ctdb->nodemap);
161         }
162
163         if (ctdb->nodemap != NULL) {
164                 return ctdb->nodemap;
165         }
166
167         tmp_ctx = talloc_new(ctdb);
168         if (tmp_ctx == NULL) {
169                 return false;
170         }
171
172         current_node = ctdb->pnn;
173 again:
174         ret = ctdb_ctrl_get_nodemap(tmp_ctx, ctdb->ev, ctdb->client,
175                                     current_node, TIMEOUT(), &nodemap);
176         if (ret != 0) {
177                 fprintf(stderr, "Failed to get nodemap from node %u\n",
178                         current_node);
179                 goto failed;
180         }
181
182         node = get_node_by_pnn(nodemap, current_node);
183         if (node->flags & NODE_FLAGS_BANNED) {
184                 /* Pick next node */
185                 do {
186                         current_node = (current_node + 1) % nodemap->num;
187                         node = get_node_by_pnn(nodemap, current_node);
188                         if (! (node->flags &
189                               (NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED))) {
190                                 break;
191                         }
192                 } while (current_node != ctdb->pnn);
193
194                 if (current_node == ctdb->pnn) {
195                         /* Tried all nodes in the cluster */
196                         fprintf(stderr, "Warning: All nodes are banned.\n");
197                         goto failed;
198                 }
199
200                 goto again;
201         }
202
203         ctdb->nodemap = talloc_steal(ctdb, nodemap);
204         return nodemap;
205
206 failed:
207         talloc_free(tmp_ctx);
208         return NULL;
209 }
210
211 static bool verify_pnn(struct ctdb_context *ctdb, int pnn)
212 {
213         struct ctdb_node_map *nodemap;
214         bool found;
215         int i;
216
217         if (pnn == -1) {
218                 return false;
219         }
220
221         nodemap = get_nodemap(ctdb, false);
222         if (nodemap == NULL) {
223                 return false;
224         }
225
226         found = false;
227         for (i=0; i<nodemap->num; i++) {
228                 if (nodemap->node[i].pnn == pnn) {
229                         found = true;
230                         break;
231                 }
232         }
233         if (! found) {
234                 fprintf(stderr, "Node %u does not exist\n", pnn);
235                 return false;
236         }
237
238         if (nodemap->node[i].flags &
239             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
240                 fprintf(stderr, "Node %u has status %s\n", pnn,
241                         pretty_print_flags(ctdb, nodemap->node[i].flags));
242                 return false;
243         }
244
245         return true;
246 }
247
248 static struct ctdb_node_map *talloc_nodemap(TALLOC_CTX *mem_ctx,
249                                             struct ctdb_node_map *nodemap)
250 {
251         struct ctdb_node_map *nodemap2;
252
253         nodemap2 = talloc_zero(mem_ctx, struct ctdb_node_map);
254         if (nodemap2 == NULL) {
255                 return NULL;
256         }
257
258         nodemap2->node = talloc_array(nodemap2, struct ctdb_node_and_flags,
259                                       nodemap->num);
260         if (nodemap2->node == NULL) {
261                 talloc_free(nodemap2);
262                 return NULL;
263         }
264
265         return nodemap2;
266 }
267
268 /*
269  * Get the number and the list of matching nodes
270  *
271  *   nodestring :=  NULL | all | pnn,[pnn,...]
272  *
273  * If nodestring is NULL, use the current node.
274  */
275 static bool parse_nodestring(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
276                              const char *nodestring,
277                              struct ctdb_node_map **out)
278 {
279         struct ctdb_node_map *nodemap, *nodemap2;
280         struct ctdb_node_and_flags *node;
281         int i;
282
283         nodemap = get_nodemap(ctdb, false);
284         if (nodemap == NULL) {
285                 return false;
286         }
287
288         nodemap2 = talloc_nodemap(mem_ctx, nodemap);
289         if (nodemap2 == NULL) {
290                 return false;
291         }
292
293         if (nodestring == NULL) {
294                 for (i=0; i<nodemap->num; i++) {
295                         if (nodemap->node[i].pnn == ctdb->cmd_pnn) {
296                                 nodemap2->node[0] = nodemap->node[i];
297                                 break;
298                         }
299                 }
300                 nodemap2->num = 1;
301
302                 goto done;
303         }
304
305         if (strcmp(nodestring, "all") == 0) {
306                 for (i=0; i<nodemap->num; i++) {
307                         nodemap2->node[i] = nodemap->node[i];
308                 }
309                 nodemap2->num = nodemap->num;
310
311                 goto done;
312         } else {
313                 char *ns, *tok;
314
315                 ns = talloc_strdup(mem_ctx, nodestring);
316                 if (ns == NULL) {
317                         return false;
318                 }
319
320                 tok = strtok(ns, ",");
321                 while (tok != NULL) {
322                         uint32_t pnn;
323                         char *endptr;
324
325                         pnn = (uint32_t)strtoul(tok, &endptr, 0);
326                         if (pnn == 0 && tok == endptr) {
327                                 fprintf(stderr, "Invalid node %s\n", tok);
328                                         return false;
329                         }
330
331                         node = get_node_by_pnn(nodemap, pnn);
332                         if (node == NULL) {
333                                 fprintf(stderr, "Node %u does not exist\n",
334                                         pnn);
335                                 return false;
336                         }
337
338                         nodemap2->node[nodemap2->num] = *node;
339                         nodemap2->num += 1;
340
341                         tok = strtok(NULL, ",");
342                 }
343         }
344
345 done:
346         *out = nodemap2;
347         return true;
348 }
349
350 /* Compare IP address */
351 static bool ctdb_same_ip(ctdb_sock_addr *ip1, ctdb_sock_addr *ip2)
352 {
353         bool ret = false;
354
355         if (ip1->sa.sa_family != ip2->sa.sa_family) {
356                 return false;
357         }
358
359         switch (ip1->sa.sa_family) {
360         case AF_INET:
361                 ret = (memcmp(&ip1->ip.sin_addr, &ip2->ip.sin_addr,
362                               sizeof(struct in_addr)) == 0);
363                 break;
364
365         case AF_INET6:
366                 ret = (memcmp(&ip1->ip6.sin6_addr, &ip2->ip6.sin6_addr,
367                               sizeof(struct in6_addr)) == 0);
368                 break;
369         }
370
371         return ret;
372 }
373
374 /* Append a node to a node map with given address and flags */
375 static bool node_map_add(struct ctdb_node_map *nodemap,
376                          const char *nstr, uint32_t flags)
377 {
378         ctdb_sock_addr addr;
379         uint32_t num;
380         struct ctdb_node_and_flags *n;
381
382         if (! parse_ip(nstr, NULL, 0, &addr)) {
383                 fprintf(stderr, "Invalid IP address %s\n", nstr);
384                 return false;
385         }
386
387         num = nodemap->num;
388         nodemap->node = talloc_realloc(nodemap, nodemap->node,
389                                        struct ctdb_node_and_flags, num+1);
390         if (nodemap->node == NULL) {
391                 return false;
392         }
393
394         n = &nodemap->node[num];
395         n->addr = addr;
396         n->pnn = num;
397         n->flags = flags;
398
399         nodemap->num = num+1;
400         return true;
401 }
402
403 /* Read a nodes file into a node map */
404 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
405                                                   const char *nlist)
406 {
407         char **lines;
408         int nlines;
409         int i;
410         struct ctdb_node_map *nodemap;
411
412         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
413         if (nodemap == NULL) {
414                 return NULL;
415         }
416
417         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
418         if (lines == NULL) {
419                 return NULL;
420         }
421
422         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
423                 nlines--;
424         }
425
426         for (i=0; i<nlines; i++) {
427                 char *node;
428                 uint32_t flags;
429                 size_t len;
430
431                 node = lines[i];
432                 /* strip leading spaces */
433                 while((*node == ' ') || (*node == '\t')) {
434                         node++;
435                 }
436
437                 len = strlen(node);
438
439                 /* strip trailing spaces */
440                 while ((len > 1) &&
441                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
442                 {
443                         node[len-1] = '\0';
444                         len--;
445                 }
446
447                 if (len == 0) {
448                         continue;
449                 }
450                 if (*node == '#') {
451                         /* A "deleted" node is a node that is
452                            commented out in the nodes file.  This is
453                            used instead of removing a line, which
454                            would cause subsequent nodes to change
455                            their PNN. */
456                         flags = NODE_FLAGS_DELETED;
457                         node = discard_const("0.0.0.0");
458                 } else {
459                         flags = 0;
460                 }
461                 if (! node_map_add(nodemap, node, flags)) {
462                         talloc_free(lines);
463                         TALLOC_FREE(nodemap);
464                         return NULL;
465                 }
466         }
467
468         talloc_free(lines);
469         return nodemap;
470 }
471
472 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx, uint32_t pnn)
473 {
474         struct ctdb_node_map *nodemap;
475         char *nodepath;
476         const char *nodes_list = NULL;
477
478         if (pnn != CTDB_UNKNOWN_PNN) {
479                 nodepath = talloc_asprintf(mem_ctx, "CTDB_NODES_%u", pnn);
480                 if (nodepath != NULL) {
481                         nodes_list = getenv(nodepath);
482                 }
483         }
484         if (nodes_list == NULL) {
485                 nodes_list = getenv("CTDB_NODES");
486         }
487         if (nodes_list == NULL) {
488                 const char *basedir = getenv("CTDB_BASE");
489                 if (basedir == NULL) {
490                         basedir = CTDB_ETCDIR;
491                 }
492                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes", basedir);
493                 if (nodes_list == NULL) {
494                         fprintf(stderr, "Memory allocation error\n");
495                         return NULL;
496                 }
497         }
498
499         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
500         if (nodemap == NULL) {
501                 fprintf(stderr, "Failed to read nodes file \"%s\"\n",
502                         nodes_list);
503                 return NULL;
504         }
505
506         return nodemap;
507 }
508
509 static struct ctdb_dbid *db_find(TALLOC_CTX *mem_ctx,
510                                  struct ctdb_context *ctdb,
511                                  struct ctdb_dbid_map *dbmap,
512                                  const char *db_name)
513 {
514         struct ctdb_dbid *db = NULL;
515         const char *name;
516         int ret, i;
517
518         for (i=0; i<dbmap->num; i++) {
519                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
520                                            ctdb->pnn, TIMEOUT(),
521                                            dbmap->dbs[i].db_id, &name);
522                 if (ret != 0) {
523                         return false;
524                 }
525
526                 if (strcmp(db_name, name) == 0) {
527                         talloc_free(discard_const(name));
528                         db = &dbmap->dbs[i];
529                         break;
530                 }
531         }
532
533         return db;
534 }
535
536 static bool db_exists(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
537                       const char *db_arg, uint32_t *db_id,
538                       const char **db_name, uint8_t *db_flags)
539 {
540         struct ctdb_dbid_map *dbmap;
541         struct ctdb_dbid *db = NULL;
542         uint32_t id = 0;
543         const char *name = NULL;
544         int ret, i;
545
546         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
547                                   ctdb->pnn, TIMEOUT(), &dbmap);
548         if (ret != 0) {
549                 return false;
550         }
551
552         if (strncmp(db_arg, "0x", 2) == 0) {
553                 id = strtoul(db_arg, NULL, 0);
554                 for (i=0; i<dbmap->num; i++) {
555                         if (id == dbmap->dbs[i].db_id) {
556                                 db = &dbmap->dbs[i];
557                                 break;
558                         }
559                 }
560         } else {
561                 name = db_arg;
562                 db = db_find(mem_ctx, ctdb, dbmap, name);
563         }
564
565         if (db == NULL) {
566                 fprintf(stderr, "No database matching '%s' found\n", db_arg);
567                 return false;
568         }
569
570         if (name == NULL) {
571                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
572                                            ctdb->pnn, TIMEOUT(), id, &name);
573                 if (ret != 0) {
574                         return false;
575                 }
576         }
577
578         if (db_id != NULL) {
579                 *db_id = db->db_id;
580         }
581         if (db_name != NULL) {
582                 *db_name = talloc_strdup(mem_ctx, name);
583         }
584         if (db_flags != NULL) {
585                 *db_flags = db->flags;
586         }
587         return true;
588 }
589
590 static int h2i(char h)
591 {
592         if (h >= 'a' && h <= 'f') {
593                 return h - 'a' + 10;
594         }
595         if (h >= 'A' && h <= 'F') {
596                 return h - 'f' + 10;
597         }
598         return h - '0';
599 }
600
601 static int hex_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
602                        TDB_DATA *out)
603 {
604         int i;
605         TDB_DATA data;
606
607         if (len & 0x01) {
608                 fprintf(stderr, "Key (%s) contains odd number of hex digits\n",
609                         str);
610                 return EINVAL;
611         }
612
613         data.dsize = len / 2;
614         data.dptr = talloc_size(mem_ctx, data.dsize);
615         if (data.dptr == NULL) {
616                 return ENOMEM;
617         }
618
619         for (i=0; i<data.dsize; i++) {
620                 data.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
621         }
622
623         *out = data;
624         return 0;
625 }
626
627 static int str_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
628                        TDB_DATA *out)
629 {
630         TDB_DATA data;
631         int ret = 0;
632
633         if (strncmp(str, "0x", 2) == 0) {
634                 ret = hex_to_data(str+2, len-2, mem_ctx, &data);
635         } else {
636                 data.dptr = talloc_memdup(mem_ctx, str, len);
637                 if (data.dptr == NULL) {
638                         return ENOMEM;
639                 }
640                 data.dsize = len;
641         }
642
643         *out = data;
644         return ret;
645 }
646
647 static int run_helper(const char *command, const char *path, const char *arg1)
648 {
649         pid_t pid;
650         int save_errno, status, ret;
651
652         pid = fork();
653         if (pid < 0) {
654                 save_errno = errno;
655                 fprintf(stderr, "Failed to fork %s (%s) - %s\n",
656                         command, path, strerror(save_errno));
657                 return save_errno;
658         }
659
660         if (pid == 0) {
661                 ret = execl(path, path, arg1, NULL);
662                 if (ret == -1) {
663                         _exit(errno);
664                 }
665                 /* Should not happen */
666                 _exit(ENOEXEC);
667         }
668
669         ret = waitpid(pid, &status, 0);
670         if (ret == -1) {
671                 save_errno = errno;
672                 fprintf(stderr, "waitpid() failed for %s - %s\n",
673                         command, strerror(save_errno));
674                 return save_errno;
675         }
676
677         if (WIFEXITED(status)) {
678                 ret = WEXITSTATUS(status);
679                 return ret;
680         }
681
682         if (WIFSIGNALED(status)) {
683                 fprintf(stderr, "%s terminated with signal %d\n",
684                         command, WTERMSIG(status));
685                 return EINTR;
686         }
687
688         return 0;
689 }
690
691 /*
692  * Command Functions
693  */
694
695 static int control_version(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
696                            int argc, const char **argv)
697 {
698         printf("%s\n", CTDB_VERSION_STRING);
699         return 0;
700 }
701
702 static bool partially_online(TALLOC_CTX *mem_ctx,
703                              struct ctdb_context *ctdb,
704                              struct ctdb_node_and_flags *node)
705 {
706         struct ctdb_iface_list *iface_list;
707         int ret, i;
708         bool status = false;
709
710         if (node->flags != 0) {
711                 return false;
712         }
713
714         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
715                                    node->pnn, TIMEOUT(), &iface_list);
716         if (ret != 0) {
717                 return false;
718         }
719
720         status = false;
721         for (i=0; i < iface_list->num; i++) {
722                 if (iface_list->iface[i].link_state == 0) {
723                         status = true;
724                         break;
725                 }
726         }
727
728         return status;
729 }
730
731 static void print_nodemap_machine(TALLOC_CTX *mem_ctx,
732                                   struct ctdb_context *ctdb,
733                                   struct ctdb_node_map *nodemap,
734                                   uint32_t mypnn)
735 {
736         struct ctdb_node_and_flags *node;
737         int i;
738
739         printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
740                options.sep,
741                "Node", options.sep,
742                "IP", options.sep,
743                "Disconnected", options.sep,
744                "Banned", options.sep,
745                "Disabled", options.sep,
746                "Unhealthy", options.sep,
747                "Stopped", options.sep,
748                "Inactive", options.sep,
749                "PartiallyOnline", options.sep,
750                "ThisNode", options.sep);
751
752         for (i=0; i<nodemap->num; i++) {
753                 node = &nodemap->node[i];
754                 if (node->flags & NODE_FLAGS_DELETED) {
755                         continue;
756                 }
757
758                 printf("%s%u%s%s%s%d%s%d%s%d%s%d%s%d%s%d%s%d%s%c%s\n",
759                        options.sep,
760                        node->pnn, options.sep,
761                        ctdb_sock_addr_to_string(mem_ctx, &node->addr),
762                        options.sep,
763                        !! (node->flags & NODE_FLAGS_DISCONNECTED), options.sep,
764                        !! (node->flags & NODE_FLAGS_BANNED), options.sep,
765                        !! (node->flags & NODE_FLAGS_PERMANENTLY_DISABLED),
766                        options.sep,
767                        !! (node->flags & NODE_FLAGS_UNHEALTHY), options.sep,
768                        !! (node->flags & NODE_FLAGS_STOPPED), options.sep,
769                        !! (node->flags & NODE_FLAGS_INACTIVE), options.sep,
770                        partially_online(mem_ctx, ctdb, node), options.sep,
771                        (node->pnn == mypnn)?'Y':'N', options.sep);
772         }
773
774 }
775
776 static void print_nodemap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
777                           struct ctdb_node_map *nodemap, uint32_t mypnn)
778 {
779         struct ctdb_node_and_flags *node;
780         int num_deleted_nodes = 0;
781         int i;
782
783         for (i=0; i<nodemap->num; i++) {
784                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
785                         num_deleted_nodes++;
786                 }
787         }
788
789         if (num_deleted_nodes == 0) {
790                 printf("Number of nodes:%d\n", nodemap->num);
791         } else {
792                 printf("Number of nodes:%d (including %d deleted nodes)\n",
793                        nodemap->num, num_deleted_nodes);
794         }
795
796         for (i=0; i<nodemap->num; i++) {
797                 node = &nodemap->node[i];
798                 if (node->flags & NODE_FLAGS_DELETED) {
799                         continue;
800                 }
801
802                 printf("pnn:%u %-16s %s%s\n",
803                        node->pnn,
804                        ctdb_sock_addr_to_string(mem_ctx, &node->addr),
805                        partially_online(mem_ctx, ctdb, node) ?
806                                 "PARTIALLYONLINE" :
807                                 pretty_print_flags(mem_ctx, node->flags),
808                        node->pnn == mypnn ? " (THIS NODE)" : "");
809         }
810 }
811
812 static void print_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
813                          struct ctdb_node_map *nodemap, uint32_t mypnn,
814                          struct ctdb_vnn_map *vnnmap, int recmode,
815                          uint32_t recmaster)
816 {
817         int i;
818
819         print_nodemap(mem_ctx, ctdb, nodemap, mypnn);
820
821         if (vnnmap->generation == INVALID_GENERATION) {
822                 printf("Generation:INVALID\n");
823         } else {
824                 printf("Generation:%u\n", vnnmap->generation);
825         }
826         printf("Size:%d\n", vnnmap->size);
827         for (i=0; i<vnnmap->size; i++) {
828                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
829         }
830
831         printf("Recovery mode:%s (%d)\n",
832                recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "RECOVERY",
833                recmode);
834         printf("Recovery master:%d\n", recmaster);
835 }
836
837 static int control_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
838                           int argc, const char **argv)
839 {
840         struct ctdb_node_map *nodemap;
841         struct ctdb_vnn_map *vnnmap;
842         int recmode;
843         uint32_t recmaster;
844         int ret;
845
846         if (argc != 0) {
847                 usage("status");
848         }
849
850         nodemap = get_nodemap(ctdb, false);
851         if (nodemap == NULL) {
852                 return 1;
853         }
854
855         if (options.machinereadable == 1) {
856                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
857                 return 0;
858         }
859
860         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
861                                   ctdb->cmd_pnn, TIMEOUT(), &vnnmap);
862         if (ret != 0) {
863                 return ret;
864         }
865
866         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
867                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
868         if (ret != 0) {
869                 return ret;
870         }
871
872         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
873                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
874         if (ret != 0) {
875                 return ret;
876         }
877
878         print_status(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, vnnmap,
879                      recmode, recmaster);
880         return 0;
881 }
882
883 static int control_uptime(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
884                           int argc, const char **argv)
885 {
886         struct ctdb_uptime *uptime;
887         int ret, tmp, days, hours, minutes, seconds;
888
889         ret = ctdb_ctrl_uptime(mem_ctx, ctdb->ev, ctdb->client,
890                                ctdb->cmd_pnn, TIMEOUT(), &uptime);
891         if (ret != 0) {
892                 return ret;
893         }
894
895         printf("Current time of node %-4u     :                %s",
896                ctdb->cmd_pnn, ctime(&uptime->current_time.tv_sec));
897
898         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
899         seconds = tmp % 60; tmp /= 60;
900         minutes = tmp % 60; tmp /= 60;
901         hours = tmp % 24; tmp /= 24;
902         days = tmp;
903
904         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s",
905                days, hours, minutes, seconds,
906                ctime(&uptime->ctdbd_start_time.tv_sec));
907
908         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
909         seconds = tmp % 60; tmp /= 60;
910         minutes = tmp % 60; tmp /= 60;
911         hours = tmp % 24; tmp /= 24;
912         days = tmp;
913
914         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s",
915                days, hours, minutes, seconds,
916                ctime(&uptime->last_recovery_finished.tv_sec));
917
918         printf("Duration of last recovery/failover: %lf seconds\n",
919                timeval_delta(&uptime->last_recovery_finished,
920                              &uptime->last_recovery_started));
921
922         return 0;
923 }
924
925 static int control_ping(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
926                         int argc, const char **argv)
927 {
928         struct timeval tv;
929         int ret, num_clients;
930
931         tv = timeval_current();
932         ret = ctdb_ctrl_ping(mem_ctx, ctdb->ev, ctdb->client,
933                              ctdb->cmd_pnn, TIMEOUT(), &num_clients);
934         if (ret != 0) {
935                 return ret;
936         }
937
938         printf("response from %u time=%.6f sec  (%d clients)\n",
939                ctdb->cmd_pnn, timeval_elapsed(&tv), num_clients);
940         return 0;
941 }
942
943 const char *runstate_to_string(enum ctdb_runstate runstate);
944 enum ctdb_runstate runstate_from_string(const char *runstate_str);
945
946 static int control_runstate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
947                             int argc, const char **argv)
948 {
949         enum ctdb_runstate runstate;
950         bool found;
951         int ret, i;
952
953         ret = ctdb_ctrl_get_runstate(mem_ctx, ctdb->ev, ctdb->client,
954                                      ctdb->cmd_pnn, TIMEOUT(), &runstate);
955         if (ret != 0) {
956                 return ret;
957         }
958
959         found = true;
960         for (i=0; i<argc; i++) {
961                 enum ctdb_runstate t;
962
963                 found = false;
964                 t = ctdb_runstate_from_string(argv[i]);
965                 if (t == CTDB_RUNSTATE_UNKNOWN) {
966                         printf("Invalid run state (%s)\n", argv[i]);
967                         return 1;
968                 }
969
970                 if (t == runstate) {
971                         found = true;
972                         break;
973                 }
974         }
975
976         if (! found) {
977                 printf("CTDB not in required run state (got %s)\n",
978                        ctdb_runstate_to_string(runstate));
979                 return 1;
980         }
981
982         printf("%s\n", ctdb_runstate_to_string(runstate));
983         return 0;
984 }
985
986 static int control_getvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
987                           int argc, const char **argv)
988 {
989         struct ctdb_var_list *tun_var_list;
990         uint32_t value;
991         int ret, i;
992         bool found;
993
994         if (argc != 1) {
995                 usage("getvar");
996         }
997
998         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
999                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1000         if (ret != 0) {
1001                 fprintf(stderr,
1002                         "Failed to get list of variables from node %u\n",
1003                         ctdb->cmd_pnn);
1004                 return ret;
1005         }
1006
1007         found = false;
1008         for (i=0; i<tun_var_list->count; i++) {
1009                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1010                         found = true;
1011                         break;
1012                 }
1013         }
1014
1015         if (! found) {
1016                 printf("No such tunable %s\n", argv[0]);
1017                 return 1;
1018         }
1019
1020         ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
1021                                     ctdb->cmd_pnn, TIMEOUT(), argv[0], &value);
1022         if (ret != 0) {
1023                 return ret;
1024         }
1025
1026         printf("%-26s = %u\n", argv[0], value);
1027         return 0;
1028 }
1029
1030 static int control_setvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1031                           int argc, const char **argv)
1032 {
1033         struct ctdb_var_list *tun_var_list;
1034         struct ctdb_tunable tunable;
1035         int ret, i;
1036         bool found;
1037
1038         if (argc != 2) {
1039                 usage("setvar");
1040         }
1041
1042         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1043                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1044         if (ret != 0) {
1045                 fprintf(stderr,
1046                         "Failed to get list of variables from node %u\n",
1047                         ctdb->cmd_pnn);
1048                 return ret;
1049         }
1050
1051         found = false;
1052         for (i=0; i<tun_var_list->count; i++) {
1053                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1054                         found = true;
1055                         break;
1056                 }
1057         }
1058
1059         if (! found) {
1060                 printf("No such tunable %s\n", argv[0]);
1061                 return 1;
1062         }
1063
1064         tunable.name = argv[0];
1065         tunable.value = strtoul(argv[1], NULL, 0);
1066
1067         ret = ctdb_ctrl_set_tunable(mem_ctx, ctdb->ev, ctdb->client,
1068                                     ctdb->cmd_pnn, TIMEOUT(), &tunable);
1069         if (ret != 0) {
1070                 if (ret == 1) {
1071                         fprintf(stderr,
1072                                 "Setting obsolete tunable variable '%s'\n",
1073                                tunable.name);
1074                         return 0;
1075                 }
1076         }
1077
1078         return ret;
1079 }
1080
1081 static int control_listvars(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1082                             int argc, const char **argv)
1083 {
1084         struct ctdb_var_list *tun_var_list;
1085         int ret, i;
1086
1087         if (argc != 0) {
1088                 usage("listvars");
1089         }
1090
1091         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1092                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1093         if (ret != 0) {
1094                 return ret;
1095         }
1096
1097         for (i=0; i<tun_var_list->count; i++) {
1098                 control_getvar(mem_ctx, ctdb, 1, &tun_var_list->var[i]);
1099         }
1100
1101         return 0;
1102 }
1103
1104 const struct {
1105         const char *name;
1106         uint32_t offset;
1107 } stats_fields[] = {
1108 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
1109         STATISTICS_FIELD(num_clients),
1110         STATISTICS_FIELD(frozen),
1111         STATISTICS_FIELD(recovering),
1112         STATISTICS_FIELD(num_recoveries),
1113         STATISTICS_FIELD(client_packets_sent),
1114         STATISTICS_FIELD(client_packets_recv),
1115         STATISTICS_FIELD(node_packets_sent),
1116         STATISTICS_FIELD(node_packets_recv),
1117         STATISTICS_FIELD(keepalive_packets_sent),
1118         STATISTICS_FIELD(keepalive_packets_recv),
1119         STATISTICS_FIELD(node.req_call),
1120         STATISTICS_FIELD(node.reply_call),
1121         STATISTICS_FIELD(node.req_dmaster),
1122         STATISTICS_FIELD(node.reply_dmaster),
1123         STATISTICS_FIELD(node.reply_error),
1124         STATISTICS_FIELD(node.req_message),
1125         STATISTICS_FIELD(node.req_control),
1126         STATISTICS_FIELD(node.reply_control),
1127         STATISTICS_FIELD(client.req_call),
1128         STATISTICS_FIELD(client.req_message),
1129         STATISTICS_FIELD(client.req_control),
1130         STATISTICS_FIELD(timeouts.call),
1131         STATISTICS_FIELD(timeouts.control),
1132         STATISTICS_FIELD(timeouts.traverse),
1133         STATISTICS_FIELD(locks.num_calls),
1134         STATISTICS_FIELD(locks.num_current),
1135         STATISTICS_FIELD(locks.num_pending),
1136         STATISTICS_FIELD(locks.num_failed),
1137         STATISTICS_FIELD(total_calls),
1138         STATISTICS_FIELD(pending_calls),
1139         STATISTICS_FIELD(childwrite_calls),
1140         STATISTICS_FIELD(pending_childwrite_calls),
1141         STATISTICS_FIELD(memory_used),
1142         STATISTICS_FIELD(max_hop_count),
1143         STATISTICS_FIELD(total_ro_delegations),
1144         STATISTICS_FIELD(total_ro_revokes),
1145 };
1146
1147 #define LATENCY_AVG(v)  ((v).num ? (v).total / (v).num : 0.0 )
1148
1149 static void print_statistics_machine(struct ctdb_statistics *s,
1150                                      bool show_header)
1151 {
1152         int i;
1153
1154         if (show_header) {
1155                 printf("CTDB version%s", options.sep);
1156                 printf("Current time of statistics%s", options.sep);
1157                 printf("Statistics collected since%s", options.sep);
1158                 for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1159                         printf("%s%s", stats_fields[i].name, options.sep);
1160                 }
1161                 printf("num_reclock_ctdbd_latency%s", options.sep);
1162                 printf("min_reclock_ctdbd_latency%s", options.sep);
1163                 printf("avg_reclock_ctdbd_latency%s", options.sep);
1164                 printf("max_reclock_ctdbd_latency%s", options.sep);
1165
1166                 printf("num_reclock_recd_latency%s", options.sep);
1167                 printf("min_reclock_recd_latency%s", options.sep);
1168                 printf("avg_reclock_recd_latency%s", options.sep);
1169                 printf("max_reclock_recd_latency%s", options.sep);
1170
1171                 printf("num_call_latency%s", options.sep);
1172                 printf("min_call_latency%s", options.sep);
1173                 printf("avg_call_latency%s", options.sep);
1174                 printf("max_call_latency%s", options.sep);
1175
1176                 printf("num_lockwait_latency%s", options.sep);
1177                 printf("min_lockwait_latency%s", options.sep);
1178                 printf("avg_lockwait_latency%s", options.sep);
1179                 printf("max_lockwait_latency%s", options.sep);
1180
1181                 printf("num_childwrite_latency%s", options.sep);
1182                 printf("min_childwrite_latency%s", options.sep);
1183                 printf("avg_childwrite_latency%s", options.sep);
1184                 printf("max_childwrite_latency%s", options.sep);
1185                 printf("\n");
1186         }
1187
1188         printf("%u%s", CTDB_PROTOCOL, options.sep);
1189         printf("%u%s", (uint32_t)s->statistics_current_time.tv_sec, options.sep);
1190         printf("%u%s", (uint32_t)s->statistics_start_time.tv_sec, options.sep);
1191         for (i=0;i<ARRAY_SIZE(stats_fields);i++) {
1192                 printf("%u%s",
1193                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s),
1194                        options.sep);
1195         }
1196         printf("%u%s", s->reclock.ctdbd.num, options.sep);
1197         printf("%.6f%s", s->reclock.ctdbd.min, options.sep);
1198         printf("%.6f%s", LATENCY_AVG(s->reclock.ctdbd), options.sep);
1199         printf("%.6f%s", s->reclock.ctdbd.max, options.sep);
1200
1201         printf("%u%s", s->reclock.recd.num, options.sep);
1202         printf("%.6f%s", s->reclock.recd.min, options.sep);
1203         printf("%.6f%s", LATENCY_AVG(s->reclock.recd), options.sep);
1204         printf("%.6f%s", s->reclock.recd.max, options.sep);
1205
1206         printf("%d%s", s->call_latency.num, options.sep);
1207         printf("%.6f%s", s->call_latency.min, options.sep);
1208         printf("%.6f%s", LATENCY_AVG(s->call_latency), options.sep);
1209         printf("%.6f%s", s->call_latency.max, options.sep);
1210
1211         printf("%d%s", s->childwrite_latency.num, options.sep);
1212         printf("%.6f%s", s->childwrite_latency.min, options.sep);
1213         printf("%.6f%s", LATENCY_AVG(s->childwrite_latency), options.sep);
1214         printf("%.6f%s", s->childwrite_latency.max, options.sep);
1215         printf("\n");
1216 }
1217
1218 static void print_statistics(struct ctdb_statistics *s)
1219 {
1220         int tmp, days, hours, minutes, seconds;
1221         int i;
1222         const char *prefix = NULL;
1223         int preflen = 0;
1224
1225         tmp = s->statistics_current_time.tv_sec -
1226               s->statistics_start_time.tv_sec;
1227         seconds = tmp % 60; tmp /= 60;
1228         minutes = tmp % 60; tmp /= 60;
1229         hours   = tmp % 24; tmp /= 24;
1230         days    = tmp;
1231
1232         printf("CTDB version %u\n", CTDB_PROTOCOL);
1233         printf("Current time of statistics  :                %s",
1234                ctime(&s->statistics_current_time.tv_sec));
1235         printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s",
1236                days, hours, minutes, seconds,
1237                ctime(&s->statistics_start_time.tv_sec));
1238
1239         for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1240                 if (strchr(stats_fields[i].name, '.') != NULL) {
1241                         preflen = strcspn(stats_fields[i].name, ".") + 1;
1242                         if (! prefix ||
1243                             strncmp(prefix, stats_fields[i].name, preflen) != 0) {
1244                                 prefix = stats_fields[i].name;
1245                                 printf(" %*.*s\n", preflen-1, preflen-1,
1246                                        stats_fields[i].name);
1247                         }
1248                 } else {
1249                         preflen = 0;
1250                 }
1251                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
1252                        stats_fields[i].name+preflen, preflen ? 0 : 4, "",
1253                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s));
1254         }
1255
1256         printf(" hop_count_buckets:");
1257         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1258                 printf(" %d", s->hop_count_bucket[i]);
1259         }
1260         printf("\n");
1261         printf(" lock_buckets:");
1262         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1263                 printf(" %d", s->locks.buckets[i]);
1264         }
1265         printf("\n");
1266         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1267                "locks_latency      MIN/AVG/MAX",
1268                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
1269                s->locks.latency.max, s->locks.latency.num);
1270
1271         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1272                "reclock_ctdbd      MIN/AVG/MAX",
1273                s->reclock.ctdbd.min, LATENCY_AVG(s->reclock.ctdbd),
1274                s->reclock.ctdbd.max, s->reclock.ctdbd.num);
1275
1276         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1277                "reclock_recd       MIN/AVG/MAX",
1278                s->reclock.recd.min, LATENCY_AVG(s->reclock.recd),
1279                s->reclock.recd.max, s->reclock.recd.num);
1280
1281         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1282                "call_latency       MIN/AVG/MAX",
1283                s->call_latency.min, LATENCY_AVG(s->call_latency),
1284                s->call_latency.max, s->call_latency.num);
1285
1286         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1287                "childwrite_latency MIN/AVG/MAX",
1288                s->childwrite_latency.min,
1289                LATENCY_AVG(s->childwrite_latency),
1290                s->childwrite_latency.max, s->childwrite_latency.num);
1291 }
1292
1293 static int control_statistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1294                               int argc, const char **argv)
1295 {
1296         struct ctdb_statistics *stats;
1297         int ret;
1298
1299         if (argc != 0) {
1300                 usage("statistics");
1301         }
1302
1303         ret = ctdb_ctrl_statistics(mem_ctx, ctdb->ev, ctdb->client,
1304                                    ctdb->cmd_pnn, TIMEOUT(), &stats);
1305         if (ret != 0) {
1306                 return ret;
1307         }
1308
1309         if (options.machinereadable) {
1310                 print_statistics_machine(stats, true);
1311         } else {
1312                 print_statistics(stats);
1313         }
1314
1315         return 0;
1316 }
1317
1318 static int control_statistics_reset(TALLOC_CTX *mem_ctx,
1319                                     struct ctdb_context *ctdb,
1320                                     int argc, const char **argv)
1321 {
1322         int ret;
1323
1324         if (argc != 0) {
1325                 usage("statisticsreset");
1326         }
1327
1328         ret = ctdb_ctrl_statistics_reset(mem_ctx, ctdb->ev, ctdb->client,
1329                                          ctdb->cmd_pnn, TIMEOUT());
1330         if (ret != 0) {
1331                 return ret;
1332         }
1333
1334         return 0;
1335 }
1336
1337 static int control_stats(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1338                          int argc, const char **argv)
1339 {
1340         struct ctdb_statistics_list *slist;
1341         int ret, count = 0, i;
1342         bool show_header = true;
1343
1344         if (argc > 1) {
1345                 usage("stats");
1346         }
1347
1348         if (argc == 1) {
1349                 count = atoi(argv[0]);
1350         }
1351
1352         ret = ctdb_ctrl_get_stat_history(mem_ctx, ctdb->ev, ctdb->client,
1353                                          ctdb->cmd_pnn, TIMEOUT(), &slist);
1354         if (ret != 0) {
1355                 return ret;
1356         }
1357
1358         for (i=0; i<slist->num; i++) {
1359                 if (slist->stats[i].statistics_start_time.tv_sec == 0) {
1360                         continue;
1361                 }
1362                 if (options.machinereadable == 1) {
1363                         print_statistics_machine(&slist->stats[i],
1364                                                  show_header);
1365                         show_header = false;
1366                 } else {
1367                         print_statistics(&slist->stats[i]);
1368                 }
1369                 if (count > 0 && i == count) {
1370                         break;
1371                 }
1372         }
1373
1374         return 0;
1375 }
1376
1377 static void print_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1378                      struct ctdb_public_ip_list *ips,
1379                      struct ctdb_public_ip_info **ipinfo,
1380                      bool all_nodes)
1381 {
1382         int i, j;
1383         char *conf, *avail, *active;
1384
1385         if (options.machinereadable == 1) {
1386                 printf("%s%s%s%s%s", options.sep,
1387                        "Public IP", options.sep,
1388                        "Node", options.sep);
1389                 if (options.verbose == 1) {
1390                         printf("%s%s%s%s%s%s\n",
1391                                "ActiveInterfaces", options.sep,
1392                                "AvailableInterfaces", options.sep,
1393                                "ConfiguredInterfaces", options.sep);
1394                 } else {
1395                         printf("\n");
1396                 }
1397         } else {
1398                 if (all_nodes) {
1399                         printf("Public IPs on ALL nodes\n");
1400                 } else {
1401                         printf("Public IPs on node %u\n", ctdb->cmd_pnn);
1402                 }
1403         }
1404
1405         /* IPs are reverse sorted */
1406         for (i=ips->num-1; i>=0; i--) {
1407
1408                 if (options.machinereadable == 1) {
1409                         printf("%s%s%s%d%s", options.sep,
1410                                ctdb_sock_addr_to_string(
1411                                         mem_ctx, &ips->ip[i].addr),
1412                                options.sep,
1413                                (int)ips->ip[i].pnn, options.sep);
1414                 } else {
1415                         printf("%s", ctdb_sock_addr_to_string(
1416                                                 mem_ctx, &ips->ip[i].addr));
1417                 }
1418
1419                 if (options.verbose == 0) {
1420                         if (options.machinereadable == 1) {
1421                                 printf("\n");
1422                         } else {
1423                                 printf(" %d\n", (int)ips->ip[i].pnn);
1424                         }
1425                         continue;
1426                 }
1427
1428                 conf = NULL;
1429                 avail = NULL;
1430                 active = NULL;
1431
1432                 for (j=0; j<ipinfo[i]->ifaces->num; j++) {
1433                         struct ctdb_iface *iface;
1434
1435                         iface = &ipinfo[i]->ifaces->iface[j];
1436                         if (conf == NULL) {
1437                                 conf = talloc_strdup(mem_ctx, iface->name);
1438                         } else {
1439                                 conf = talloc_asprintf_append(
1440                                                 mem_ctx, ",%s", iface->name);
1441                         }
1442
1443                         if (ipinfo[i]->active_idx == j) {
1444                                 active = iface->name;
1445                         }
1446
1447                         if (iface->link_state == 0) {
1448                                 continue;
1449                         }
1450
1451                         if (avail == NULL) {
1452                                 avail = talloc_strdup(mem_ctx, iface->name);
1453                         } else {
1454                                 avail = talloc_asprintf_append(
1455                                                 mem_ctx, ",%s", iface->name);
1456                         }
1457                 }
1458
1459                 if (options.machinereadable == 1) {
1460                         printf("%s%s%s%s%s%s\n",
1461                                active ? active : "", options.sep,
1462                                avail ? avail : "", options.sep,
1463                                conf ? conf : "", options.sep);
1464                 } else {
1465                         printf(" node[%u] active[%s] available[%s] configured[%s]\n",
1466                                ips->ip[i].pnn, active ? active : "",
1467                                avail ? avail : "", conf ? conf : "");
1468                 }
1469         }
1470 }
1471
1472 static int collect_ips(uint8_t *keybuf, size_t keylen, uint8_t *databuf,
1473                        size_t datalen, void *private_data)
1474 {
1475         struct ctdb_public_ip_list *ips = talloc_get_type_abort(
1476                 private_data, struct ctdb_public_ip_list);
1477         struct ctdb_public_ip *ip;
1478
1479         ip = (struct ctdb_public_ip *)databuf;
1480         ips->ip[ips->num] = *ip;
1481         ips->num += 1;
1482
1483         return 0;
1484 }
1485
1486 static int get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1487                               struct ctdb_public_ip_list **out)
1488 {
1489         struct ctdb_node_map *nodemap;
1490         struct ctdb_public_ip_list *ips;
1491         struct db_hash_context *ipdb;
1492         uint32_t *pnn_list;
1493         int ret, count, i, j;
1494
1495         nodemap = get_nodemap(ctdb, false);
1496         if (nodemap == NULL) {
1497                 return 1;
1498         }
1499
1500         ret = db_hash_init(mem_ctx, "ips", 101, DB_HASH_COMPLEX, &ipdb);
1501         if (ret != 0) {
1502                 goto failed;
1503         }
1504
1505         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
1506                                      &pnn_list);
1507         if (count <= 0) {
1508                 goto failed;
1509         }
1510
1511         for (i=0; i<count; i++) {
1512                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1513                                                pnn_list[i], TIMEOUT(), &ips);
1514                 if (ret != 0) {
1515                         goto failed;
1516                 }
1517
1518                 for (j=0; j<ips->num; j++) {
1519                         struct ctdb_public_ip ip;
1520
1521                         ip.pnn = ips->ip[j].pnn;
1522                         ip.addr = ips->ip[j].addr;
1523
1524                         ret = db_hash_add(ipdb, (uint8_t *)&ip.addr,
1525                                           sizeof(ip.addr),
1526                                           (uint8_t *)&ip, sizeof(ip));
1527                         if (ret != 0) {
1528                                 goto failed;
1529                         }
1530                 }
1531
1532                 TALLOC_FREE(ips);
1533         }
1534
1535         talloc_free(pnn_list);
1536
1537         ret = db_hash_traverse(ipdb, NULL, NULL, &count);
1538         if (ret != 0) {
1539                 goto failed;
1540         }
1541
1542         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);
1543         if (ips == NULL) {
1544                 goto failed;
1545         }
1546
1547         ips->ip = talloc_array(ips, struct ctdb_public_ip, count);
1548         if (ips->ip == NULL) {
1549                 goto failed;
1550         }
1551
1552         ret = db_hash_traverse(ipdb, collect_ips, ips, &count);
1553         if (ret != 0) {
1554                 goto failed;
1555         }
1556
1557         if (count != ips->num) {
1558                 goto failed;
1559         }
1560
1561         talloc_free(ipdb);
1562
1563         *out = ips;
1564         return 0;
1565
1566 failed:
1567         talloc_free(ipdb);
1568         return 1;
1569 }
1570
1571 static int control_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1572                       int argc, const char **argv)
1573 {
1574         struct ctdb_public_ip_list *ips;
1575         struct ctdb_public_ip_info **ipinfo;
1576         int ret, i;
1577         bool do_all = false;
1578
1579         if (argc > 1) {
1580                 usage("ip");
1581         }
1582
1583         if (argc == 1) {
1584                 if (strcmp(argv[0], "all") == 0) {
1585                         do_all = true;
1586                 } else {
1587                         usage("ip");
1588                 }
1589         }
1590
1591         if (do_all) {
1592                 ret = get_all_public_ips(ctdb, mem_ctx, &ips);
1593         } else {
1594                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1595                                                ctdb->cmd_pnn, TIMEOUT(), &ips);
1596         }
1597         if (ret != 0) {
1598                 return ret;
1599         }
1600
1601         ipinfo = talloc_array(mem_ctx, struct ctdb_public_ip_info *, ips->num);
1602         if (ipinfo == NULL) {
1603                 return 1;
1604         }
1605
1606         for (i=0; i<ips->num; i++) {
1607                 uint32_t pnn;
1608                 if (do_all) {
1609                         pnn = ips->ip[i].pnn;
1610                 } else {
1611                         pnn = ctdb->cmd_pnn;
1612                 }
1613                 ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev,
1614                                                    ctdb->client, pnn,
1615                                                    TIMEOUT(), &ips->ip[i].addr,
1616                                                    &ipinfo[i]);
1617                 if (ret != 0) {
1618                         return ret;
1619                 }
1620         }
1621
1622         print_ip(mem_ctx, ctdb, ips, ipinfo, do_all);
1623         return 0;
1624 }
1625
1626 static int control_ipinfo(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1627                           int argc, const char **argv)
1628 {
1629         struct ctdb_public_ip_info *ipinfo;
1630         ctdb_sock_addr addr;
1631         int ret, i;
1632
1633         if (argc != 1) {
1634                 usage("ipinfo");
1635         }
1636
1637         if (! parse_ip(argv[0], NULL, 0, &addr)) {
1638                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
1639                 return 1;
1640         }
1641
1642         ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev, ctdb->client,
1643                                            ctdb->cmd_pnn, TIMEOUT(), &addr,
1644                                            &ipinfo);
1645         if (ret != 0) {
1646                 if (ret == -1) {
1647                         printf("Node %u does not know about IP %s\n",
1648                                ctdb->cmd_pnn, argv[0]);
1649                 }
1650                 return ret;
1651         }
1652
1653         printf("Public IP[%s] info on node %u\n",
1654                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr),
1655                                         ctdb->cmd_pnn);
1656
1657         printf("IP:%s\nCurrentNode:%u\nNumInterfaces:%u\n",
1658                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr),
1659                ipinfo->ip.pnn, ipinfo->ifaces->num);
1660
1661         for (i=0; i<ipinfo->ifaces->num; i++) {
1662                 struct ctdb_iface *iface;
1663
1664                 iface = &ipinfo->ifaces->iface[i];
1665                 iface->name[CTDB_IFACE_SIZE] = '\0';
1666                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1667                        i+1, iface->name,
1668                        iface->link_state == 0 ? "down" : "up",
1669                        iface->references,
1670                        (i == ipinfo->active_idx) ? " (active)" : "");
1671         }
1672
1673         return 0;
1674 }
1675
1676 static int control_ifaces(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1677                           int argc, const char **argv)
1678 {
1679         struct ctdb_iface_list *ifaces;
1680         int ret, i;
1681
1682         if (argc != 0) {
1683                 usage("ifaces");
1684         }
1685
1686         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1687                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1688         if (ret != 0) {
1689                 return ret;
1690         }
1691
1692         if (ifaces->num == 0) {
1693                 printf("No interfaces configured on node %u\n",
1694                        ctdb->cmd_pnn);
1695                 return 0;
1696         }
1697
1698         if (options.machinereadable) {
1699                 printf("%s%s%s%s%s%s%s\n", options.sep,
1700                        "Name", options.sep,
1701                        "LinkStatus", options.sep,
1702                        "References", options.sep);
1703         } else {
1704                 printf("Interfaces on node %u\n", ctdb->cmd_pnn);
1705         }
1706
1707         for (i=0; i<ifaces->num; i++) {
1708                 if (options.machinereadable) {
1709                         printf("%s%s%s%u%s%u%s\n", options.sep,
1710                                ifaces->iface[i].name, options.sep,
1711                                ifaces->iface[i].link_state, options.sep,
1712                                ifaces->iface[i].references, options.sep);
1713                 } else {
1714                         printf("name:%s link:%s references:%u\n",
1715                                ifaces->iface[i].name,
1716                                ifaces->iface[i].link_state ? "up" : "down",
1717                                ifaces->iface[i].references);
1718                 }
1719         }
1720
1721         return 0;
1722 }
1723
1724 static int control_setifacelink(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1725                                 int argc, const char **argv)
1726 {
1727         struct ctdb_iface_list *ifaces;
1728         struct ctdb_iface *iface;
1729         int ret, i;
1730
1731         if (argc != 2) {
1732                 usage("setifacelink");
1733         }
1734
1735         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1736                 fprintf(stderr, "Interface name '%s' too long\n", argv[0]);
1737                 return 1;
1738         }
1739
1740         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1741                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1742         if (ret != 0) {
1743                 fprintf(stderr,
1744                         "Failed to get interface information from node %u\n",
1745                         ctdb->cmd_pnn);
1746                 return ret;
1747         }
1748
1749         iface = NULL;
1750         for (i=0; i<ifaces->num; i++) {
1751                 if (strcmp(ifaces->iface[i].name, argv[0]) == 0) {
1752                         iface = &ifaces->iface[i];
1753                         break;
1754                 }
1755         }
1756
1757         if (iface == NULL) {
1758                 printf("Interface %s not configured on node %u\n",
1759                        argv[0], ctdb->cmd_pnn);
1760                 return 1;
1761         }
1762
1763         if (strcmp(argv[1], "up") == 0) {
1764                 iface->link_state = 1;
1765         } else if (strcmp(argv[1], "down") == 0) {
1766                 iface->link_state = 0;
1767         } else {
1768                 usage("setifacelink");
1769                 return 1;
1770         }
1771
1772         iface->references = 0;
1773
1774         ret = ctdb_ctrl_set_iface_link_state(mem_ctx, ctdb->ev, ctdb->client,
1775                                              ctdb->cmd_pnn, TIMEOUT(), iface);
1776         if (ret != 0) {
1777                 return ret;
1778         }
1779
1780         return 0;
1781 }
1782
1783 static int control_process_exists(TALLOC_CTX *mem_ctx,
1784                                   struct ctdb_context *ctdb,
1785                                   int argc, const char **argv)
1786 {
1787         pid_t pid;
1788         int ret, status;
1789
1790         if (argc != 1) {
1791                 usage("process-exists");
1792         }
1793
1794         pid = atoi(argv[0]);
1795         ret = ctdb_ctrl_process_exists(mem_ctx, ctdb->ev, ctdb->client,
1796                                        ctdb->cmd_pnn, TIMEOUT(), pid, &status);
1797         if (ret != 0) {
1798                 return ret;
1799         }
1800
1801         if (status == 0) {
1802                 printf("PID %u exists\n", pid);
1803         } else {
1804                 printf("PID %u does not exist\n", pid);
1805         }
1806         return status;
1807 }
1808
1809 static int control_getdbmap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1810                             int argc, const char **argv)
1811 {
1812         struct ctdb_dbid_map *dbmap;
1813         int ret, i;
1814
1815         if (argc != 0) {
1816                 usage("getdbmap");
1817         }
1818
1819         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
1820                                   ctdb->cmd_pnn, TIMEOUT(), &dbmap);
1821         if (ret != 0) {
1822                 return ret;
1823         }
1824
1825         if (options.machinereadable == 1) {
1826                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
1827                        options.sep,
1828                        "ID", options.sep,
1829                        "Name", options.sep,
1830                        "Path", options.sep,
1831                        "Persistent", options.sep,
1832                        "Sticky", options.sep,
1833                        "Unhealthy", options.sep,
1834                        "Readonly", options.sep);
1835         } else {
1836                 printf("Number of databases:%d\n", dbmap->num);
1837         }
1838
1839         for (i=0; i<dbmap->num; i++) {
1840                 const char *name;
1841                 const char *path;
1842                 const char *health;
1843                 bool persistent;
1844                 bool readonly;
1845                 bool sticky;
1846                 uint32_t db_id;
1847
1848                 db_id = dbmap->dbs[i].db_id;
1849
1850                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
1851                                            ctdb->cmd_pnn, TIMEOUT(), db_id,
1852                                            &name);
1853                 if (ret != 0) {
1854                         return ret;
1855                 }
1856
1857                 ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1858                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
1859                                           &path);
1860                 if (ret != 0) {
1861                         return ret;
1862                 }
1863
1864                 ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1865                                               ctdb->cmd_pnn, TIMEOUT(), db_id,
1866                                               &health);
1867                 if (ret != 0) {
1868                         return ret;
1869                 }
1870
1871                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1872                 readonly = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
1873                 sticky = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
1874
1875                 if (options.machinereadable == 1) {
1876                         printf("%s0x%08X%s%s%s%s%s%d%s%d%s%d%s%d%s\n",
1877                                options.sep,
1878                                db_id, options.sep,
1879                                name, options.sep,
1880                                path, options.sep,
1881                                !! (persistent), options.sep,
1882                                !! (sticky), options.sep,
1883                                !! (health), options.sep,
1884                                !! (readonly), options.sep);
1885                 } else {
1886                         printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
1887                                db_id, name, path,
1888                                persistent ? " PERSISTENT" : "",
1889                                sticky ? " STICKY" : "",
1890                                readonly ? " READONLY" : "",
1891                                health ? " UNHEALTHY" : "");
1892                 }
1893
1894                 talloc_free(discard_const(name));
1895                 talloc_free(discard_const(path));
1896                 talloc_free(discard_const(health));
1897         }
1898
1899         return 0;
1900 }
1901
1902 static int control_getdbstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1903                                int argc, const char **argv)
1904 {
1905         uint32_t db_id;
1906         const char *db_name, *db_path, *db_health;
1907         uint8_t db_flags;
1908         int ret;
1909
1910         if (argc != 1) {
1911                 usage("getdbstatus");
1912         }
1913
1914         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
1915                 return 1;
1916         }
1917
1918         ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1919                                   ctdb->cmd_pnn, TIMEOUT(), db_id,
1920                                   &db_path);
1921         if (ret != 0) {
1922                 return ret;
1923         }
1924
1925         ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1926                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
1927                                       &db_health);
1928         if (ret != 0) {
1929                 return ret;
1930         }
1931
1932         printf("dbid: 0x%08x\nname: %s\npath: %s\n", db_id, db_name, db_path);
1933         printf("PERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
1934                (db_flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
1935                (db_flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
1936                (db_flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
1937                (db_health ? db_health : "OK"));
1938         return 0;
1939 }
1940
1941 struct dump_record_state {
1942         uint32_t count;
1943 };
1944
1945 #define ISASCII(x) (isprint(x) && ! strchr("\"\\", (x)))
1946
1947 static void dump_tdb_data(const char *name, TDB_DATA val)
1948 {
1949         int i;
1950
1951         fprintf(stdout, "%s(%zu) = \"", name, val.dsize);
1952         for (i=0; i<val.dsize; i++) {
1953                 if (ISASCII(val.dptr[i])) {
1954                         fprintf(stdout, "%c", val.dptr[i]);
1955                 } else {
1956                         fprintf(stdout, "\\%02X", val.dptr[i]);
1957                 }
1958         }
1959         fprintf(stdout, "\"\n");
1960 }
1961
1962 static void dump_ltdb_header(struct ctdb_ltdb_header *header)
1963 {
1964         fprintf(stdout, "dmaster: %u\n", header->dmaster);
1965         fprintf(stdout, "rsn: %" PRIu64 "\n", header->rsn);
1966         fprintf(stdout, "flags: 0x%08x", header->flags);
1967         if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
1968                 fprintf(stdout, " MIGRATED_WITH_DATA");
1969         }
1970         if (header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) {
1971                 fprintf(stdout, " VACUUM_MIGRATED");
1972         }
1973         if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
1974                 fprintf(stdout, " AUTOMATIC");
1975         }
1976         if (header->flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1977                 fprintf(stdout, " RO_HAVE_DELEGATIONS");
1978         }
1979         if (header->flags & CTDB_REC_RO_HAVE_READONLY) {
1980                 fprintf(stdout, " RO_HAVE_READONLY");
1981         }
1982         if (header->flags & CTDB_REC_RO_REVOKING_READONLY) {
1983                 fprintf(stdout, " RO_REVOKING_READONLY");
1984         }
1985         if (header->flags & CTDB_REC_RO_REVOKE_COMPLETE) {
1986                 fprintf(stdout, " RO_REVOKE_COMPLETE");
1987         }
1988         fprintf(stdout, "\n");
1989
1990 }
1991
1992 static int dump_record(uint32_t reqid, struct ctdb_ltdb_header *header,
1993                        TDB_DATA key, TDB_DATA data, void *private_data)
1994 {
1995         struct dump_record_state *state =
1996                 (struct dump_record_state *)private_data;
1997
1998         state->count += 1;
1999
2000         dump_tdb_data("key", key);
2001         dump_ltdb_header(header);
2002         dump_tdb_data("data", data);
2003         fprintf(stdout, "\n");
2004
2005         return 0;
2006 }
2007
2008 struct traverse_state {
2009         TALLOC_CTX *mem_ctx;
2010         bool done;
2011         ctdb_rec_parser_func_t func;
2012         struct dump_record_state sub_state;
2013 };
2014
2015 static void traverse_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2016 {
2017         struct traverse_state *state = (struct traverse_state *)private_data;
2018         struct ctdb_rec_data *rec;
2019         struct ctdb_ltdb_header header;
2020         int ret;
2021
2022         ret = ctdb_rec_data_pull(data.dptr, data.dsize, state->mem_ctx, &rec);
2023         if (ret != 0) {
2024                 return;
2025         }
2026
2027         if (rec->key.dsize == 0 && rec->data.dsize == 0) {
2028                 talloc_free(rec);
2029                 /* end of traverse */
2030                 state->done = true;
2031                 return;
2032         }
2033
2034         ret = ctdb_ltdb_header_extract(&rec->data, &header);
2035         if (ret != 0) {
2036                 talloc_free(rec);
2037                 return;
2038         }
2039
2040         if (rec->data.dsize == 0) {
2041                 return;
2042         }
2043
2044         ret = state->func(rec->reqid, &header, rec->key, rec->data,
2045                           &state->sub_state);
2046         talloc_free(rec);
2047         if (ret != 0) {
2048                 state->done = true;
2049         }
2050 }
2051
2052 static int control_catdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2053                          int argc, const char **argv)
2054 {
2055         struct ctdb_db_context *db;
2056         const char *db_name;
2057         uint32_t db_id;
2058         uint8_t db_flags;
2059         struct ctdb_traverse_start_ext traverse;
2060         struct traverse_state state;
2061         int ret;
2062
2063         if (argc != 1) {
2064                 usage("catdb");
2065         }
2066
2067         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2068                 return 1;
2069         }
2070
2071         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2072                           db_flags, &db);
2073         if (ret != 0) {
2074                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2075                 return ret;
2076         }
2077
2078         /* Valgrind fix */
2079         ZERO_STRUCT(traverse);
2080
2081         traverse.db_id = db_id;
2082         traverse.reqid = 0;
2083         traverse.srvid = next_srvid(ctdb);
2084         traverse.withemptyrecords = false;
2085
2086         state.mem_ctx = mem_ctx;
2087         state.done = false;
2088         state.func = dump_record;
2089         state.sub_state.count = 0;
2090
2091         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2092                                               traverse.srvid,
2093                                               traverse_handler, &state);
2094         if (ret != 0) {
2095                 return ret;
2096         }
2097
2098         ret = ctdb_ctrl_traverse_start_ext(mem_ctx, ctdb->ev, ctdb->client,
2099                                            ctdb->cmd_pnn, TIMEOUT(),
2100                                            &traverse);
2101         if (ret != 0) {
2102                 return ret;
2103         }
2104
2105         ctdb_client_wait(ctdb->ev, &state.done);
2106
2107         printf("Dumped %u records\n", state.sub_state.count);
2108
2109         ret = ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2110                                                  traverse.srvid, &state);
2111         if (ret != 0) {
2112                 return ret;
2113         }
2114
2115         return 0;
2116 }
2117
2118 static int control_cattdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2119                           int argc, const char **argv)
2120 {
2121         struct ctdb_db_context *db;
2122         const char *db_name;
2123         uint32_t db_id;
2124         uint8_t db_flags;
2125         struct dump_record_state state;
2126         int ret;
2127
2128         if (argc != 1) {
2129                 usage("catdb");
2130         }
2131
2132         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2133                 return 1;
2134         }
2135
2136         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2137                           db_flags, &db);
2138         if (ret != 0) {
2139                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2140                 return ret;
2141         }
2142
2143         state.count = 0;
2144         ret = ctdb_db_traverse(db, true, true, dump_record, &state);
2145
2146         printf("Dumped %u record(s)\n", state.count);
2147
2148         return ret;
2149 }
2150
2151 static int control_getmonmode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2152                               int argc, const char **argv)
2153 {
2154         int mode, ret;
2155
2156         if (argc != 0) {
2157                 usage("getmonmode");
2158         }
2159
2160         ret = ctdb_ctrl_get_monmode(mem_ctx, ctdb->ev, ctdb->client,
2161                                     ctdb->cmd_pnn, TIMEOUT(), &mode);
2162         if (ret != 0) {
2163                 return ret;
2164         }
2165
2166         printf("%s\n",
2167                (mode == CTDB_MONITORING_ENABLED) ? "ENABLED" : "DISABLED");
2168         return 0;
2169 }
2170
2171 static int control_getcapabilities(TALLOC_CTX *mem_ctx,
2172                                    struct ctdb_context *ctdb,
2173                                    int argc, const char **argv)
2174 {
2175         uint32_t caps;
2176         int ret;
2177
2178         if (argc != 0) {
2179                 usage("getcapabilities");
2180         }
2181
2182         ret = ctdb_ctrl_get_capabilities(mem_ctx, ctdb->ev, ctdb->client,
2183                                          ctdb->cmd_pnn, TIMEOUT(), &caps);
2184         if (ret != 0) {
2185                 return ret;
2186         }
2187
2188         if (options.machinereadable == 1) {
2189                 printf("%s%s%s%s%s\n",
2190                        options.sep,
2191                        "RECMASTER", options.sep,
2192                        "LMASTER", options.sep);
2193                 printf("%s%d%s%d%s\n", options.sep,
2194                        !! (caps & CTDB_CAP_RECMASTER), options.sep,
2195                        !! (caps & CTDB_CAP_LMASTER), options.sep);
2196         } else {
2197                 printf("RECMASTER: %s\n",
2198                        (caps & CTDB_CAP_RECMASTER) ? "YES" : "NO");
2199                 printf("LMASTER: %s\n",
2200                        (caps & CTDB_CAP_LMASTER) ? "YES" : "NO");
2201         }
2202
2203         return 0;
2204 }
2205
2206 static int control_pnn(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2207                        int argc, const char **argv)
2208 {
2209         printf("%u\n", ctdb_client_pnn(ctdb->client));
2210         return 0;
2211 }
2212
2213 static int control_lvs(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2214                        int argc, const char **argv)
2215 {
2216         char *t, *lvs_helper = NULL;
2217
2218         if (argc != 1) {
2219                 usage("lvs");
2220         }
2221
2222         t = getenv("CTDB_LVS_HELPER");
2223         if (t != NULL) {
2224                 lvs_helper = talloc_strdup(mem_ctx, t);
2225         } else {
2226                 lvs_helper = talloc_asprintf(mem_ctx, "%s/ctdb_lvs",
2227                                              CTDB_HELPER_BINDIR);
2228         }
2229
2230         if (lvs_helper == NULL) {
2231                 fprintf(stderr, "Unable to set LVS helper\n");
2232                 return 1;
2233         }
2234
2235         return run_helper("LVS helper", lvs_helper, argv[0]);
2236 }
2237
2238 static int control_disable_monitor(TALLOC_CTX *mem_ctx,
2239                                    struct ctdb_context *ctdb,
2240                                    int argc, const char **argv)
2241 {
2242         int ret;
2243
2244         if (argc != 0) {
2245                 usage("disablemonitor");
2246         }
2247
2248         ret = ctdb_ctrl_disable_monitor(mem_ctx, ctdb->ev, ctdb->client,
2249                                         ctdb->cmd_pnn, TIMEOUT());
2250         if (ret != 0) {
2251                 return ret;
2252         }
2253
2254         return 0;
2255 }
2256
2257 static int control_enable_monitor(TALLOC_CTX *mem_ctx,
2258                                   struct ctdb_context *ctdb,
2259                                   int argc, const char **argv)
2260 {
2261         int ret;
2262
2263         if (argc != 0) {
2264                 usage("enablemonitor");
2265         }
2266
2267         ret = ctdb_ctrl_enable_monitor(mem_ctx, ctdb->ev, ctdb->client,
2268                                        ctdb->cmd_pnn, TIMEOUT());
2269         if (ret != 0) {
2270                 return ret;
2271         }
2272
2273         return 0;
2274 }
2275
2276 static int control_setdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2277                             int argc, const char **argv)
2278 {
2279         enum debug_level log_level;
2280         int ret;
2281         bool found;
2282
2283         if (argc != 1) {
2284                 usage("setdebug");
2285         }
2286
2287         found = debug_level_parse(argv[0], &log_level);
2288         if (! found) {
2289                 fprintf(stderr,
2290                         "Invalid debug level '%s'. Valid levels are:\n",
2291                         argv[0]);
2292                 fprintf(stderr, "\tERROR | WARNING | NOTICE | INFO | DEBUG\n");
2293                 return 1;
2294         }
2295
2296         ret = ctdb_ctrl_setdebug(mem_ctx, ctdb->ev, ctdb->client,
2297                                  ctdb->cmd_pnn, TIMEOUT(), log_level);
2298         if (ret != 0) {
2299                 return ret;
2300         }
2301
2302         return 0;
2303 }
2304
2305 static int control_getdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2306                             int argc, const char **argv)
2307 {
2308         enum debug_level loglevel;
2309         const char *log_str;
2310         int ret;
2311
2312         if (argc != 0) {
2313                 usage("getdebug");
2314         }
2315
2316         ret = ctdb_ctrl_getdebug(mem_ctx, ctdb->ev, ctdb->client,
2317                                  ctdb->cmd_pnn, TIMEOUT(), &loglevel);
2318         if (ret != 0) {
2319                 return ret;
2320         }
2321
2322         log_str = debug_level_to_string(loglevel);
2323         printf("%s\n", log_str);
2324
2325         return 0;
2326 }
2327
2328 static int control_attach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2329                           int argc, const char **argv)
2330 {
2331         const char *db_name;
2332         uint8_t db_flags = 0;
2333         int ret;
2334
2335         if (argc < 1 || argc > 2) {
2336                 usage("attach");
2337         }
2338
2339         db_name = argv[0];
2340         if (argc == 2) {
2341                 if (strcmp(argv[1], "persistent") == 0) {
2342                         db_flags = CTDB_DB_FLAGS_PERSISTENT;
2343                 } else if (strcmp(argv[1], "readonly") == 0) {
2344                         db_flags = CTDB_DB_FLAGS_READONLY;
2345                 } else if (strcmp(argv[1], "sticky") == 0) {
2346                         db_flags = CTDB_DB_FLAGS_STICKY;
2347                 } else {
2348                         usage("attach");
2349                 }
2350         }
2351
2352         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2353                           db_flags, NULL);
2354         if (ret != 0) {
2355                 return ret;
2356         }
2357
2358         return 0;
2359 }
2360
2361 static int control_detach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2362                           int argc, const char **argv)
2363 {
2364         const char *db_name;
2365         uint32_t db_id;
2366         uint8_t db_flags;
2367         struct ctdb_node_map *nodemap;
2368         int recmode;
2369         int ret, ret2, i;
2370
2371         if (argc < 1) {
2372                 usage("detach");
2373         }
2374
2375         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2376                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
2377         if (ret != 0) {
2378                 return ret;
2379         }
2380
2381         if (recmode == CTDB_RECOVERY_ACTIVE) {
2382                 fprintf(stderr, "Database cannot be detached"
2383                                 " when recovery is active\n");
2384                 return 1;
2385         }
2386
2387         nodemap = get_nodemap(ctdb, false);
2388         if (nodemap == NULL) {
2389                 return 1;
2390         }
2391
2392         for (i=0; i<nodemap->num; i++) {
2393                 uint32_t value;
2394
2395                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2396                         continue;
2397                 }
2398                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2399                         continue;
2400                 }
2401                 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2402                         fprintf(stderr, "Database cannot be detached on"
2403                                 " inactive (stopped or banned) node %u\n",
2404                                 nodemap->node[i].pnn);
2405                         return 1;
2406                 }
2407
2408                 ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
2409                                             nodemap->node[i].pnn, TIMEOUT(),
2410                                             "AllowClientDBAttach", &value);
2411                 if (ret != 0) {
2412                         fprintf(stderr,
2413                                 "Unable to get tunable AllowClientDBAttach"
2414                                 " from node %u\n", nodemap->node[i].pnn);
2415                         return ret;
2416                 }
2417
2418                 if (value == 1) {
2419                         fprintf(stderr,
2420                                 "Database access is still active on node %u."
2421                                 " Set AllowclientDBAttach=0 on all nodes.\n",
2422                                 nodemap->node[i].pnn);
2423                         return 1;
2424                 }
2425         }
2426
2427         ret2 = 0;
2428         for (i=0; i<argc; i++) {
2429                 if (! db_exists(mem_ctx, ctdb, argv[i], &db_id, &db_name,
2430                                 &db_flags)) {
2431                         continue;
2432                 }
2433
2434                 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
2435                         fprintf(stderr,
2436                                 "Persistent database %s cannot be detached\n",
2437                                 argv[0]);
2438                         return 1;
2439                 }
2440
2441                 ret = ctdb_detach(mem_ctx, ctdb->ev, ctdb->client,
2442                                   TIMEOUT(), db_id);
2443                 if (ret != 0) {
2444                         fprintf(stderr, "Database %s detach failed\n", db_name);
2445                         ret2 = ret;
2446                 }
2447         }
2448
2449         return ret2;
2450 }
2451
2452 static int control_dumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2453                               int argc, const char **argv)
2454 {
2455         const char *mem_str;
2456         ssize_t n;
2457         int ret;
2458
2459         ret = ctdb_ctrl_dump_memory(mem_ctx, ctdb->ev, ctdb->client,
2460                                     ctdb->cmd_pnn, TIMEOUT(), &mem_str);
2461         if (ret != 0) {
2462                 return ret;
2463         }
2464
2465         n = write(1, mem_str, strlen(mem_str)+1);
2466         if (n < 0 || n != strlen(mem_str)+1) {
2467                 fprintf(stderr, "Failed to write talloc summary\n");
2468                 return 1;
2469         }
2470
2471         return 0;
2472 }
2473
2474 static void dump_memory(uint64_t srvid, TDB_DATA data, void *private_data)
2475 {
2476         bool *done = (bool *)private_data;
2477         ssize_t n;
2478
2479         n = write(1, data.dptr, data.dsize);
2480         if (n < 0 || n != data.dsize) {
2481                 fprintf(stderr, "Failed to write talloc summary\n");
2482         }
2483
2484         *done = true;
2485 }
2486
2487 static int control_rddumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2488                                 int argc, const char **argv)
2489 {
2490         struct ctdb_srvid_message msg = { 0 };
2491         int ret;
2492         bool done = false;
2493
2494         msg.pnn = ctdb->pnn;
2495         msg.srvid = next_srvid(ctdb);
2496
2497         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2498                                               msg.srvid, dump_memory, &done);
2499         if (ret != 0) {
2500                 return ret;
2501         }
2502
2503         ret = ctdb_message_mem_dump(mem_ctx, ctdb->ev, ctdb->client,
2504                                     ctdb->cmd_pnn, &msg);
2505         if (ret != 0) {
2506                 return ret;
2507         }
2508
2509         ctdb_client_wait(ctdb->ev, &done);
2510         return 0;
2511 }
2512
2513 static int control_getpid(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2514                           int argc, const char **argv)
2515 {
2516         pid_t pid;
2517         int ret;
2518
2519         ret = ctdb_ctrl_get_pid(mem_ctx, ctdb->ev, ctdb->client,
2520                                 ctdb->cmd_pnn, TIMEOUT(), &pid);
2521         if (ret != 0) {
2522                 return ret;
2523         }
2524
2525         printf("%u\n", pid);
2526         return 0;
2527 }
2528
2529 static int check_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2530                        const char *desc, uint32_t flag, bool set_flag)
2531 {
2532         struct ctdb_node_map *nodemap;
2533         bool flag_is_set;
2534
2535         nodemap = get_nodemap(ctdb, false);
2536         if (nodemap == NULL) {
2537                 return 1;
2538         }
2539
2540         flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2541         if (set_flag == flag_is_set) {
2542                 if (set_flag) {
2543                         fprintf(stderr, "Node %u is already %s\n",
2544                                 ctdb->cmd_pnn, desc);
2545                 } else {
2546                         fprintf(stderr, "Node %u is not %s\n",
2547                                 ctdb->cmd_pnn, desc);
2548                 }
2549                 return 0;
2550         }
2551
2552         return 1;
2553 }
2554
2555 static void wait_for_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2556                            uint32_t flag, bool set_flag)
2557 {
2558         struct ctdb_node_map *nodemap;
2559         bool flag_is_set;
2560
2561         while (1) {
2562                 nodemap = get_nodemap(ctdb, true);
2563                 if (nodemap == NULL) {
2564                         fprintf(stderr,
2565                                 "Failed to get nodemap, trying again\n");
2566                         sleep(1);
2567                         continue;
2568                 }
2569
2570                 flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2571                 if (flag_is_set == set_flag) {
2572                         break;
2573                 }
2574
2575                 sleep(1);
2576         }
2577 }
2578
2579 struct ipreallocate_state {
2580         int status;
2581         bool done;
2582 };
2583
2584 static void ipreallocate_handler(uint64_t srvid, TDB_DATA data,
2585                                  void *private_data)
2586 {
2587         struct ipreallocate_state *state =
2588                 (struct ipreallocate_state *)private_data;
2589
2590         if (data.dsize != sizeof(int)) {
2591                 /* Ignore packet */
2592                 return;
2593         }
2594
2595         state->status = *(int *)data.dptr;
2596         state->done = true;
2597 }
2598
2599 static int ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb)
2600 {
2601         struct ctdb_srvid_message msg = { 0 };
2602         struct ipreallocate_state state;
2603         int ret;
2604
2605         msg.pnn = ctdb->pnn;
2606         msg.srvid = next_srvid(ctdb);
2607
2608         state.done = false;
2609         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2610                                               msg.srvid,
2611                                               ipreallocate_handler, &state);
2612         if (ret != 0) {
2613                 return ret;
2614         }
2615
2616         while (true) {
2617                 ret = ctdb_message_takeover_run(mem_ctx, ctdb->ev,
2618                                                 ctdb->client,
2619                                                 CTDB_BROADCAST_CONNECTED,
2620                                                 &msg);
2621                 if (ret != 0) {
2622                         goto fail;
2623                 }
2624
2625                 ret = ctdb_client_wait_timeout(ctdb->ev, &state.done,
2626                                                TIMEOUT());
2627                 if (ret != 0) {
2628                         continue;
2629                 }
2630
2631                 if (state.status >= 0) {
2632                         ret = 0;
2633                 } else {
2634                         ret = state.status;
2635                 }
2636                 break;
2637         }
2638
2639 fail:
2640         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2641                                            msg.srvid, &state);
2642         return ret;
2643 }
2644
2645 static int control_disable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2646                            int argc, const char **argv)
2647 {
2648         int ret;
2649
2650         if (argc != 0) {
2651                 usage("disable");
2652         }
2653
2654         ret = check_flags(mem_ctx, ctdb, "disabled",
2655                           NODE_FLAGS_PERMANENTLY_DISABLED, true);
2656         if (ret == 0) {
2657                 return 0;
2658         }
2659
2660         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2661                                  ctdb->cmd_pnn, TIMEOUT(),
2662                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2663         if (ret != 0) {
2664                 fprintf(stderr,
2665                         "Failed to set DISABLED flag on node %u\n",
2666                         ctdb->cmd_pnn);
2667                 return ret;
2668         }
2669
2670         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, true);
2671         return ipreallocate(mem_ctx, ctdb);
2672 }
2673
2674 static int control_enable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2675                           int argc, const char **argv)
2676 {
2677         int ret;
2678
2679         if (argc != 0) {
2680                 usage("enable");
2681         }
2682
2683         ret = check_flags(mem_ctx, ctdb, "disabled",
2684                           NODE_FLAGS_PERMANENTLY_DISABLED, false);
2685         if (ret == 0) {
2686                 return 0;
2687         }
2688
2689         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2690                                  ctdb->cmd_pnn, TIMEOUT(),
2691                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
2692         if (ret != 0) {
2693                 fprintf(stderr, "Failed to reset DISABLED flag on node %u\n",
2694                         ctdb->cmd_pnn);
2695                 return ret;
2696         }
2697
2698         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, false);
2699         return ipreallocate(mem_ctx, ctdb);
2700 }
2701
2702 static int control_stop(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2703                         int argc, const char **argv)
2704 {
2705         int ret;
2706
2707         if (argc != 0) {
2708                 usage("stop");
2709         }
2710
2711         ret = check_flags(mem_ctx, ctdb, "stopped",
2712                           NODE_FLAGS_STOPPED, true);
2713         if (ret == 0) {
2714                 return 0;
2715         }
2716
2717         ret = ctdb_ctrl_stop_node(mem_ctx, ctdb->ev, ctdb->client,
2718                                   ctdb->cmd_pnn, TIMEOUT());
2719         if (ret != 0) {
2720                 fprintf(stderr, "Failed to stop node %u\n", ctdb->cmd_pnn);
2721                 return ret;
2722         }
2723
2724         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, true);
2725         return ipreallocate(mem_ctx, ctdb);
2726 }
2727
2728 static int control_continue(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2729                             int argc, const char **argv)
2730 {
2731         int ret;
2732
2733         if (argc != 0) {
2734                 usage("continue");
2735         }
2736
2737         ret = check_flags(mem_ctx, ctdb, "stopped",
2738                           NODE_FLAGS_STOPPED, false);
2739         if (ret == 0) {
2740                 return 0;
2741         }
2742
2743         ret = ctdb_ctrl_continue_node(mem_ctx, ctdb->ev, ctdb->client,
2744                                       ctdb->cmd_pnn, TIMEOUT());
2745         if (ret != 0) {
2746                 fprintf(stderr, "Failed to continue stopped node %u\n",
2747                         ctdb->cmd_pnn);
2748                 return ret;
2749         }
2750
2751         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, false);
2752         return ipreallocate(mem_ctx, ctdb);
2753 }
2754
2755 static int control_ban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2756                        int argc, const char **argv)
2757 {
2758         struct ctdb_ban_state ban_state;
2759         int ret;
2760
2761         if (argc != 1) {
2762                 usage("ban");
2763         }
2764
2765         ret = check_flags(mem_ctx, ctdb, "banned",
2766                           NODE_FLAGS_BANNED, true);
2767         if (ret == 0) {
2768                 return 0;
2769         }
2770
2771         ban_state.pnn = ctdb->cmd_pnn;
2772         ban_state.time = strtoul(argv[0], NULL, 0);
2773
2774         if (ban_state.time == 0) {
2775                 fprintf(stderr, "Ban time cannot be zero\n");
2776                 return EINVAL;
2777         }
2778
2779         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2780                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2781         if (ret != 0) {
2782                 fprintf(stderr, "Failed to ban node %u\n", ctdb->cmd_pnn);
2783                 return ret;
2784         }
2785
2786         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, true);
2787         return ipreallocate(mem_ctx, ctdb);
2788
2789 }
2790
2791 static int control_unban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2792                          int argc, const char **argv)
2793 {
2794         struct ctdb_ban_state ban_state;
2795         int ret;
2796
2797         if (argc != 0) {
2798                 usage("unban");
2799         }
2800
2801         ret = check_flags(mem_ctx, ctdb, "banned",
2802                           NODE_FLAGS_BANNED, false);
2803         if (ret == 0) {
2804                 return 0;
2805         }
2806
2807         ban_state.pnn = ctdb->cmd_pnn;
2808         ban_state.time = 0;
2809
2810         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2811                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2812         if (ret != 0) {
2813                 fprintf(stderr, "Failed to unban node %u\n", ctdb->cmd_pnn);
2814                 return ret;
2815         }
2816
2817         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, false);
2818         return ipreallocate(mem_ctx, ctdb);
2819
2820 }
2821
2822 static int control_shutdown(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2823                             int argc, const char **argv)
2824 {
2825         int ret;
2826
2827         if (argc != 0) {
2828                 usage("shutdown");
2829         }
2830
2831         ret = ctdb_ctrl_shutdown(mem_ctx, ctdb->ev, ctdb->client,
2832                                  ctdb->cmd_pnn, TIMEOUT());
2833         if (ret != 0) {
2834                 fprintf(stderr, "Unable to shutdown node %u\n", ctdb->cmd_pnn);
2835                 return ret;
2836         }
2837
2838         return 0;
2839 }
2840
2841 static int get_generation(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2842                           uint32_t *generation)
2843 {
2844         uint32_t recmaster;
2845         int recmode;
2846         struct ctdb_vnn_map *vnnmap;
2847         int ret;
2848
2849 again:
2850         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2851                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
2852         if (ret != 0) {
2853                 fprintf(stderr, "Failed to find recovery master\n");
2854                 return ret;
2855         }
2856
2857         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2858                                     recmaster, TIMEOUT(), &recmode);
2859         if (ret != 0) {
2860                 fprintf(stderr, "Failed to get recovery mode from node %u\n",
2861                         recmaster);
2862                 return ret;
2863         }
2864
2865         if (recmode == CTDB_RECOVERY_ACTIVE) {
2866                 sleep(1);
2867                 goto again;
2868         }
2869
2870         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
2871                                   recmaster, TIMEOUT(), &vnnmap);
2872         if (ret != 0) {
2873                 fprintf(stderr, "Failed to get generation from node %u\n",
2874                         recmaster);
2875                 return ret;
2876         }
2877
2878         if (vnnmap->generation == 1) {
2879                 talloc_free(vnnmap);
2880                 sleep(1);
2881                 goto again;
2882         }
2883
2884         *generation = vnnmap->generation;
2885         talloc_free(vnnmap);
2886         return 0;
2887 }
2888
2889
2890 static int control_recover(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2891                            int argc, const char **argv)
2892 {
2893         uint32_t generation, next_generation;
2894         int ret;
2895
2896         if (argc != 0) {
2897                 usage("recover");
2898         }
2899
2900         ret = get_generation(mem_ctx, ctdb, &generation);
2901         if (ret != 0) {
2902                 return ret;
2903         }
2904
2905         ret = ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
2906                                     ctdb->cmd_pnn, TIMEOUT(),
2907                                     CTDB_RECOVERY_ACTIVE);
2908         if (ret != 0) {
2909                 fprintf(stderr, "Failed to set recovery mode active\n");
2910                 return ret;
2911         }
2912
2913         while (1) {
2914                 ret = get_generation(mem_ctx, ctdb, &next_generation);
2915                 if (ret != 0) {
2916                         fprintf(stderr,
2917                                 "Failed to confirm end of recovery\n");
2918                         return ret;
2919                 }
2920
2921                 if (next_generation != generation) {
2922                         break;
2923                 }
2924
2925                 sleep (1);
2926         }
2927
2928         return 0;
2929 }
2930
2931 static int control_ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2932                                 int argc, const char **argv)
2933 {
2934         if (argc != 0) {
2935                 usage("ipreallocate");
2936         }
2937
2938         return ipreallocate(mem_ctx, ctdb);
2939 }
2940
2941 static int control_isnotrecmaster(TALLOC_CTX *mem_ctx,
2942                                   struct ctdb_context *ctdb,
2943                                   int argc, const char **argv)
2944 {
2945         uint32_t recmaster;
2946         int ret;
2947
2948         if (argc != 0) {
2949                 usage("isnotrecmaster");
2950         }
2951
2952         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2953                                       ctdb->pnn, TIMEOUT(), &recmaster);
2954         if (ret != 0) {
2955                 fprintf(stderr, "Failed to get recmaster\n");
2956                 return ret;
2957         }
2958
2959         if (recmaster != ctdb->pnn) {
2960                 printf("this node is not the recmaster\n");
2961                 return 1;
2962         }
2963
2964         printf("this node is the recmaster\n");
2965         return 0;
2966 }
2967
2968 static int control_gratarp(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2969                            int argc, const char **argv)
2970 {
2971         struct ctdb_addr_info addr_info;
2972         int ret;
2973
2974         if (argc != 2) {
2975                 usage("gratarp");
2976         }
2977
2978         if (! parse_ip(argv[0], NULL, 0, &addr_info.addr)) {
2979                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
2980                 return 1;
2981         }
2982         addr_info.iface = argv[1];
2983
2984         ret = ctdb_ctrl_send_gratuitous_arp(mem_ctx, ctdb->ev, ctdb->client,
2985                                             ctdb->cmd_pnn, TIMEOUT(),
2986                                             &addr_info);
2987         if (ret != 0) {
2988                 fprintf(stderr, "Unable to send gratuitous arp from node %u\n",
2989                         ctdb->cmd_pnn);
2990                 return ret;
2991         }
2992
2993         return 0;
2994 }
2995
2996 static int control_tickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2997                            int argc, const char **argv)
2998 {
2999         ctdb_sock_addr src, dst;
3000         int ret;
3001
3002         if (argc != 0 && argc != 2) {
3003                 usage("tickle");
3004         }
3005
3006         if (argc == 0) {
3007                 struct ctdb_connection *clist;
3008                 int count;
3009                 int i, num_failed;
3010
3011                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3012                 if (ret != 0) {
3013                         return ret;
3014                 }
3015
3016                 num_failed = 0;
3017                 for (i=0; i<count; i++) {
3018                         ret = ctdb_sys_send_tcp(&clist[i].src,
3019                                                 &clist[i].dst,
3020                                                 0, 0, 0);
3021                         if (ret != 0) {
3022                                 num_failed += 1;
3023                         }
3024                 }
3025
3026                 if (num_failed > 0) {
3027                         fprintf(stderr, "Failed to send %d tickles\n",
3028                                 num_failed);
3029                         return 1;
3030                 }
3031
3032                 return 0;
3033         }
3034
3035
3036         if (! parse_ip_port(argv[0], &src)) {
3037                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3038                 return 1;
3039         }
3040
3041         if (! parse_ip_port(argv[1], &dst)) {
3042                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3043                 return 1;
3044         }
3045
3046         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
3047         if (ret != 0) {
3048                 fprintf(stderr, "Failed to send tickle ack\n");
3049                 return ret;
3050         }
3051
3052         return 0;
3053 }
3054
3055 static int control_gettickles(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3056                               int argc, const char **argv)
3057 {
3058         ctdb_sock_addr addr;
3059         struct ctdb_tickle_list *tickles;
3060         unsigned port = 0;
3061         int ret, i;
3062
3063         if (argc < 1 || argc > 2) {
3064                 usage("gettickles");
3065         }
3066
3067         if (argc == 2) {
3068                 port = strtoul(argv[1], NULL, 10);
3069         }
3070
3071         if (! parse_ip(argv[0], NULL, port, &addr)) {
3072                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3073                 return 1;
3074         }
3075
3076         ret = ctdb_ctrl_get_tcp_tickle_list(mem_ctx, ctdb->ev, ctdb->client,
3077                                             ctdb->cmd_pnn, TIMEOUT(), &addr,
3078                                             &tickles);
3079         if (ret != 0) {
3080                 fprintf(stderr, "Failed to get list of connections\n");
3081                 return ret;
3082         }
3083
3084         if (options.machinereadable) {
3085                 printf("%s%s%s%s%s%s%s%s%s\n",
3086                        options.sep,
3087                        "Source IP", options.sep,
3088                        "Port", options.sep,
3089                        "Destiation IP", options.sep,
3090                        "Port", options.sep);
3091                 for (i=0; i<tickles->num; i++) {
3092                         printf("%s%s%s%u%s%s%s%u%s\n", options.sep,
3093                                ctdb_sock_addr_to_string(
3094                                        mem_ctx, &tickles->conn[i].src),
3095                                options.sep,
3096                                ntohs(tickles->conn[i].src.ip.sin_port),
3097                                options.sep,
3098                                ctdb_sock_addr_to_string(
3099                                        mem_ctx, &tickles->conn[i].dst),
3100                                options.sep,
3101                                ntohs(tickles->conn[i].dst.ip.sin_port),
3102                                options.sep);
3103                 }
3104         } else {
3105                 printf("Connections for IP: %s\n",
3106                        ctdb_sock_addr_to_string(mem_ctx, &tickles->addr));
3107                 printf("Num connections: %u\n", tickles->num);
3108                 for (i=0; i<tickles->num; i++) {
3109                         printf("SRC: %s:%u   DST: %s:%u\n",
3110                                ctdb_sock_addr_to_string(
3111                                        mem_ctx, &tickles->conn[i].src),
3112                                ntohs(tickles->conn[i].src.ip.sin_port),
3113                                ctdb_sock_addr_to_string(
3114                                        mem_ctx, &tickles->conn[i].dst),
3115                                ntohs(tickles->conn[i].dst.ip.sin_port));
3116                 }
3117         }
3118
3119         return 0;
3120 }
3121
3122 typedef void (*clist_request_func)(struct ctdb_req_control *request,
3123                                    struct ctdb_connection *conn);
3124
3125 typedef int (*clist_reply_func)(struct ctdb_reply_control *reply);
3126
3127 struct process_clist_state {
3128         struct ctdb_connection *clist;
3129         int count;
3130         int num_failed, num_total;
3131         clist_reply_func reply_func;
3132 };
3133
3134 static void process_clist_done(struct tevent_req *subreq);
3135
3136 static struct tevent_req *process_clist_send(
3137                                         TALLOC_CTX *mem_ctx,
3138                                         struct ctdb_context *ctdb,
3139                                         struct ctdb_connection *clist,
3140                                         int count,
3141                                         clist_request_func request_func,
3142                                         clist_reply_func reply_func)
3143 {
3144         struct tevent_req *req, *subreq;
3145         struct process_clist_state *state;
3146         struct ctdb_req_control request;
3147         int i;
3148
3149         req = tevent_req_create(mem_ctx, &state, struct process_clist_state);
3150         if (req == NULL) {
3151                 return NULL;
3152         }
3153
3154         state->clist = clist;
3155         state->count = count;
3156         state->reply_func = reply_func;
3157
3158         for (i=0; i<count; i++) {
3159                 request_func(&request, &clist[i]);
3160                 subreq = ctdb_client_control_send(state, ctdb->ev,
3161                                                   ctdb->client, ctdb->cmd_pnn,
3162                                                   TIMEOUT(), &request);
3163                 if (tevent_req_nomem(subreq, req)) {
3164                         return tevent_req_post(req, ctdb->ev);
3165                 }
3166                 tevent_req_set_callback(subreq, process_clist_done, req);
3167         }
3168
3169         return req;
3170 }
3171
3172 static void process_clist_done(struct tevent_req *subreq)
3173 {
3174         struct tevent_req *req = tevent_req_callback_data(
3175                 subreq, struct tevent_req);
3176         struct process_clist_state *state = tevent_req_data(
3177                 req, struct process_clist_state);
3178         struct ctdb_reply_control *reply;
3179         int ret;
3180         bool status;
3181
3182         status = ctdb_client_control_recv(subreq, NULL, state, &reply);
3183         TALLOC_FREE(subreq);
3184         if (! status) {
3185                 state->num_failed += 1;
3186                 goto done;
3187         }
3188
3189         ret = state->reply_func(reply);
3190         if (ret != 0) {
3191                 state->num_failed += 1;
3192                 goto done;
3193         }
3194
3195 done:
3196         state->num_total += 1;
3197         if (state->num_total == state->count) {
3198                 tevent_req_done(req);
3199         }
3200 }
3201
3202 static int process_clist_recv(struct tevent_req *req)
3203 {
3204         struct process_clist_state *state = tevent_req_data(
3205                 req, struct process_clist_state);
3206
3207         return state->num_failed;
3208 }
3209
3210 static int control_addtickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3211                              int argc, const char **argv)
3212 {
3213         struct ctdb_connection conn;
3214         int ret;
3215
3216         if (argc != 0 && argc != 2) {
3217                 usage("addtickle");
3218         }
3219
3220         if (argc == 0) {
3221                 struct ctdb_connection *clist;
3222                 struct tevent_req *req;
3223                 int count;
3224
3225                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3226                 if (ret != 0) {
3227                         return ret;
3228                 }
3229
3230                 req = process_clist_send(mem_ctx, ctdb, clist, count,
3231                                  ctdb_req_control_tcp_add_delayed_update,
3232                                  ctdb_reply_control_tcp_add_delayed_update);
3233                 if (req == NULL) {
3234                         return ENOMEM;
3235                 }
3236
3237                 tevent_req_poll(req, ctdb->ev);
3238
3239                 ret = process_clist_recv(req);
3240                 if (ret != 0) {
3241                         fprintf(stderr, "Failed to add %d tickles\n", ret);
3242                         return 1;
3243                 }
3244
3245                 return 0;
3246         }
3247
3248         if (! parse_ip_port(argv[0], &conn.src)) {
3249                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3250                 return 1;
3251         }
3252         if (! parse_ip_port(argv[1], &conn.dst)) {
3253                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3254                 return 1;
3255         }
3256
3257         ret = ctdb_ctrl_tcp_add_delayed_update(mem_ctx, ctdb->ev,
3258                                                ctdb->client, ctdb->cmd_pnn,
3259                                                TIMEOUT(), &conn);
3260         if (ret != 0) {
3261                 fprintf(stderr, "Failed to register connection\n");
3262                 return ret;
3263         }
3264
3265         return 0;
3266 }
3267
3268 static int control_deltickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3269                              int argc, const char **argv)
3270 {
3271         struct ctdb_connection conn;
3272         int ret;
3273
3274         if (argc != 0 && argc != 2) {
3275                 usage("deltickle");
3276         }
3277
3278         if (argc == 0) {
3279                 struct ctdb_connection *clist;
3280                 struct tevent_req *req;
3281                 int count;
3282
3283                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3284                 if (ret != 0) {
3285                         return ret;
3286                 }
3287
3288                 req = process_clist_send(mem_ctx, ctdb, clist, count,
3289                                          ctdb_req_control_tcp_remove,
3290                                          ctdb_reply_control_tcp_remove);
3291                 if (req == NULL) {
3292                         return ENOMEM;
3293                 }
3294
3295                 tevent_req_poll(req, ctdb->ev);
3296
3297                 ret = process_clist_recv(req);
3298                 if (ret != 0) {
3299                         fprintf(stderr, "Failed to remove %d tickles\n", ret);
3300                         return 1;
3301                 }
3302
3303                 return 0;
3304         }
3305
3306         if (! parse_ip_port(argv[0], &conn.src)) {
3307                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3308                 return 1;
3309         }
3310         if (! parse_ip_port(argv[1], &conn.dst)) {
3311                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3312                 return 1;
3313         }
3314
3315         ret = ctdb_ctrl_tcp_remove(mem_ctx, ctdb->ev, ctdb->client,
3316                                    ctdb->cmd_pnn, TIMEOUT(), &conn);
3317         if (ret != 0) {
3318                 fprintf(stderr, "Failed to unregister connection\n");
3319                 return ret;
3320         }
3321
3322         return 0;
3323 }
3324
3325 static int control_check_srvids(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3326                                 int argc, const char **argv)
3327 {
3328         uint64_t *srvid;
3329         uint8_t *result;
3330         int ret, i;
3331
3332         if (argc == 0) {
3333                 usage("check_srvids");
3334         }
3335
3336         srvid = talloc_array(mem_ctx, uint64_t, argc);
3337         if (srvid == NULL) {
3338                 fprintf(stderr, "Memory allocation error\n");
3339                 return 1;
3340         }
3341
3342         for (i=0; i<argc; i++) {
3343                 srvid[i] = strtoull(argv[i], NULL, 0);
3344         }
3345
3346         ret = ctdb_ctrl_check_srvids(mem_ctx, ctdb->ev, ctdb->client,
3347                                      ctdb->cmd_pnn, TIMEOUT(), srvid, argc,
3348                                      &result);
3349         if (ret != 0) {
3350                 fprintf(stderr, "Failed to check srvids on node %u\n",
3351                         ctdb->cmd_pnn);
3352                 return ret;
3353         }
3354
3355         for (i=0; i<argc; i++) {
3356                 printf("SRVID 0x%" PRIx64 " %s\n", srvid[i],
3357                        (result[i] ? "exists" : "does not exist"));
3358         }
3359
3360         return 0;
3361 }
3362
3363 static int control_listnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3364                              int argc, const char **argv)
3365 {
3366         struct ctdb_node_map *nodemap;
3367         int i;
3368
3369         if (argc != 0) {
3370                 usage("listnodes");
3371         }
3372
3373         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
3374         if (nodemap == NULL) {
3375                 return 1;
3376         }
3377
3378         for (i=0; i<nodemap->num; i++) {
3379                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
3380                         continue;
3381                 }
3382
3383                 if (options.machinereadable) {
3384                         printf("%s%u%s%s%s\n", options.sep,
3385                                nodemap->node[i].pnn, options.sep,
3386                                ctdb_sock_addr_to_string(
3387                                         mem_ctx, &nodemap->node[i].addr),
3388                                options.sep);
3389                 } else {
3390                         printf("%s\n",
3391                                ctdb_sock_addr_to_string(
3392                                         mem_ctx, &nodemap->node[i].addr));
3393                 }
3394         }
3395
3396         return 0;
3397 }
3398
3399 static bool nodemap_identical(struct ctdb_node_map *nodemap1,
3400                               struct ctdb_node_map *nodemap2)
3401 {
3402         int i;
3403
3404         if (nodemap1->num != nodemap2->num) {
3405                 return false;
3406         }
3407
3408         for (i=0; i<nodemap1->num; i++) {
3409                 struct ctdb_node_and_flags *n1, *n2;
3410
3411                 n1 = &nodemap1->node[i];
3412                 n2 = &nodemap2->node[i];
3413
3414                 if ((n1->pnn != n2->pnn) ||
3415                     (n1->flags != n2->flags) ||
3416                     ! ctdb_sock_addr_same_ip(&n1->addr, &n2->addr)) {
3417                         return false;
3418                 }
3419         }
3420
3421         return true;
3422 }
3423
3424 static int check_node_file_changes(TALLOC_CTX *mem_ctx,
3425                                    struct ctdb_node_map *nm,
3426                                    struct ctdb_node_map *fnm,
3427                                    bool *reload)
3428 {
3429         int i;
3430         bool check_failed = false;
3431
3432         *reload = false;
3433
3434         for (i=0; i<nm->num; i++) {
3435                 if (i >= fnm->num) {
3436                         fprintf(stderr,
3437                                 "Node %u (%s) missing from nodes file\n",
3438                                 nm->node[i].pnn,
3439                                 ctdb_sock_addr_to_string(
3440                                         mem_ctx, &nm->node[i].addr));
3441                         check_failed = true;
3442                         continue;
3443                 }
3444                 if (nm->node[i].flags & NODE_FLAGS_DELETED &&
3445                     fnm->node[i].flags & NODE_FLAGS_DELETED) {
3446                         /* Node remains deleted */
3447                         continue;
3448                 }
3449
3450                 if (! (nm->node[i].flags & NODE_FLAGS_DELETED) &&
3451                     ! (fnm->node[i].flags & NODE_FLAGS_DELETED)) {
3452                         /* Node not newly nor previously deleted */
3453                         if (! ctdb_same_ip(&nm->node[i].addr,
3454                                            &fnm->node[i].addr)) {
3455                                 fprintf(stderr,
3456                                         "Node %u has changed IP address"
3457                                         " (was %s, now %s)\n",
3458                                         nm->node[i].pnn,
3459                                         ctdb_sock_addr_to_string(
3460                                                 mem_ctx, &nm->node[i].addr),
3461                                         ctdb_sock_addr_to_string(
3462                                                 mem_ctx, &fnm->node[i].addr));
3463                                 check_failed = true;
3464                         } else {
3465                                 if (nm->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3466                                         fprintf(stderr,
3467                                                 "WARNING: Node %u is disconnected."
3468                                                 " You MUST fix this node manually!\n",
3469                                                 nm->node[i].pnn);
3470                                 }
3471                         }
3472                         continue;
3473                 }
3474
3475                 if (fnm->node[i].flags & NODE_FLAGS_DELETED) {
3476                         /* Node is being deleted */
3477                         printf("Node %u is DELETED\n", nm->node[i].pnn);
3478                         *reload = true;
3479                         if (! (nm->node[i].flags & NODE_FLAGS_DISCONNECTED)) {
3480                                 fprintf(stderr,
3481                                         "ERROR: Node %u is still connected\n",
3482                                         nm->node[i].pnn);
3483                                 check_failed = true;
3484                         }
3485                         continue;
3486                 }
3487
3488                 if (nm->node[i].flags & NODE_FLAGS_DELETED) {
3489                         /* Node was previously deleted */
3490                         printf("Node %u is UNDELETED\n", nm->node[i].pnn);
3491                         *reload = true;
3492                 }
3493         }
3494
3495         if (check_failed) {
3496                 fprintf(stderr,
3497                         "ERROR: Nodes will not be reloaded due to previous error\n");
3498                 return 1;
3499         }
3500
3501         /* Leftover nodes in file are NEW */
3502         for (; i < fnm->num; i++) {
3503                 printf("Node %u is NEW\n", fnm->node[i].pnn);
3504                 *reload = true;
3505         }
3506
3507         return 0;
3508 }
3509
3510 struct disable_recoveries_state {
3511         uint32_t *pnn_list;
3512         int node_count;
3513         bool *reply;
3514         int status;
3515         bool done;
3516 };
3517
3518 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
3519                                        void *private_data)
3520 {
3521         struct disable_recoveries_state *state =
3522                 (struct disable_recoveries_state *)private_data;
3523         int ret, i;
3524
3525         if (data.dsize != sizeof(int)) {
3526                 /* Ignore packet */
3527                 return;
3528         }
3529
3530         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
3531         ret = *(int *)data.dptr;
3532         if (ret < 0) {
3533                 state->status = ret;
3534                 state->done = true;
3535                 return;
3536         }
3537         for (i=0; i<state->node_count; i++) {
3538                 if (state->pnn_list[i] == ret) {
3539                         state->reply[i] = true;
3540                         break;
3541                 }
3542         }
3543
3544         state->done = true;
3545         for (i=0; i<state->node_count; i++) {
3546                 if (! state->reply[i]) {
3547                         state->done = false;
3548                         break;
3549                 }
3550         }
3551 }
3552
3553 static int disable_recoveries(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3554                               uint32_t timeout, uint32_t *pnn_list, int count)
3555 {
3556         struct ctdb_disable_message disable = { 0 };
3557         struct disable_recoveries_state state;
3558         int ret, i;
3559
3560         disable.pnn = ctdb->pnn;
3561         disable.srvid = next_srvid(ctdb);
3562         disable.timeout = timeout;
3563
3564         state.pnn_list = pnn_list;
3565         state.node_count = count;
3566         state.done = false;
3567         state.status = 0;
3568         state.reply = talloc_zero_array(mem_ctx, bool, count);
3569         if (state.reply == NULL) {
3570                 return ENOMEM;
3571         }
3572
3573         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
3574                                               disable.srvid,
3575                                               disable_recoveries_handler,
3576                                               &state);
3577         if (ret != 0) {
3578                 return ret;
3579         }
3580
3581         for (i=0; i<count; i++) {
3582                 ret = ctdb_message_disable_recoveries(mem_ctx, ctdb->ev,
3583                                                       ctdb->client,
3584                                                       pnn_list[i],
3585                                                       &disable);
3586                 if (ret != 0) {
3587                         goto fail;
3588                 }
3589         }
3590
3591         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
3592         if (ret == ETIME) {
3593                 fprintf(stderr, "Timed out waiting to disable recoveries\n");
3594         } else {
3595                 ret = (state.status >= 0 ? 0 : 1);
3596         }
3597
3598 fail:
3599         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
3600                                            disable.srvid, &state);
3601         return ret;
3602 }
3603
3604 static int control_reloadnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3605                                int argc, const char **argv)
3606 {
3607         struct ctdb_node_map *nodemap = NULL;
3608         struct ctdb_node_map *file_nodemap;
3609         struct ctdb_node_map *remote_nodemap;
3610         struct ctdb_req_control request;
3611         struct ctdb_reply_control **reply;
3612         bool reload;
3613         int ret, i;
3614         uint32_t *pnn_list;
3615         int count;
3616
3617         nodemap = get_nodemap(ctdb, false);
3618         if (nodemap == NULL) {
3619                 return 1;
3620         }
3621
3622         file_nodemap = read_nodes_file(mem_ctx, ctdb->pnn);
3623         if (file_nodemap == NULL) {
3624                 return 1;
3625         }
3626
3627         for (i=0; i<nodemap->num; i++) {
3628                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3629                         continue;
3630                 }
3631
3632                 ret = ctdb_ctrl_get_nodes_file(mem_ctx, ctdb->ev, ctdb->client,
3633                                                nodemap->node[i].pnn, TIMEOUT(),
3634                                                &remote_nodemap);
3635                 if (ret != 0) {
3636                         fprintf(stderr,
3637                                 "ERROR: Failed to get nodes file from node %u\n",
3638                                 nodemap->node[i].pnn);
3639                         return ret;
3640                 }
3641
3642                 if (! nodemap_identical(file_nodemap, remote_nodemap)) {
3643                         fprintf(stderr,
3644                                 "ERROR: Nodes file on node %u differs"
3645                                  " from current node (%u)\n",
3646                                  nodemap->node[i].pnn, ctdb->pnn);
3647                         return 1;
3648                 }
3649         }
3650
3651         ret = check_node_file_changes(mem_ctx, nodemap, file_nodemap, &reload);
3652         if (ret != 0) {
3653                 return ret;
3654         }
3655
3656         if (! reload) {
3657                 fprintf(stderr, "No change in nodes file,"
3658                                 " skipping unnecessary reload\n");
3659                 return 0;
3660         }
3661
3662         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
3663                                         mem_ctx, &pnn_list);
3664         if (count <= 0) {
3665                 fprintf(stderr, "Memory allocation error\n");
3666                 return 1;
3667         }
3668
3669         ret = disable_recoveries(mem_ctx, ctdb, 2*options.timelimit,
3670                                  pnn_list, count);
3671         if (ret != 0) {
3672                 fprintf(stderr, "Failed to disable recoveries\n");
3673                 return ret;
3674         }
3675
3676         ctdb_req_control_reload_nodes_file(&request);
3677         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3678                                         pnn_list, count, TIMEOUT(),
3679                                         &request, NULL, &reply);
3680         if (ret != 0) {
3681                 bool failed = false;
3682
3683                 for (i=0; i<count; i++) {
3684                         ret = ctdb_reply_control_reload_nodes_file(reply[i]);
3685                         if (ret != 0) {
3686                                 fprintf(stderr,
3687                                         "Node %u failed to reload nodes\n",
3688                                         pnn_list[i]);
3689                                 failed = true;
3690                         }
3691                 }
3692                 if (failed) {
3693                         fprintf(stderr,
3694                                 "You MUST fix failed nodes manually!\n");
3695                 }
3696         }
3697
3698         ret = disable_recoveries(mem_ctx, ctdb, 0, pnn_list, count);
3699         if (ret != 0) {
3700                 fprintf(stderr, "Failed to enable recoveries\n");
3701                 return ret;
3702         }
3703
3704         return 0;
3705 }
3706
3707 static int moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3708                   ctdb_sock_addr *addr, uint32_t pnn)
3709 {
3710         struct ctdb_public_ip_list *pubip_list;
3711         struct ctdb_public_ip pubip;
3712         struct ctdb_node_map *nodemap;
3713         struct ctdb_req_control request;
3714         uint32_t *pnn_list;
3715         int ret, i, count;
3716
3717         ret = ctdb_message_disable_ip_check(mem_ctx, ctdb->ev, ctdb->client,
3718                                             CTDB_BROADCAST_CONNECTED,
3719                                             2*options.timelimit);
3720         if (ret != 0) {
3721                 fprintf(stderr, "Failed to disable IP check\n");
3722                 return ret;
3723         }
3724
3725         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3726                                        pnn, TIMEOUT(), &pubip_list);
3727         if (ret != 0) {
3728                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3729                         pnn);
3730                 return ret;
3731         }
3732
3733         for (i=0; i<pubip_list->num; i++) {
3734                 if (ctdb_same_ip(addr, &pubip_list->ip[i].addr)) {
3735                         break;
3736                 }
3737         }
3738
3739         if (i == pubip_list->num) {
3740                 fprintf(stderr, "Node %u CANNOT host IP address %s\n",
3741                         pnn, ctdb_sock_addr_to_string(mem_ctx, addr));
3742                 return 1;
3743         }
3744
3745         nodemap = get_nodemap(ctdb, false);
3746         if (nodemap == NULL) {
3747                 return 1;
3748         }
3749
3750         count = list_of_active_nodes(nodemap, pnn, mem_ctx, &pnn_list);
3751         if (count <= 0) {
3752                 fprintf(stderr, "Memory allocation error\n");
3753                 return 1;
3754         }
3755
3756         pubip.pnn = pnn;
3757         pubip.addr = *addr;
3758         ctdb_req_control_release_ip(&request, &pubip);
3759
3760         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3761                                         pnn_list, count, TIMEOUT(),
3762                                         &request, NULL, NULL);
3763         if (ret != 0) {
3764                 fprintf(stderr, "Failed to release IP on nodes\n");
3765                 return ret;
3766         }
3767
3768         ret = ctdb_ctrl_takeover_ip(mem_ctx, ctdb->ev, ctdb->client,
3769                                     pnn, TIMEOUT(), &pubip);
3770         if (ret != 0) {
3771                 fprintf(stderr, "Failed to takeover IP on node %u\n", pnn);
3772                 return ret;
3773         }
3774
3775         return 0;
3776 }
3777
3778 static int control_moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3779                           int argc, const char **argv)
3780 {
3781         ctdb_sock_addr addr;
3782         uint32_t pnn;
3783         int ret, retries = 0;
3784
3785         if (argc != 2) {
3786                 usage("moveip");
3787         }
3788
3789         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3790                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3791                 return 1;
3792         }
3793
3794         pnn = strtoul(argv[1], NULL, 10);
3795         if (pnn == CTDB_UNKNOWN_PNN) {
3796                 fprintf(stderr, "Invalid PNN %s\n", argv[1]);
3797                 return 1;
3798         }
3799
3800         while (retries < 5) {
3801                 ret = moveip(mem_ctx, ctdb, &addr, pnn);
3802                 if (ret == 0) {
3803                         break;
3804                 }
3805
3806                 sleep(3);
3807                 retries++;
3808         }
3809
3810         if (ret != 0) {
3811                 fprintf(stderr, "Failed to move IP %s to node %u\n",
3812                         argv[0], pnn);
3813                 return ret;
3814         }
3815
3816         return 0;
3817 }
3818
3819 static int control_rebalanceip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3820                                int argc, const char **argv)
3821 {
3822         ctdb_sock_addr addr;
3823         struct ctdb_node_map *nodemap;
3824         struct ctdb_public_ip pubip;
3825         struct ctdb_req_control request;
3826         uint32_t *pnn_list;
3827         int ret, count;
3828
3829         if (argc != 1) {
3830                 usage("rebalanceip");
3831         }
3832
3833         if (parse_ip(argv[0], NULL, 0, &addr)) {
3834                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3835                 return 1;
3836         }
3837
3838         ret = ctdb_message_disable_ip_check(mem_ctx, ctdb->ev, ctdb->client,
3839                                             CTDB_BROADCAST_CONNECTED,
3840                                             2*options.timelimit);
3841         if (ret != 0) {
3842                 fprintf(stderr, "Failed to disable IP check\n");
3843                 return 1;
3844         }
3845
3846         nodemap = get_nodemap(ctdb, false);
3847         if (nodemap == NULL) {
3848                 return 1;
3849         }
3850
3851         count = list_of_active_nodes(nodemap, -1, mem_ctx, &pnn_list);
3852         if (count <= 0) {
3853                 fprintf(stderr, "Memory allocation error\n");
3854                 return 1;
3855         }
3856
3857         pubip.pnn = CTDB_UNKNOWN_PNN;
3858         pubip.addr = addr;
3859
3860         ctdb_req_control_release_ip(&request, &pubip);
3861         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3862                                         pnn_list, count, TIMEOUT(),
3863                                         &request, NULL, NULL);
3864         if (ret != 0) {
3865                 fprintf(stderr, "Failed to release IP from nodes\n");
3866                 return 1;
3867         }
3868
3869         return ipreallocate(mem_ctx, ctdb);
3870 }
3871
3872 static int rebalancenode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3873                          uint32_t pnn)
3874 {
3875         int ret;
3876
3877         ret = ctdb_message_rebalance_node(mem_ctx, ctdb->ev, ctdb->client,
3878                                           CTDB_BROADCAST_CONNECTED, pnn);
3879         if (ret != 0) {
3880                 fprintf(stderr,
3881                         "Failed to ask recovery master to distribute IPs\n");
3882                 return ret;
3883         }
3884
3885         return 0;
3886 }
3887
3888 static int control_addip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3889                          int argc, const char **argv)
3890 {
3891         ctdb_sock_addr addr;
3892         struct ctdb_public_ip_list *pubip_list;
3893         struct ctdb_addr_info addr_info;
3894         unsigned int mask;
3895         int ret, i, retries = 0;
3896
3897         if (argc != 2) {
3898                 usage("addip");
3899         }
3900
3901         if (! parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
3902                 fprintf(stderr, "Invalid IP/Mask %s\n", argv[0]);
3903                 return 1;
3904         }
3905
3906         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3907                                        ctdb->cmd_pnn, TIMEOUT(), &pubip_list);
3908         if (ret != 0) {
3909                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3910                         ctdb->cmd_pnn);
3911                 return 1;
3912         }
3913
3914         for (i=0; i<pubip_list->num; i++) {
3915                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3916                         fprintf(stderr, "Node already knows about IP %s\n",
3917                                 ctdb_sock_addr_to_string(mem_ctx, &addr));
3918                         return 0;
3919                 }
3920         }
3921
3922         addr_info.addr = addr;
3923         addr_info.mask = mask;
3924         addr_info.iface = argv[1];
3925
3926         while (retries < 5) {
3927                 ret = ctdb_ctrl_add_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3928                                               ctdb->cmd_pnn, TIMEOUT(),
3929                                               &addr_info);
3930                 if (ret == 0) {
3931                         break;
3932                 }
3933
3934                 sleep(3);
3935                 retries++;
3936         }
3937
3938         if (ret != 0) {
3939                 fprintf(stderr, "Failed to add public IP to node %u."
3940                                 " Giving up\n", ctdb->cmd_pnn);
3941                 return ret;
3942         }
3943
3944         ret = rebalancenode(mem_ctx, ctdb, ctdb->cmd_pnn);
3945         if (ret != 0) {
3946                 return ret;
3947         }
3948
3949         return ipreallocate(mem_ctx, ctdb);
3950 }
3951
3952 static int control_delip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3953                          int argc, const char **argv)
3954 {
3955         ctdb_sock_addr addr;
3956         struct ctdb_public_ip_list *pubip_list;
3957         struct ctdb_addr_info addr_info;
3958         int ret, i;
3959
3960         if (argc != 1) {
3961                 usage("delip");
3962         }
3963
3964         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3965                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3966                 return 1;
3967         }
3968
3969         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3970                                        ctdb->cmd_pnn, TIMEOUT(), &pubip_list);
3971         if (ret != 0) {
3972                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3973                         ctdb->cmd_pnn);
3974                 return 1;
3975         }
3976
3977         for (i=0; i<pubip_list->num; i++) {
3978                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3979                         break;
3980                 }
3981         }
3982
3983         if (i == pubip_list->num) {
3984                 fprintf(stderr, "Node does not know about IP address %s\n",
3985                         ctdb_sock_addr_to_string(mem_ctx, &addr));
3986                 return 0;
3987         }
3988
3989         addr_info.addr = addr;
3990         addr_info.mask = 0;
3991         addr_info.iface = NULL;
3992
3993         ret = ctdb_ctrl_del_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3994                                       ctdb->cmd_pnn, TIMEOUT(), &addr_info);
3995         if (ret != 0) {
3996                 fprintf(stderr, "Failed to delete public IP from node %u\n",
3997                         ctdb->cmd_pnn);
3998                 return ret;
3999         }
4000
4001         return 0;
4002 }
4003
4004 static int control_eventscript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4005                                int argc, const char **argv)
4006 {
4007         int ret;
4008
4009         if (argc != 1) {
4010                 usage("eventscript");
4011         }
4012
4013         if (strcmp(argv[0], "monitor") != 0) {
4014                 fprintf(stderr, "Only monitor event can be run\n");
4015                 return 1;
4016         }
4017
4018         ret = ctdb_ctrl_run_eventscripts(mem_ctx, ctdb->ev, ctdb->client,
4019                                          ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4020         if (ret != 0) {
4021                 fprintf(stderr, "Failed to run monitor event on node %u\n",
4022                         ctdb->cmd_pnn);
4023                 return ret;
4024         }
4025
4026         return 0;
4027 }
4028
4029 #define DB_VERSION      3
4030 #define MAX_DB_NAME     64
4031 #define MAX_REC_BUFFER_SIZE     (100*1000)
4032
4033 struct db_header {
4034         unsigned long version;
4035         time_t timestamp;
4036         unsigned long flags;
4037         unsigned long nbuf;
4038         unsigned long nrec;
4039         char name[MAX_DB_NAME];
4040 };
4041
4042 struct backup_state {
4043         TALLOC_CTX *mem_ctx;
4044         struct ctdb_rec_buffer *recbuf;
4045         uint32_t db_id;
4046         int fd;
4047         unsigned int nbuf, nrec;
4048 };
4049
4050 static int backup_handler(uint32_t reqid, struct ctdb_ltdb_header *header,
4051                           TDB_DATA key, TDB_DATA data, void *private_data)
4052 {
4053         struct backup_state *state = (struct backup_state *)private_data;
4054         size_t len;
4055         int ret;
4056
4057         if (state->recbuf == NULL) {
4058                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
4059                                                      state->db_id);
4060                 if (state->recbuf == NULL) {
4061                         return ENOMEM;
4062                 }
4063         }
4064
4065         ret = ctdb_rec_buffer_add(state->recbuf, state->recbuf, reqid,
4066                                   header, key, data);
4067         if (ret != 0) {
4068                 return ret;
4069         }
4070
4071         len = ctdb_rec_buffer_len(state->recbuf);
4072         if (len < MAX_REC_BUFFER_SIZE) {
4073                 return 0;
4074         }
4075
4076         ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
4077         if (ret != 0) {
4078                 fprintf(stderr, "Failed to write records to backup file\n");
4079                 return ret;
4080         }
4081
4082         state->nbuf += 1;
4083         state->nrec += state->recbuf->count;
4084         TALLOC_FREE(state->recbuf);
4085
4086         return 0;
4087 }
4088
4089 static int control_backupdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4090                             int argc, const char **argv)
4091 {
4092         const char *db_name;
4093         struct ctdb_db_context *db;
4094         uint32_t db_id;
4095         uint8_t db_flags;
4096         struct backup_state state;
4097         struct db_header db_hdr;
4098         int fd, ret;
4099
4100         if (argc != 2) {
4101                 usage("backupdb");
4102         }
4103
4104         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4105                 return 1;
4106         }
4107
4108         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4109                           db_flags, &db);
4110         if (ret != 0) {
4111                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4112                 return ret;
4113         }
4114
4115         fd = open(argv[1], O_RDWR|O_CREAT, 0600);
4116         if (fd == -1) {
4117                 ret = errno;
4118                 fprintf(stderr, "Failed to open file %s for writing\n",
4119                         argv[1]);
4120                 return ret;
4121         }
4122
4123         /* Write empty header first */
4124         ZERO_STRUCT(db_hdr);
4125         ret = write(fd, &db_hdr, sizeof(struct db_header));
4126         if (ret == -1) {
4127                 ret = errno;
4128                 close(fd);
4129                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4130                 return ret;
4131         }
4132
4133         state.mem_ctx = mem_ctx;
4134         state.recbuf = NULL;
4135         state.fd = fd;
4136         state.nbuf = 0;
4137         state.nrec = 0;
4138
4139         ret = ctdb_db_traverse(db, true, false, backup_handler, &state);
4140         if (ret != 0) {
4141                 fprintf(stderr, "Failed to collect records from DB %s\n",
4142                         db_name);
4143                 close(fd);
4144                 return ret;
4145         }
4146
4147         if (state.recbuf != NULL) {
4148                 ret = ctdb_rec_buffer_write(state.recbuf, state.fd);
4149                 if (ret != 0) {
4150                         fprintf(stderr,
4151                                 "Failed to write records to backup file\n");
4152                         close(fd);
4153                         return ret;
4154                 }
4155
4156                 state.nbuf += 1;
4157                 state.nrec += state.recbuf->count;
4158                 TALLOC_FREE(state.recbuf);
4159         }
4160
4161         db_hdr.version = DB_VERSION;
4162         db_hdr.timestamp = time(NULL);
4163         db_hdr.flags = db_flags;
4164         db_hdr.nbuf = state.nbuf;
4165         db_hdr.nrec = state.nrec;
4166         strncpy(db_hdr.name, db_name, MAX_DB_NAME-1);
4167
4168         lseek(fd, 0, SEEK_SET);
4169         ret = write(fd, &db_hdr, sizeof(struct db_header));
4170         if (ret == -1) {
4171                 ret = errno;
4172                 close(fd);
4173                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4174                 return ret;
4175         }
4176
4177         close(fd);
4178         printf("Database backed up to %s\n", argv[1]);
4179         return 0;
4180 }
4181
4182 static int control_restoredb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4183                              int argc, const char **argv)
4184 {
4185         const char *db_name = NULL;
4186         struct ctdb_db_context *db;
4187         struct db_header db_hdr;
4188         struct ctdb_node_map *nodemap;
4189         struct ctdb_req_control request;
4190         struct ctdb_reply_control **reply;
4191         struct ctdb_transdb wipedb;
4192         struct ctdb_pulldb_ext pulldb;
4193         struct ctdb_rec_buffer *recbuf;
4194         uint32_t generation;
4195         uint32_t *pnn_list;
4196         char timebuf[128];
4197         int fd, i;
4198         int count, ret;
4199
4200         if (argc < 1 || argc > 2) {
4201                 usage("restoredb");
4202         }
4203
4204         fd = open(argv[0], O_RDONLY, 0600);
4205         if (fd == -1) {
4206                 ret = errno;
4207                 fprintf(stderr, "Failed to open file %s for reading\n",
4208                         argv[0]);
4209                 return ret;
4210         }
4211
4212         if (argc == 2) {
4213                 db_name = argv[1];
4214         }
4215
4216         ret = read(fd, &db_hdr, sizeof(struct db_header));
4217         if (ret == -1) {
4218                 ret = errno;
4219                 close(fd);
4220                 fprintf(stderr, "Failed to read db header from file %s\n",
4221                         argv[0]);
4222                 return ret;
4223         }
4224
4225         if (db_hdr.version != DB_VERSION) {
4226                 fprintf(stderr,
4227                         "Wrong version of backup file, expected %u, got %lu\n",
4228                         DB_VERSION, db_hdr.version);
4229                 close(fd);
4230                 return EINVAL;
4231         }
4232
4233         if (db_name == NULL) {
4234                 db_name = db_hdr.name;
4235         }
4236
4237         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4238                  localtime(&db_hdr.timestamp));
4239         printf("Restoring database %s from backup @ %s\n", db_name, timebuf);
4240
4241         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4242                           db_hdr.flags, &db);
4243         if (ret != 0) {
4244                 fprintf(stderr, "Failed to attach to DB %s\n", db_hdr.name);
4245                 return ret;
4246         }
4247
4248         nodemap = get_nodemap(ctdb, false);
4249         if (nodemap == NULL) {
4250                 fprintf(stderr, "Failed to get nodemap\n");
4251                 return ENOMEM;
4252         }
4253
4254         ret = get_generation(mem_ctx, ctdb, &generation);
4255         if (ret != 0) {
4256                 fprintf(stderr, "Failed to get current generation\n");
4257                 return ret;
4258         }
4259
4260         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4261                                      &pnn_list);
4262         if (count <= 0) {
4263                 return ENOMEM;
4264         }
4265
4266         wipedb.db_id = ctdb_db_id(db);
4267         wipedb.tid = generation;
4268
4269         ctdb_req_control_db_freeze(&request, wipedb.db_id);
4270         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4271                                         ctdb->client, pnn_list, count,
4272                                         TIMEOUT(), &request, NULL, NULL);
4273         if (ret != 0) {
4274                 goto failed;
4275         }
4276
4277
4278         ctdb_req_control_db_transaction_start(&request, &wipedb);
4279         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4280                                         pnn_list, count, TIMEOUT(),
4281                                         &request, NULL, NULL);
4282         if (ret != 0) {
4283                 goto failed;
4284         }
4285
4286         ctdb_req_control_wipe_database(&request, &wipedb);
4287         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4288                                         pnn_list, count, TIMEOUT(),
4289                                         &request, NULL, NULL);
4290         if (ret != 0) {
4291                 goto failed;
4292         }
4293
4294         pulldb.db_id = ctdb_db_id(db);
4295         pulldb.lmaster = 0;
4296         pulldb.srvid = SRVID_CTDB_PUSHDB;
4297
4298         ctdb_req_control_db_push_start(&request, &pulldb);
4299         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4300                                         pnn_list, count, TIMEOUT(),
4301                                         &request, NULL, NULL);
4302         if (ret != 0) {
4303                 goto failed;
4304         }
4305
4306         for (i=0; i<db_hdr.nbuf; i++) {
4307                 struct ctdb_req_message message;
4308                 TDB_DATA data;
4309
4310                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4311                 if (ret != 0) {
4312                         goto failed;
4313                 }
4314
4315                 data.dsize = ctdb_rec_buffer_len(recbuf);
4316                 data.dptr = talloc_size(mem_ctx, data.dsize);
4317                 if (data.dptr == NULL) {
4318                         goto failed;
4319                 }
4320
4321                 ctdb_rec_buffer_push(recbuf, data.dptr);
4322
4323                 message.srvid = pulldb.srvid;
4324                 message.data.data = data;
4325
4326                 ret = ctdb_client_message_multi(mem_ctx, ctdb->ev,
4327                                                 ctdb->client,
4328                                                 pnn_list, count,
4329                                                 &message, NULL);
4330                 if (ret != 0) {
4331                         goto failed;
4332                 }
4333
4334                 talloc_free(recbuf);
4335                 talloc_free(data.dptr);
4336         }
4337
4338         ctdb_req_control_db_push_confirm(&request, pulldb.db_id);
4339         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4340                                         pnn_list, count, TIMEOUT(),
4341                                         &request, NULL, &reply);
4342         if (ret != 0) {
4343                 goto failed;
4344         }
4345
4346         for (i=0; i<count; i++) {
4347                 uint32_t num_records;
4348
4349                 ret = ctdb_reply_control_db_push_confirm(reply[i],
4350                                                          &num_records);
4351                 if (ret != 0) {
4352                         fprintf(stderr, "Invalid response from node %u\n",
4353                                 pnn_list[i]);
4354                         goto failed;
4355                 }
4356
4357                 if (num_records != db_hdr.nrec) {
4358                         fprintf(stderr, "Node %u received %u of %lu records\n",
4359                                 pnn_list[i], num_records, db_hdr.nrec);
4360                         goto failed;
4361                 }
4362         }
4363
4364         ctdb_req_control_db_set_healthy(&request, wipedb.db_id);
4365         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4366                                         pnn_list, count, TIMEOUT(),
4367                                         &request, NULL, NULL);
4368         if (ret != 0) {
4369                 goto failed;
4370         }
4371
4372         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4373         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4374                                         pnn_list, count, TIMEOUT(),
4375                                         &request, NULL, NULL);
4376         if (ret != 0) {
4377                 goto failed;
4378         }
4379
4380         ctdb_req_control_db_thaw(&request, wipedb.db_id);
4381         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4382                                         ctdb->client, pnn_list, count,
4383                                         TIMEOUT(), &request, NULL, NULL);
4384         if (ret != 0) {
4385                 goto failed;
4386         }
4387
4388         printf("Database %s restored\n", db_name);
4389         return 0;
4390
4391
4392 failed:
4393         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4394                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4395         return ret;
4396 }
4397
4398 struct dumpdbbackup_state {
4399         ctdb_rec_parser_func_t parser;
4400         struct dump_record_state sub_state;
4401 };
4402
4403 static int dumpdbbackup_handler(uint32_t reqid,
4404                                 struct ctdb_ltdb_header *header,
4405                                 TDB_DATA key, TDB_DATA data,
4406                                 void *private_data)
4407 {
4408         struct dumpdbbackup_state *state =
4409                 (struct dumpdbbackup_state *)private_data;
4410         struct ctdb_ltdb_header hdr;
4411         int ret;
4412
4413         ret = ctdb_ltdb_header_extract(&data, &hdr);
4414         if (ret != 0) {
4415                 return ret;
4416         }
4417
4418         return state->parser(reqid, &hdr, key, data, &state->sub_state);
4419 }
4420
4421 static int control_dumpdbbackup(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4422                                 int argc, const char **argv)
4423 {
4424         struct db_header db_hdr;
4425         char timebuf[128];
4426         struct dumpdbbackup_state state;
4427         int fd, ret, i;
4428
4429         if (argc != 1) {
4430                 usage("dumpbackup");
4431         }
4432
4433         fd = open(argv[0], O_RDONLY, 0600);
4434         if (fd == -1) {
4435                 ret = errno;
4436                 fprintf(stderr, "Failed to open file %s for reading\n",
4437                         argv[0]);
4438                 return ret;
4439         }
4440
4441         ret = read(fd, &db_hdr, sizeof(struct db_header));
4442         if (ret == -1) {
4443                 ret = errno;
4444                 close(fd);
4445                 fprintf(stderr, "Failed to read db header from file %s\n",
4446                         argv[0]);
4447                 return ret;
4448         }
4449
4450         if (db_hdr.version != DB_VERSION) {
4451                 fprintf(stderr,
4452                         "Wrong version of backup file, expected %u, got %lu\n",
4453                         DB_VERSION, db_hdr.version);
4454                 return EINVAL;
4455         }
4456
4457         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4458                  localtime(&db_hdr.timestamp));
4459         printf("Dumping database %s from backup @ %s\n",
4460                db_hdr.name, timebuf);
4461
4462         state.parser = dump_record;
4463         state.sub_state.count = 0;
4464
4465         for (i=0; i<db_hdr.nbuf; i++) {
4466                 struct ctdb_rec_buffer *recbuf;
4467
4468                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4469                 if (ret != 0) {
4470                         fprintf(stderr, "Failed to read records\n");
4471                         return ret;
4472                 }
4473
4474                 ret = ctdb_rec_buffer_traverse(recbuf, dumpdbbackup_handler,
4475                                                &state);
4476                 if (ret != 0) {
4477                         fprintf(stderr, "Failed to dump records\n");
4478                         return ret;
4479                 }
4480         }
4481
4482         printf("Dumped %u record(s)\n", state.sub_state.count);
4483         return 0;
4484 }
4485
4486 static int control_wipedb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4487                           int argc, const char **argv)
4488 {
4489         const char *db_name;
4490         struct ctdb_db_context *db;
4491         uint32_t db_id;
4492         uint8_t db_flags;
4493         struct ctdb_node_map *nodemap;
4494         struct ctdb_req_control request;
4495         struct ctdb_transdb wipedb;
4496         uint32_t generation;
4497         uint32_t *pnn_list;
4498         int count, ret;
4499
4500         if (argc != 1) {
4501                 usage("wipedb");
4502         }
4503
4504         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4505                 return 1;
4506         }
4507
4508         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4509                           db_flags, &db);
4510         if (ret != 0) {
4511                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4512                 return ret;
4513         }
4514
4515         nodemap = get_nodemap(ctdb, false);
4516         if (nodemap == NULL) {
4517                 fprintf(stderr, "Failed to get nodemap\n");
4518                 return ENOMEM;
4519         }
4520
4521         ret = get_generation(mem_ctx, ctdb, &generation);
4522         if (ret != 0) {
4523                 fprintf(stderr, "Failed to get current generation\n");
4524                 return ret;
4525         }
4526
4527         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4528                                      &pnn_list);
4529         if (count <= 0) {
4530                 return ENOMEM;
4531         }
4532
4533         ctdb_req_control_db_freeze(&request, db_id);
4534         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4535                                         ctdb->client, pnn_list, count,
4536                                         TIMEOUT(), &request, NULL, NULL);
4537         if (ret != 0) {
4538                 goto failed;
4539         }
4540
4541         wipedb.db_id = db_id;
4542         wipedb.tid = generation;
4543
4544         ctdb_req_control_db_transaction_start(&request, &wipedb);
4545         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4546                                         pnn_list, count, TIMEOUT(),
4547                                         &request, NULL, NULL);
4548         if (ret != 0) {
4549                 goto failed;
4550         }
4551
4552         ctdb_req_control_wipe_database(&request, &wipedb);
4553         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4554                                         pnn_list, count, TIMEOUT(),
4555                                         &request, NULL, NULL);
4556         if (ret != 0) {
4557                 goto failed;
4558         }
4559
4560         ctdb_req_control_db_set_healthy(&request, db_id);
4561         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4562                                         pnn_list, count, TIMEOUT(),
4563                                         &request, NULL, NULL);
4564         if (ret != 0) {
4565                 goto failed;
4566         }
4567
4568         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4569         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4570                                         pnn_list, count, TIMEOUT(),
4571                                         &request, NULL, NULL);
4572         if (ret != 0) {
4573                 goto failed;
4574         }
4575
4576         ctdb_req_control_db_thaw(&request, db_id);
4577         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4578                                         ctdb->client, pnn_list, count,
4579                                         TIMEOUT(), &request, NULL, NULL);
4580         if (ret != 0) {
4581                 goto failed;
4582         }
4583
4584         printf("Database %s wiped\n", db_name);
4585         return 0;
4586
4587
4588 failed:
4589         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4590                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4591         return ret;
4592 }
4593
4594 static int control_recmaster(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4595                              int argc, const char **argv)
4596 {
4597         uint32_t recmaster;
4598         int ret;
4599
4600         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
4601                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
4602         if (ret != 0) {
4603                 return ret;
4604         }
4605
4606         printf("%u\n", recmaster);
4607         return 0;
4608 }
4609
4610 static void print_scriptstatus_one(struct ctdb_script_list *slist,
4611                                    const char *event_str)
4612 {
4613         int i;
4614         int num_run = 0;
4615
4616         if (slist == NULL) {
4617                 if (! options.machinereadable) {
4618                         printf("%s cycle never run\n", event_str);
4619                 }
4620                 return;
4621         }
4622
4623         for (i=0; i<slist->num_scripts; i++) {
4624                 if (slist->script[i].status != -ENOEXEC) {
4625                         num_run++;
4626                 }
4627         }
4628
4629         if (! options.machinereadable) {
4630                 printf("%d scripts were executed last %s cycle\n",
4631                        num_run, event_str);
4632         }
4633
4634         for (i=0; i<slist->num_scripts; i++) {
4635                 const char *status = NULL;
4636
4637                 switch (slist->script[i].status) {
4638                 case -ETIME:
4639                         status = "TIMEDOUT";
4640                         break;
4641                 case -ENOEXEC:
4642                         status = "DISABLED";
4643                         break;
4644                 case 0:
4645                         status = "OK";
4646                         break;
4647                 default:
4648                         if (slist->script[i].status > 0) {
4649                                 status = "ERROR";
4650                         }
4651                         break;
4652                 }
4653
4654                 if (options.machinereadable) {
4655                         printf("%s%s%s%s%s%i%s%s%s%lu.%06lu%s%lu.%06lu%s%s%s\n",
4656                                options.sep,
4657                                event_str, options.sep,
4658                                slist->script[i].name, options.sep,
4659                                slist->script[i].status, options.sep,
4660                                status, options.sep,
4661                                slist->script[i].start.tv_sec,
4662                                slist->script[i].start.tv_usec, options.sep,
4663                                slist->script[i].finished.tv_sec,
4664                                slist->script[i].finished.tv_usec, options.sep,
4665                                slist->script[i].output, options.sep);
4666                         continue;
4667                 }
4668
4669                 if (status) {
4670                         printf("%-20s Status:%s    ",
4671                                slist->script[i].name, status);
4672                 } else {
4673                         /* Some other error, eg from stat. */
4674                         printf("%-20s Status:CANNOT RUN (%s)",
4675                                slist->script[i].name,
4676                                strerror(-slist->script[i].status));
4677                 }
4678
4679                 if (slist->script[i].status >= 0) {
4680                         printf("Duration:%.3lf ",
4681                                timeval_delta(&slist->script[i].finished,
4682                                              &slist->script[i].start));
4683                 }
4684                 if (slist->script[i].status != -ENOEXEC) {
4685                         printf("%s", ctime(&slist->script[i].start.tv_sec));
4686                         if (slist->script[i].status != 0) {
4687                                 printf("   OUTPUT:%s\n",
4688                                        slist->script[i].output);
4689                         }
4690                 } else {
4691                         printf("\n");
4692                 }
4693         }
4694 }
4695
4696 static void print_scriptstatus(struct ctdb_script_list **slist,
4697                                int count, const char **event_str)
4698 {
4699         int i;
4700
4701         if (options.machinereadable) {
4702                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
4703                        options.sep,
4704                        "Type", options.sep,
4705                        "Name", options.sep,
4706                        "Code", options.sep,
4707                        "Status", options.sep,
4708                        "Start", options.sep,
4709                        "End", options.sep,
4710                        "Error Output", options.sep);
4711         }
4712
4713         for (i=0; i<count; i++) {
4714                 print_scriptstatus_one(slist[i], event_str[i]);
4715         }
4716 }
4717
4718 static int control_scriptstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4719                                 int argc, const char **argv)
4720 {
4721         struct ctdb_script_list **slist;
4722         const char *event_str;
4723         enum ctdb_event event;
4724         const char *all_events[] = {
4725                 "init", "setup", "startup", "monitor",
4726                 "takeip", "releaseip", "updateip", "ipreallocated" };
4727         bool valid;
4728         int ret, i, j;
4729         int count, start, end, num;
4730
4731         if (argc > 1) {
4732                 usage("scriptstatus");
4733         }
4734
4735         if (argc == 0) {
4736                 event_str = "monitor";
4737         } else {
4738                 event_str = argv[0];
4739         }
4740
4741         valid = false;
4742         for (i=0; i<ARRAY_SIZE(all_events); i++) {
4743                 if (strcmp(event_str, all_events[i]) == 0) {
4744                         valid = true;
4745                         count = 1;
4746                         start = i;
4747                         end = i+1;
4748                         break;
4749                 }
4750         }
4751
4752         if (strcmp(event_str, "all") == 0) {
4753                 valid = true;
4754                 count = ARRAY_SIZE(all_events);
4755                 start = 0;
4756                 end = count-1;
4757         }
4758
4759         if (! valid) {
4760                 fprintf(stderr, "Unknown event name %s\n", argv[0]);
4761                 usage("scriptstatus");
4762         }
4763
4764         slist = talloc_array(mem_ctx, struct ctdb_script_list *, count);
4765         if (slist == NULL) {
4766                 fprintf(stderr, "Memory allocation error\n");
4767                 return 1;
4768         }
4769
4770         num = 0;
4771         for (i=start; i<end; i++) {
4772                 event = ctdb_event_from_string(all_events[i]);
4773
4774                 ret = ctdb_ctrl_get_event_script_status(mem_ctx, ctdb->ev,
4775                                                         ctdb->client,
4776                                                         ctdb->cmd_pnn,
4777                                                         TIMEOUT(), event,
4778                                                         &slist[num]);
4779                 if (ret != 0) {
4780                         fprintf(stderr,
4781                                 "failed to get script status for %s event\n",
4782                                 all_events[i]);
4783                         return 1;
4784                 }
4785
4786                 if (slist[num] == NULL) {
4787                         num++;
4788                         continue;
4789                 }
4790
4791                 /* The ETIME status is ignored for certain events.
4792                  * In that case the status is 0, but endtime is not set.
4793                  */
4794                 for (j=0; j<slist[num]->num_scripts; j++) {
4795                         if (slist[num]->script[j].status == 0 &&
4796                             timeval_is_zero(&slist[num]->script[j].finished)) {
4797                                 slist[num]->script[j].status = -ETIME;
4798                         }
4799                 }
4800
4801                 num++;
4802         }
4803
4804         print_scriptstatus(slist, count, &all_events[start]);
4805         return 0;
4806 }
4807
4808 static int control_enablescript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4809                                 int argc, const char **argv)
4810 {
4811         int ret;
4812
4813         if (argc != 1) {
4814                 usage("enablescript");
4815         }
4816
4817         ret = ctdb_ctrl_enable_script(mem_ctx, ctdb->ev, ctdb->client,
4818                                       ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4819         if (ret != 0) {
4820                 fprintf(stderr, "Failed to enable script %s\n", argv[0]);
4821                 return ret;
4822         }
4823
4824         return 0;
4825 }
4826
4827 static int control_disablescript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4828                                  int argc, const char **argv)
4829 {
4830         int ret;
4831
4832         if (argc != 1) {
4833                 usage("disablescript");
4834         }
4835
4836         ret = ctdb_ctrl_disable_script(mem_ctx, ctdb->ev, ctdb->client,
4837                                        ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4838         if (ret != 0) {
4839                 fprintf(stderr, "Failed to disable script %s\n", argv[0]);
4840                 return ret;
4841         }
4842
4843         return 0;
4844 }
4845
4846 static int control_natgw(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4847                          int argc, const char **argv)
4848 {
4849         char *t, *natgw_helper = NULL;
4850
4851         if (argc != 1) {
4852                 usage("natgw");
4853         }
4854
4855         t = getenv("CTDB_NATGW_HELPER");
4856         if (t != NULL) {
4857                 natgw_helper = talloc_strdup(mem_ctx, t);
4858         } else {
4859                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4860                                                CTDB_HELPER_BINDIR);
4861         }
4862
4863         if (natgw_helper == NULL) {
4864                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4865                 return 1;
4866         }
4867
4868         return run_helper("NAT gateway helper", natgw_helper, argv[0]);
4869 }
4870
4871 static int control_natgwlist(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4872                              int argc, const char **argv)
4873 {
4874         char *t, *natgw_helper = NULL;
4875
4876         if (argc != 0) {
4877                 usage("natgwlist");
4878         }
4879
4880         t = getenv("CTDB_NATGW_HELPER");
4881         if (t != NULL) {
4882                 natgw_helper = talloc_strdup(mem_ctx, t);
4883         } else {
4884                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4885                                                CTDB_HELPER_BINDIR);
4886         }
4887
4888         if (natgw_helper == NULL) {
4889                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4890                 return 1;
4891         }
4892
4893         return run_helper("NAT gateway helper", natgw_helper, "natgwlist");
4894 }
4895
4896 /*
4897  * Find the PNN of the current node
4898  * discover the pnn by loading the nodes file and try to bind
4899  * to all addresses one at a time until the ip address is found.
4900  */
4901 static bool find_node_xpnn(TALLOC_CTX *mem_ctx, uint32_t *pnn)
4902 {
4903         struct ctdb_node_map *nodemap;
4904         int i;
4905
4906         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
4907         if (nodemap == NULL) {
4908                 return false;
4909         }
4910
4911         for (i=0; i<nodemap->num; i++) {
4912                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
4913                         continue;
4914                 }
4915                 if (ctdb_sys_have_ip(&nodemap->node[i].addr)) {
4916                         if (pnn != NULL) {
4917                                 *pnn = nodemap->node[i].pnn;
4918                         }
4919                         talloc_free(nodemap);
4920                         return true;
4921                 }
4922         }
4923
4924         fprintf(stderr, "Failed to detect PNN of the current node.\n");
4925         talloc_free(nodemap);
4926         return false;
4927 }
4928
4929 static int control_getreclock(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4930                               int argc, const char **argv)
4931 {
4932         const char *reclock;
4933         int ret;
4934
4935         if (argc != 0) {
4936                 usage("getreclock");
4937         }
4938
4939         ret = ctdb_ctrl_get_reclock_file(mem_ctx, ctdb->ev, ctdb->client,
4940                                          ctdb->cmd_pnn, TIMEOUT(), &reclock);
4941         if (ret != 0) {
4942                 return ret;
4943         }
4944
4945         if (reclock != NULL) {
4946                 printf("%s\n", reclock);
4947         }
4948
4949         return 0;
4950 }
4951
4952 static int control_setlmasterrole(TALLOC_CTX *mem_ctx,
4953                                   struct ctdb_context *ctdb,
4954                                   int argc, const char **argv)
4955 {
4956         uint32_t lmasterrole = 0;
4957         int ret;
4958
4959         if (argc != 1) {
4960                 usage("setlmasterrole");
4961         }
4962
4963         if (strcmp(argv[0], "on") == 0) {
4964                 lmasterrole = 1;
4965         } else if (strcmp(argv[0], "off") == 0) {
4966                 lmasterrole = 0;
4967         } else {
4968                 usage("setlmasterrole");
4969         }
4970
4971         ret = ctdb_ctrl_set_lmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4972                                         ctdb->cmd_pnn, TIMEOUT(), lmasterrole);
4973         if (ret != 0) {
4974                 return ret;
4975         }
4976
4977         return 0;
4978 }
4979
4980 static int control_setrecmasterrole(TALLOC_CTX *mem_ctx,
4981                                     struct ctdb_context *ctdb,
4982                                     int argc, const char **argv)
4983 {
4984         uint32_t recmasterrole = 0;
4985         int ret;
4986
4987         if (argc != 1) {
4988                 usage("setrecmasterrole");
4989         }
4990
4991         if (strcmp(argv[0], "on") == 0) {
4992                 recmasterrole = 1;
4993         } else if (strcmp(argv[0], "off") == 0) {
4994                 recmasterrole = 0;
4995         } else {
4996                 usage("setrecmasterrole");
4997         }
4998
4999         ret = ctdb_ctrl_set_recmasterrole(mem_ctx, ctdb->ev, ctdb->client,
5000                                           ctdb->cmd_pnn, TIMEOUT(),
5001                                           recmasterrole);
5002         if (ret != 0) {
5003                 return ret;
5004         }
5005
5006         return 0;
5007 }
5008
5009 static int control_setdbreadonly(TALLOC_CTX *mem_ctx,
5010                                  struct ctdb_context *ctdb,
5011                                  int argc, const char **argv)
5012 {
5013         uint32_t db_id;
5014         uint8_t db_flags;
5015         int ret;
5016
5017         if (argc != 1) {
5018                 usage("setdbreadonly");
5019         }
5020
5021         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
5022                 return 1;
5023         }
5024
5025         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5026                 fprintf(stderr, "Cannot set READONLY on persistent DB\n");
5027                 return 1;
5028         }
5029
5030         ret = ctdb_ctrl_set_db_readonly(mem_ctx, ctdb->ev, ctdb->client,
5031                                         ctdb->cmd_pnn, TIMEOUT(), db_id);
5032         if (ret != 0) {
5033                 return ret;
5034         }
5035
5036         return 0;
5037 }
5038
5039 static int control_setdbsticky(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5040                                int argc, const char **argv)
5041 {
5042         uint32_t db_id;
5043         uint8_t db_flags;
5044         int ret;
5045
5046         if (argc != 1) {
5047                 usage("setdbsticky");
5048         }
5049
5050         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
5051                 return 1;
5052         }
5053
5054         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5055                 fprintf(stderr, "Cannot set STICKY on persistent DB\n");
5056                 return 1;
5057         }
5058
5059         ret = ctdb_ctrl_set_db_sticky(mem_ctx, ctdb->ev, ctdb->client,
5060                                       ctdb->cmd_pnn, TIMEOUT(), db_id);
5061         if (ret != 0) {
5062                 return ret;
5063         }
5064
5065         return 0;
5066 }
5067
5068 static int control_pfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5069                           int argc, const char **argv)
5070 {
5071         const char *db_name;
5072         struct ctdb_db_context *db;
5073         struct ctdb_transaction_handle *h;
5074         uint8_t db_flags;
5075         TDB_DATA key, data;
5076         int ret;
5077
5078         if (argc < 2 || argc > 3) {
5079                 usage("pfetch");
5080         }
5081
5082         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5083                 return 1;
5084         }
5085
5086         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5087                 fprintf(stderr, "DB %s is not a persistent database\n",
5088                         db_name);
5089                 return 1;
5090         }
5091
5092         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5093                           db_flags, &db);
5094         if (ret != 0) {
5095                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5096                 return ret;
5097         }
5098
5099         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5100         if (ret != 0) {
5101                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5102                 return ret;
5103         }
5104
5105         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5106                                      TIMEOUT(), db, true, &h);
5107         if (ret != 0) {
5108                 fprintf(stderr, "Failed to start transaction on db %s\n",
5109                         db_name);
5110                 return ret;
5111         }
5112
5113         ret = ctdb_transaction_fetch_record(h, key, mem_ctx, &data);
5114         if (ret != 0) {
5115                 fprintf(stderr, "Failed to read record for key %s\n",
5116                         argv[1]);
5117                 return ret;
5118         }
5119
5120         printf("%.*s\n", (int)data.dsize, data.dptr);
5121
5122         ctdb_transaction_cancel(h);
5123         return 0;
5124 }
5125
5126 static int control_pstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5127                           int argc, const char **argv)
5128 {
5129         const char *db_name;
5130         struct ctdb_db_context *db;
5131         struct ctdb_transaction_handle *h;
5132         uint8_t db_flags;
5133         TDB_DATA key, data;
5134         int ret;
5135
5136         if (argc != 3) {
5137                 usage("pstore");
5138         }
5139
5140         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5141                 return 1;
5142         }
5143
5144         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5145                 fprintf(stderr, "DB %s is not a persistent database\n",
5146                         db_name);
5147                 return 1;
5148         }
5149
5150         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5151                           db_flags, &db);
5152         if (ret != 0) {
5153                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5154                 return ret;
5155         }
5156
5157         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5158         if (ret != 0) {
5159                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5160                 return ret;
5161         }
5162
5163         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5164         if (ret != 0) {
5165                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5166                 return ret;
5167         }
5168
5169         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5170                                      TIMEOUT(), db, false, &h);
5171         if (ret != 0) {
5172                 fprintf(stderr, "Failed to start transaction on db %s\n",
5173                         db_name);
5174                 return ret;
5175         }
5176
5177         ret = ctdb_transaction_store_record(h, key, data);
5178         if (ret != 0) {
5179                 fprintf(stderr, "Failed to store record for key %s\n",
5180                         argv[1]);
5181                 return ret;
5182         }
5183
5184         ret = ctdb_transaction_commit(h);
5185         if (ret != 0) {
5186                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5187                         db_name);
5188                 return ret;
5189         }
5190
5191         return 0;
5192 }
5193
5194 static int control_pdelete(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5195                            int argc, const char **argv)
5196 {
5197         const char *db_name;
5198         struct ctdb_db_context *db;
5199         struct ctdb_transaction_handle *h;
5200         uint8_t db_flags;
5201         TDB_DATA key;
5202         int ret;
5203
5204         if (argc != 2) {
5205                 usage("pdelete");
5206         }
5207
5208         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5209                 return 1;
5210         }
5211
5212         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5213                 fprintf(stderr, "DB %s is not a persistent database\n",
5214                         db_name);
5215                 return 1;
5216         }
5217
5218         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5219                           db_flags, &db);
5220         if (ret != 0) {
5221                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5222                 return ret;
5223         }
5224
5225         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5226         if (ret != 0) {
5227                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5228                 return ret;
5229         }
5230
5231         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5232                                      TIMEOUT(), db, false, &h);
5233         if (ret != 0) {
5234                 fprintf(stderr, "Failed to start transaction on db %s\n",
5235                         db_name);
5236                 return ret;
5237         }
5238
5239         ret = ctdb_transaction_delete_record(h, key);
5240         if (ret != 0) {
5241                 fprintf(stderr, "Failed to delete record for key %s\n",
5242                         argv[1]);
5243                 return ret;
5244         }
5245
5246         ret = ctdb_transaction_commit(h);
5247         if (ret != 0) {
5248                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5249                         db_name);
5250                 return ret;
5251         }
5252
5253         return 0;
5254 }
5255
5256 static int ptrans_parse_string(TALLOC_CTX *mem_ctx, const char **ptr, TDB_DATA *data)
5257 {
5258         const char *t;
5259         size_t n;
5260         int ret;
5261
5262         *data = tdb_null;
5263
5264         /* Skip whitespace */
5265         n = strspn(*ptr, " \t");
5266         t = *ptr + n;
5267
5268         if (t[0] == '"') {
5269                 /* Quoted ASCII string - no wide characters! */
5270                 t++;
5271                 n = strcspn(t, "\"");
5272                 if (t[n] == '"') {
5273                         if (n > 0) {
5274                                 ret = str_to_data(t, n, mem_ctx, data);
5275                                 if (ret != 0) {
5276                                         return ret;
5277                                 }
5278                         }
5279                         *ptr = t + n + 1;
5280                 } else {
5281                         fprintf(stderr, "Unmatched \" in input %s\n", *ptr);
5282                         return 1;
5283                 }
5284         } else {
5285                 fprintf(stderr, "Unsupported input format in %s\n", *ptr);
5286                 return 1;
5287         }
5288
5289         return 0;
5290 }
5291
5292 #define MAX_LINE_SIZE   1024
5293
5294 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
5295                                  TDB_DATA *key, TDB_DATA *value)
5296 {
5297         char line [MAX_LINE_SIZE]; /* FIXME: make this more flexible? */
5298         const char *ptr;
5299         int ret;
5300
5301         ptr = fgets(line, MAX_LINE_SIZE, file);
5302         if (ptr == NULL) {
5303                 return false;
5304         }
5305
5306         /* Get key */
5307         ret = ptrans_parse_string(mem_ctx, &ptr, key);
5308         if (ret != 0 || ptr == NULL || key->dptr == NULL) {
5309                 /* Line Ignored but not EOF */
5310                 *key = tdb_null;
5311                 return true;
5312         }
5313
5314         /* Get value */
5315         ret = ptrans_parse_string(mem_ctx, &ptr, value);
5316         if (ret != 0) {
5317                 /* Line Ignored but not EOF */
5318                 talloc_free(key->dptr);
5319                 *key = tdb_null;
5320                 return true;
5321         }
5322
5323         return true;
5324 }
5325
5326 static int control_ptrans(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5327                           int argc, const char **argv)
5328 {
5329         const char *db_name;
5330         struct ctdb_db_context *db;
5331         struct ctdb_transaction_handle *h;
5332         uint8_t db_flags;
5333         FILE *file;
5334         TDB_DATA key, value;
5335         int ret;
5336
5337         if (argc < 1 || argc > 2) {
5338                 usage("ptrans");
5339         }
5340
5341         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5342                 return 1;
5343         }
5344
5345         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5346                 fprintf(stderr, "DB %s is not a persistent database\n",
5347                         db_name);
5348                 return 1;
5349         }
5350
5351         if (argc == 2) {
5352                 file = fopen(argv[1], "r");
5353                 if (file == NULL) {
5354                         fprintf(stderr, "Failed to open file %s\n", argv[1]);
5355                         return 1;
5356                 }
5357         } else {
5358                 file = stdin;
5359         }
5360
5361         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5362                           db_flags, &db);
5363         if (ret != 0) {
5364                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5365                 goto done;
5366         }
5367
5368         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5369                                      TIMEOUT(), db, false, &h);
5370         if (ret != 0) {
5371                 fprintf(stderr, "Failed to start transaction on db %s\n",
5372                         db_name);
5373                 goto done;
5374         }
5375
5376         while (ptrans_get_key_value(mem_ctx, file, &key, &value)) {
5377                 if (key.dsize != 0) {
5378                         ret = ctdb_transaction_store_record(h, key, value);
5379                         if (ret != 0) {
5380                                 fprintf(stderr, "Failed to store record\n");
5381                                 ctdb_transaction_cancel(h);
5382                                 goto done;
5383                         }
5384                         talloc_free(key.dptr);
5385                         talloc_free(value.dptr);
5386                 }
5387         }
5388
5389         ret = ctdb_transaction_commit(h);
5390         if (ret != 0) {
5391                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5392                         db_name);
5393         }
5394
5395 done:
5396         if (file != stdin) {
5397                 fclose(file);
5398         }
5399         return ret;
5400 }
5401
5402 static int control_tfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5403                           int argc, const char **argv)
5404 {
5405         struct tdb_context *tdb;
5406         TDB_DATA key, data;
5407         struct ctdb_ltdb_header header;
5408         int ret;
5409
5410         if (argc < 2 || argc > 3) {
5411                 usage("tfetch");
5412         }
5413
5414         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5415         if (tdb == NULL) {
5416                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5417                 return 1;
5418         }
5419
5420         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5421         if (ret != 0) {
5422                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5423                 return ret;
5424         }
5425
5426         data = tdb_fetch(tdb, key);
5427         if (data.dptr == NULL) {
5428                 fprintf(stderr, "No record for key %s\n", argv[1]);
5429                 return 1;
5430         }
5431
5432         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
5433                 fprintf(stderr, "Invalid record for key %s\n", argv[1]);
5434                 return 1;
5435         }
5436
5437         tdb_close(tdb);
5438
5439         if (argc == 3) {
5440                 int fd;
5441                 ssize_t nwritten;
5442
5443                 fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
5444                 if (fd == -1) {
5445                         fprintf(stderr, "Failed to open output file %s\n",
5446                                 argv[2]);
5447                         goto fail;
5448                 }
5449
5450                 nwritten = sys_write(fd, data.dptr, data.dsize);
5451                 if (nwritten != data.dsize) {
5452                         fprintf(stderr, "Failed to write record to file\n");
5453                         goto fail;
5454                 }
5455
5456                 close(fd);
5457         }
5458 fail:
5459         ret = ctdb_ltdb_header_extract(&data, &header);
5460         if (ret != 0) {
5461                 fprintf(stderr, "Failed to parse header from data\n");
5462                 return 1;
5463         }
5464
5465         dump_ltdb_header(&header);
5466         dump_tdb_data("data", data);
5467
5468         return 0;
5469 }
5470
5471 static int control_tstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5472                           int argc, const char **argv)
5473 {
5474         struct tdb_context *tdb;
5475         TDB_DATA key, data, value;
5476         struct ctdb_ltdb_header header;
5477         size_t offset;
5478         int ret;
5479
5480         if (argc < 3 || argc > 5) {
5481                 usage("tstore");
5482         }
5483
5484         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5485         if (tdb == NULL) {
5486                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5487                 return 1;
5488         }
5489
5490         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5491         if (ret != 0) {
5492                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5493                 return ret;
5494         }
5495
5496         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &value);
5497         if (ret != 0) {
5498                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5499                 return ret;
5500         }
5501
5502         ZERO_STRUCT(header);
5503
5504         if (argc > 3) {
5505                 header.rsn = (uint64_t)strtoull(argv[3], NULL, 0);
5506         }
5507         if (argc > 4) {
5508                 header.dmaster = (uint32_t)atol(argv[4]);
5509         }
5510         if (argc > 5) {
5511                 header.flags = (uint32_t)atol(argv[5]);
5512         }
5513
5514         offset = ctdb_ltdb_header_len(&header);
5515         data.dsize = offset + value.dsize;
5516         data.dptr = talloc_size(mem_ctx, data.dsize);
5517         if (data.dptr == NULL) {
5518                 fprintf(stderr, "Memory allocation error\n");
5519                 return 1;
5520         }
5521
5522         ctdb_ltdb_header_push(&header, data.dptr);
5523         memcpy(data.dptr + offset, value.dptr, value.dsize);
5524
5525         ret = tdb_store(tdb, key, data, TDB_REPLACE);
5526         if (ret != 0) {
5527                 fprintf(stderr, "Failed to write record %s to file %s\n",
5528                         argv[1], argv[0]);
5529         }
5530
5531         tdb_close(tdb);
5532
5533         return ret;
5534 }
5535
5536 static int control_readkey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5537                            int argc, const char **argv)
5538 {
5539         const char *db_name;
5540         struct ctdb_db_context *db;
5541         struct ctdb_record_handle *h;
5542         uint8_t db_flags;
5543         TDB_DATA key, data;
5544         bool readonly = false;
5545         int ret;
5546
5547         if (argc < 2 || argc > 3) {
5548                 usage("readkey");
5549         }
5550
5551         if (argc == 3) {
5552                 if (strcmp(argv[2], "readonly") == 0) {
5553                         readonly = true;
5554                 } else {
5555                         usage("readkey");
5556                 }
5557         }
5558
5559         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5560                 return 1;
5561         }
5562
5563         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5564                 fprintf(stderr, "DB %s is not a volatile database\n",
5565                         db_name);
5566                 return 1;
5567         }
5568
5569         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5570                           db_flags, &db);
5571         if (ret != 0) {
5572                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5573                 return ret;
5574         }
5575
5576         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5577         if (ret != 0) {
5578                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5579                 return ret;
5580         }
5581
5582         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5583                               db, key, readonly, &h, NULL, &data);
5584         if (ret != 0) {
5585                 fprintf(stderr, "Failed to read record for key %s\n",
5586                         argv[1]);
5587         } else {
5588                 printf("Data: size:%zu ptr:[%.*s]\n", data.dsize,
5589                        (int)data.dsize, data.dptr);
5590         }
5591
5592         talloc_free(h);
5593         return ret;
5594 }
5595
5596 static int control_writekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5597                             int argc, const char **argv)
5598 {
5599         const char *db_name;
5600         struct ctdb_db_context *db;
5601         struct ctdb_record_handle *h;
5602         uint8_t db_flags;
5603         TDB_DATA key, data;
5604         int ret;
5605
5606         if (argc != 3) {
5607                 usage("writekey");
5608         }
5609
5610         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5611                 return 1;
5612         }
5613
5614         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5615                 fprintf(stderr, "DB %s is not a volatile database\n",
5616                         db_name);
5617                 return 1;
5618         }
5619
5620         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5621                           db_flags, &db);
5622         if (ret != 0) {
5623                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5624                 return ret;
5625         }
5626
5627         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5628         if (ret != 0) {
5629                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5630                 return ret;
5631         }
5632
5633         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5634         if (ret != 0) {
5635                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5636                 return ret;
5637         }
5638
5639         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5640                               db, key, false, &h, NULL, NULL);
5641         if (ret != 0) {
5642                 fprintf(stderr, "Failed to lock record for key %s\n", argv[0]);
5643                 return ret;
5644         }
5645
5646         ret = ctdb_store_record(h, data);
5647         if (ret != 0) {
5648                 fprintf(stderr, "Failed to store record for key %s\n",
5649                         argv[1]);
5650         }
5651
5652         talloc_free(h);
5653         return ret;
5654 }
5655
5656 static int control_deletekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5657                              int argc, const char **argv)
5658 {
5659         const char *db_name;
5660         struct ctdb_db_context *db;
5661         struct ctdb_record_handle *h;
5662         uint8_t db_flags;
5663         TDB_DATA key, data;
5664         int ret;
5665
5666         if (argc != 2) {
5667                 usage("deletekey");
5668         }
5669
5670         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5671                 return 1;
5672         }
5673
5674         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5675                 fprintf(stderr, "DB %s is not a volatile database\n",
5676                         db_name);
5677                 return 1;
5678         }
5679
5680         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5681                           db_flags, &db);
5682         if (ret != 0) {
5683                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5684                 return ret;
5685         }
5686
5687         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5688         if (ret != 0) {
5689                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5690                 return ret;
5691         }
5692
5693         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5694                               db, key, false, &h, NULL, &data);
5695         if (ret != 0) {
5696                 fprintf(stderr, "Failed to fetch record for key %s\n",
5697                         argv[1]);
5698                 return ret;
5699         }
5700
5701         ret = ctdb_delete_record(h);
5702         if (ret != 0) {
5703                 fprintf(stderr, "Failed to delete record for key %s\n",
5704                         argv[1]);
5705         }
5706
5707         talloc_free(h);
5708         return ret;
5709 }
5710
5711 static int control_checktcpport(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5712                                 int argc, const char **argv)
5713 {
5714         struct sockaddr_in sin;
5715         unsigned int port;
5716         int s, v;
5717         int ret;
5718
5719         if (argc != 1) {
5720                 usage("chktcpport");
5721         }
5722
5723         port = atoi(argv[0]);
5724
5725         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
5726         if (s == -1) {
5727                 fprintf(stderr, "Failed to open local socket\n");
5728                 return errno;
5729         }
5730
5731         v = fcntl(s, F_GETFL, 0);
5732         if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK)) {
5733                 fprintf(stderr, "Unable to set socket non-blocking\n");
5734                 return errno;
5735         }
5736
5737         bzero(&sin, sizeof(sin));
5738         sin.sin_family = AF_INET;
5739         sin.sin_port = htons(port);
5740         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
5741         close(s);
5742         if (ret == -1) {
5743                 fprintf(stderr, "Failed to bind to TCP port %u\n", port);
5744                 return errno;
5745         }
5746
5747         return 0;
5748 }
5749
5750 static int control_rebalancenode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5751                                  int argc, const char **argv)
5752 {
5753         if (argc != 0) {
5754                 usage("rebalancenode");
5755         }
5756
5757         if (! rebalancenode(mem_ctx, ctdb, ctdb->cmd_pnn)) {
5758                 fprintf(stderr, "Failed to rebalance IPs on node %u\n",
5759                         ctdb->cmd_pnn);
5760                 return 1;
5761         }
5762
5763         return 0;
5764 }
5765
5766 static int control_getdbseqnum(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5767                                int argc, const char **argv)
5768 {
5769         uint32_t db_id;
5770         const char *db_name;
5771         uint64_t seqnum;
5772         int ret;
5773
5774         if (argc != 1) {
5775                 usage("getdbseqnum");
5776         }
5777
5778         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5779                 return 1;
5780         }
5781
5782         ret = ctdb_ctrl_get_db_seqnum(mem_ctx, ctdb->ev, ctdb->client,
5783                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
5784                                       &seqnum);
5785         if (ret != 0) {
5786                 fprintf(stderr, "Failed to get sequence number for DB %s\n",
5787                         db_name);
5788                 return ret;
5789         }
5790
5791         printf("0x%"PRIx64"\n", seqnum);
5792         return 0;
5793 }
5794
5795 static int control_nodestatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5796                               int argc, const char **argv)
5797 {
5798         const char *nodestring = NULL;
5799         struct ctdb_node_map *nodemap;
5800         int ret, i;
5801
5802         if (argc > 1) {
5803                 usage("nodestatus");
5804         }
5805
5806         if (argc == 1) {
5807                 nodestring = argv[0];
5808         }
5809
5810         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap)) {
5811                 return 1;
5812         }
5813
5814         nodemap = get_nodemap(ctdb, false);
5815         if (nodemap == NULL) {
5816                 return 1;
5817         }
5818
5819         if (options.machinereadable) {
5820                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5821         } else {
5822                 print_nodemap(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5823         }
5824
5825         ret = 0;
5826         for (i=0; i<nodemap->num; i++) {
5827                 ret |= nodemap->node[i].flags;
5828         }
5829
5830         return ret;
5831 }
5832
5833 const struct {
5834         const char *name;
5835         uint32_t offset;
5836 } db_stats_fields[] = {
5837 #define DBSTATISTICS_FIELD(n) { #n, offsetof(struct ctdb_db_statistics, n) }
5838         DBSTATISTICS_FIELD(db_ro_delegations),
5839         DBSTATISTICS_FIELD(db_ro_revokes),
5840         DBSTATISTICS_FIELD(locks.num_calls),
5841         DBSTATISTICS_FIELD(locks.num_current),
5842         DBSTATISTICS_FIELD(locks.num_pending),
5843         DBSTATISTICS_FIELD(locks.num_failed),
5844         DBSTATISTICS_FIELD(db_ro_delegations),
5845 };
5846
5847 static void print_dbstatistics(const char *db_name,
5848                                struct ctdb_db_statistics *s)
5849 {
5850         int i;
5851         const char *prefix = NULL;
5852         int preflen = 0;
5853
5854         printf("DB Statistics %s\n", db_name);
5855
5856         for (i=0; i<ARRAY_SIZE(db_stats_fields); i++) {
5857                 if (strchr(db_stats_fields[i].name, '.') != NULL) {
5858                         preflen = strcspn(db_stats_fields[i].name, ".") + 1;
5859                         if (! prefix ||
5860                             strncmp(prefix, db_stats_fields[i].name, preflen) != 0) {
5861                                 prefix = db_stats_fields[i].name;
5862                                 printf(" %*.*s\n", preflen-1, preflen-1,
5863                                        db_stats_fields[i].name);
5864                         }
5865                 } else {
5866                         preflen = 0;
5867                 }
5868                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
5869                        db_stats_fields[i].name+preflen, preflen ? 0 : 4, "",
5870                        *(uint32_t *)(db_stats_fields[i].offset+(uint8_t *)s));
5871         }
5872
5873         printf(" hop_count_buckets:");
5874         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5875                 printf(" %d", s->hop_count_bucket[i]);
5876         }
5877         printf("\n");
5878
5879         printf(" lock_buckets:");
5880         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5881                 printf(" %d", s->locks.buckets[i]);
5882         }
5883         printf("\n");
5884
5885         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5886                "locks_latency      MIN/AVG/MAX",
5887                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
5888                s->locks.latency.max, s->locks.latency.num);
5889
5890         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5891                "vacuum_latency     MIN/AVG/MAX",
5892                s->vacuum.latency.min, LATENCY_AVG(s->vacuum.latency),
5893                s->vacuum.latency.max, s->vacuum.latency.num);
5894
5895         printf(" Num Hot Keys:     %d\n", s->num_hot_keys);
5896         for (i=0; i<s->num_hot_keys; i++) {
5897                 int j;
5898                 printf("     Count:%d Key:", s->hot_keys[i].count);
5899                 for (j=0; j<s->hot_keys[i].key.dsize; j++) {
5900                         printf("%02x", s->hot_keys[i].key.dptr[j] & 0xff);
5901                 }
5902                 printf("\n");
5903         }
5904 }
5905
5906 static int control_dbstatistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5907                                 int argc, const char **argv)
5908 {
5909         uint32_t db_id;
5910         const char *db_name;
5911         struct ctdb_db_statistics *dbstats;
5912         int ret;
5913
5914         if (argc != 1) {
5915                 usage("dbstatistics");
5916         }
5917
5918         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5919                 return 1;
5920         }
5921
5922         ret = ctdb_ctrl_get_db_statistics(mem_ctx, ctdb->ev, ctdb->client,
5923                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
5924                                           &dbstats);
5925         if (ret != 0) {
5926                 fprintf(stderr, "Failed to get statistics for DB %s\n",
5927                         db_name);
5928                 return ret;
5929         }
5930
5931         print_dbstatistics(db_name, dbstats);
5932         return 0;
5933 }
5934
5935 struct disable_takeover_runs_state {
5936         uint32_t *pnn_list;
5937         int node_count;
5938         bool *reply;
5939         int status;
5940         bool done;
5941 };
5942
5943 static void disable_takeover_run_handler(uint64_t srvid, TDB_DATA data,
5944                                          void *private_data)
5945 {
5946         struct disable_takeover_runs_state *state =
5947                 (struct disable_takeover_runs_state *)private_data;
5948         int ret, i;
5949
5950         if (data.dsize != sizeof(int)) {
5951                 /* Ignore packet */
5952                 return;
5953         }
5954
5955         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
5956         ret = *(int *)data.dptr;
5957         if (ret < 0) {
5958                 state->status = ret;
5959                 state->done = true;
5960                 return;
5961         }
5962         for (i=0; i<state->node_count; i++) {
5963                 if (state->pnn_list[i] == ret) {
5964                         state->reply[i] = true;
5965                         break;
5966                 }
5967         }
5968
5969         state->done = true;
5970         for (i=0; i<state->node_count; i++) {
5971                 if (! state->reply[i]) {
5972                         state->done = false;
5973                         break;
5974                 }
5975         }
5976 }
5977
5978 static int disable_takeover_runs(TALLOC_CTX *mem_ctx,
5979                                  struct ctdb_context *ctdb, uint32_t timeout,
5980                                  uint32_t *pnn_list, int count)
5981 {
5982         struct ctdb_disable_message disable = { 0 };
5983         struct disable_takeover_runs_state state;
5984         int ret, i;
5985
5986         disable.pnn = ctdb->pnn;
5987         disable.srvid = next_srvid(ctdb);
5988         disable.timeout = timeout;
5989
5990         state.pnn_list = pnn_list;
5991         state.node_count = count;
5992         state.done = false;
5993         state.status = 0;
5994         state.reply = talloc_zero_array(mem_ctx, bool, count);
5995         if (state.reply == NULL) {
5996                 return ENOMEM;
5997         }
5998
5999         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
6000                                               disable.srvid,
6001                                               disable_takeover_run_handler,
6002                                               &state);
6003         if (ret != 0) {
6004                 return ret;
6005         }
6006
6007         for (i=0; i<count; i++) {
6008                 ret = ctdb_message_disable_takeover_runs(mem_ctx, ctdb->ev,
6009                                                          ctdb->client,
6010                                                          pnn_list[i],
6011                                                          &disable);
6012                 if (ret != 0) {
6013                         goto fail;
6014                 }
6015         }
6016
6017         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
6018         if (ret == ETIME) {
6019                 fprintf(stderr, "Timed out waiting to disable takeover runs\n");
6020         } else {
6021                 ret = (state.status >= 0 ? 0 : 1);
6022         }
6023
6024 fail:
6025         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
6026                                            disable.srvid, &state);
6027         return ret;
6028 }
6029
6030 static int control_reloadips(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
6031                              int argc, const char **argv)
6032 {
6033         const char *nodestring = NULL;
6034         struct ctdb_node_map *nodemap, *nodemap2;
6035         struct ctdb_req_control request;
6036         uint32_t *pnn_list, *pnn_list2;
6037         int ret, count, count2;
6038
6039         if (argc > 1) {
6040                 usage("reloadips");
6041         }
6042
6043         if (argc == 1) {
6044                 nodestring = argv[0];
6045         }
6046
6047         nodemap = get_nodemap(ctdb, false);
6048         if (nodemap == NULL) {
6049                 return 1;
6050         }
6051
6052         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap2)) {
6053                 return 1;
6054         }
6055
6056         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
6057                                         mem_ctx, &pnn_list);
6058         if (count <= 0) {
6059                 fprintf(stderr, "Memory allocation error\n");
6060                 return 1;
6061         }
6062
6063         count2 = list_of_active_nodes(nodemap2, CTDB_UNKNOWN_PNN,
6064                                       mem_ctx, &pnn_list2);
6065         if (count2 <= 0) {
6066                 fprintf(stderr, "Memory allocation error\n");
6067                 return 1;
6068         }
6069
6070         /* Disable takeover runs on all connected nodes.  A reply
6071          * indicating success is needed from each node so all nodes
6072          * will need to be active.
6073          *
6074          * A check could be added to not allow reloading of IPs when
6075          * there are disconnected nodes.  However, this should
6076          * probably be left up to the administrator.
6077          */
6078         ret = disable_takeover_runs(mem_ctx, ctdb, 2*options.timelimit,
6079                                     pnn_list, count);
6080         if (ret != 0) {
6081                 fprintf(stderr, "Failed to disable takeover runs\n");
6082                 return ret;
6083         }
6084
6085         /* Now tell all the desired nodes to reload their public IPs.
6086          * Keep trying this until it succeeds.  This assumes all
6087          * failures are transient, which might not be true...
6088          */
6089         ctdb_req_control_reload_public_ips(&request);
6090         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
6091                                         pnn_list2, count2, TIMEOUT(),
6092                                         &request, NULL, NULL);
6093         if (ret != 0) {
6094                 fprintf(stderr, "Failed to reload IPs on some nodes.\n");
6095         }
6096
6097         /* It isn't strictly necessary to wait until takeover runs are
6098          * re-enabled but doing so can't hurt.
6099          */
6100         ret = disable_takeover_runs(mem_ctx, ctdb, 0, pnn_list, count);
6101         if (ret != 0) {
6102                 fprintf(stderr, "Failed to enable takeover runs\n");
6103                 return ret;
6104         }
6105
6106         return ipreallocate(mem_ctx, ctdb);
6107 }
6108
6109 static int control_ipiface(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
6110                            int argc, const char **argv)
6111 {
6112         ctdb_sock_addr addr;
6113         char *iface;
6114
6115         if (argc != 1) {
6116                 usage("ipiface");
6117         }
6118
6119         if (! parse_ip(argv[0], NULL, 0, &addr)) {
6120                 fprintf(stderr, "Failed to Parse IP %s\n", argv[0]);
6121                 return 1;
6122         }
6123
6124         iface = ctdb_sys_find_ifname(&addr);
6125         if (iface == NULL) {
6126                 fprintf(stderr, "Failed to find interface for IP %s\n",
6127                         argv[0]);
6128                 return 1;
6129         }
6130         free(iface);
6131
6132         return 0;
6133 }
6134
6135
6136 static const struct ctdb_cmd {
6137         const char *name;
6138         int (*fn)(TALLOC_CTX *, struct ctdb_context *, int, const char **);
6139         bool without_daemon; /* can be run without daemon running ? */
6140         bool remote; /* can be run on remote nodes */
6141         const char *msg;
6142         const char *args;
6143 } ctdb_commands[] = {
6144         { "version", control_version, true, false,
6145                 "show version of ctdb", NULL },
6146         { "status", control_status, false, true,
6147                 "show node status", NULL },
6148         { "uptime", control_uptime, false, true,
6149                 "show node uptime", NULL },
6150         { "ping", control_ping, false, true,
6151                 "ping all nodes", NULL },
6152         { "runstate", control_runstate, false, true,
6153                 "get/check runstate of a node",
6154                 "[setup|first_recovery|startup|running]" },
6155         { "getvar", control_getvar, false, true,
6156                 "get a tunable variable", "<name>" },
6157         { "setvar", control_setvar, false, true,
6158                 "set a tunable variable", "<name> <value>" },
6159         { "listvars", control_listvars, false, true,
6160                 "list tunable variables", NULL },
6161         { "statistics", control_statistics, false, true,
6162                 "show ctdb statistics", NULL },
6163         { "statisticsreset", control_statistics_reset, false, true,
6164                 "reset ctdb statistics", NULL },
6165         { "stats", control_stats, false, true,
6166                 "show rolling statistics", "[count]" },
6167         { "ip", control_ip, false, true,
6168                 "show public ips", "[all]" },
6169         { "ipinfo", control_ipinfo, false, true,
6170                 "show public ip details", "<ip>" },
6171         { "ifaces", control_ifaces, false, true,
6172                 "show interfaces", NULL },
6173         { "setifacelink", control_setifacelink, false, true,
6174                 "set interface link status", "<iface> up|down" },
6175         { "process-exists", control_process_exists, false, true,
6176                 "check if a process exists on a node",  "<pid>" },
6177         { "getdbmap", control_getdbmap, false, true,
6178                 "show attached databases", NULL },
6179         { "getdbstatus", control_getdbstatus, false, true,
6180                 "show database status", "<dbname|dbid>" },
6181         { "catdb", control_catdb, false, false,
6182                 "dump cluster-wide ctdb database", "<dbname|dbid>" },
6183         { "cattdb", control_cattdb, false, false,
6184                 "dump local ctdb database", "<dbname|dbid>" },
6185         { "getmonmode", control_getmonmode, false, true,
6186                 "show monitoring mode", NULL },
6187         { "getcapabilities", control_getcapabilities, false, true,
6188                 "show node capabilities", NULL },
6189         { "pnn", control_pnn, false, false,
6190                 "show the pnn of the currnet node", NULL },
6191         { "lvs", control_lvs, false, false,
6192                 "show lvs configuration", "master|list|status" },
6193         { "disablemonitor", control_disable_monitor, false, true,
6194                 "disable monitoring", NULL },
6195         { "enablemonitor", control_enable_monitor, false, true,
6196                 "enable monitoring", NULL },
6197         { "setdebug", control_setdebug, false, true,
6198                 "set debug level", "ERROR|WARNING|NOTICE|INFO|DEBUG" },
6199         { "getdebug", control_getdebug, false, true,
6200                 "get debug level", NULL },
6201         { "attach", control_attach, false, false,
6202                 "attach a database", "<dbname> [persistent]" },
6203         { "detach", control_detach, false, false,
6204                 "detach database(s)", "<dbname|dbid> ..." },
6205         { "dumpmemory", control_dumpmemory, false, true,
6206                 "dump ctdbd memory map", NULL },
6207         { "rddumpmemory", control_rddumpmemory, false, true,
6208                 "dump recoverd memory map", NULL },
6209         { "getpid", control_getpid, false, true,
6210                 "get ctdbd process ID", NULL },
6211         { "disable", control_disable, false, true,
6212                 "disable a node", NULL },
6213         { "enable", control_enable, false, true,
6214                 "enable a node", NULL },
6215         { "stop", control_stop, false, true,
6216                 "stop a node", NULL },
6217         { "continue", control_continue, false, true,
6218                 "continue a stopped node", NULL },
6219         { "ban", control_ban, false, true,
6220                 "ban a node", "<bantime>"},
6221         { "unban", control_unban, false, true,
6222                 "unban a node", NULL },
6223         { "shutdown", control_shutdown, false, true,
6224                 "shutdown ctdb daemon", NULL },
6225         { "recover", control_recover, false, true,
6226                 "force recovery", NULL },
6227         { "sync", control_ipreallocate, false, true,
6228                 "run ip reallocation (deprecated)", NULL },
6229         { "ipreallocate", control_ipreallocate, false, true,
6230                 "run ip reallocation", NULL },
6231         { "isnotrecmaster", control_isnotrecmaster, false, false,
6232                 "check if local node is the recmaster", NULL },
6233         { "gratarp", control_gratarp, false, true,
6234                 "send a gratuitous arp", "<ip> <interface>" },
6235         { "tickle", control_tickle, false, false,
6236                 "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
6237         { "gettickles", control_gettickles, false, true,
6238                 "get the list of tickles", "<ip> [<port>]" },
6239         { "addtickle", control_addtickle, false, true,
6240                 "add a tickle", "<ip>:<port> <ip>:<port>" },
6241         { "deltickle", control_deltickle, false, true,
6242                 "delete a tickle", "<ip>:<port> <ip>:<port>" },
6243         { "check_srvids", control_check_srvids, false, true,
6244                 "check if srvid is registered", "<id> [<id> ...]" },
6245         { "listnodes", control_listnodes, true, true,
6246                 "list nodes in the cluster", NULL },
6247         { "reloadnodes", control_reloadnodes, false, false,
6248                 "reload the nodes file all nodes", NULL },
6249         { "moveip", control_moveip, false, false,
6250                 "move an ip address to another node", "<ip> <node>" },
6251         { "rebalanceip", control_rebalanceip, false, false,
6252                 "move an ip address optimally to another node", "<ip>" },
6253         { "addip", control_addip, false, true,
6254                 "add an ip address to a node", "<ip/mask> <iface>" },
6255         { "delip", control_delip, false, true,
6256                 "delete an ip address from a node", "<ip>" },
6257         { "eventscript", control_eventscript, false, true,
6258                 "run an event", "monitor" },
6259         { "backupdb", control_backupdb, false, false,
6260                 "backup a database into a file", "<dbname|dbid> <file>" },
6261         { "restoredb", control_restoredb, false, false,
6262                 "restore a database from a file", "<file> [dbname]" },
6263         { "dumpdbbackup", control_dumpdbbackup, true, false,
6264                 "dump database from a backup file", "<file>" },
6265         { "wipedb", control_wipedb, false, false,
6266                 "wipe the contents of a database.", "<dbname|dbid>"},
6267         { "recmaster", control_recmaster, false, true,
6268                 "show the pnn for the recovery master", NULL },
6269         { "scriptstatus", control_scriptstatus, false, true,
6270                 "show event script status",
6271                 "[init|setup|startup|monitor|takeip|releaseip|ipreallocated]" },
6272         { "enablescript", control_enablescript, false, true,
6273                 "enable an eventscript", "<script>"},
6274         { "disablescript", control_disablescript, false, true,
6275                 "disable an eventscript", "<script>"},
6276         { "natgw", control_natgw, false, false,
6277                 "show natgw configuration", "master|list|status" },
6278         { "natgwlist", control_natgwlist, false, false,
6279                 "show the nodes belonging to this natgw configuration", NULL },
6280         { "getreclock", control_getreclock, false, true,
6281                 "get recovery lock file", NULL },
6282         { "setlmasterrole", control_setlmasterrole, false, true,
6283                 "set LMASTER role", "on|off" },
6284         { "setrecmasterrole", control_setrecmasterrole, false, true,
6285                 "set RECMASTER role", "on|off"},
6286         { "setdbreadonly", control_setdbreadonly, false, true,
6287                 "enable readonly records", "<dbname|dbid>" },
6288         { "setdbsticky", control_setdbsticky, false, true,
6289                 "enable sticky records", "<dbname|dbid>"},
6290         { "pfetch", control_pfetch, false, false,
6291                 "fetch record from persistent database", "<dbname|dbid> <key> [<file>]" },
6292         { "pstore", control_pstore, false, false,
6293                 "write record to persistent database", "<dbname|dbid> <key> <value>" },
6294         { "pdelete", control_pdelete, false, false,
6295                 "delete record from persistent database", "<dbname|dbid> <key>" },
6296         { "ptrans", control_ptrans, false, false,
6297                 "update a persistent database (from file or stdin)", "<dbname|dbid> [<file>]" },
6298         { "tfetch", control_tfetch, false, true,
6299                 "fetch a record", "<tdb-file> <key> [<file>]" },
6300         { "tstore", control_tstore, false, true,
6301                 "store a record", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6302         { "readkey", control_readkey, false, false,
6303                 "read value of a database key", "<dbname|dbid> <key> [readonly]" },
6304         { "writekey", control_writekey, false, false,
6305                 "write value for a database key", "<dbname|dbid> <key> <value>" },
6306         { "deletekey", control_deletekey, false, false,
6307                 "delete a database key", "<dbname|dbid> <key>" },
6308         { "checktcpport", control_checktcpport, true, false,
6309                 "check if a service is bound to a specific tcp port or not", "<port>" },
6310         { "rebalancenode", control_rebalancenode, false, true,
6311                 "mark nodes as forced IP rebalancing targets", NULL },
6312         { "getdbseqnum", control_getdbseqnum, false, false,
6313                 "get database sequence number", "<dbname|dbid>" },
6314         { "nodestatus", control_nodestatus, false, true,
6315                 "show and return node status", "[all|<pnn-list>]" },
6316         { "dbstatistics", control_dbstatistics, false, true,
6317                 "show database statistics", "<dbname|dbid>" },
6318         { "reloadips", control_reloadips, false, false,
6319                 "reload the public addresses file", "[all|<pnn-list>]" },
6320         { "ipiface", control_ipiface, true, false,
6321                 "Find the interface an ip address is hosted on", "<ip>" },
6322 };
6323
6324 static const struct ctdb_cmd *match_command(const char *command)
6325 {
6326         const struct ctdb_cmd *cmd;
6327         int i;
6328
6329         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6330                 cmd = &ctdb_commands[i];
6331                 if (strlen(command) == strlen(cmd->name) &&
6332                     strncmp(command, cmd->name, strlen(command)) == 0) {
6333                         return cmd;
6334                 }
6335         }
6336
6337         return NULL;
6338 }
6339
6340
6341 /**
6342  * Show usage message
6343  */
6344 static void usage_full(void)
6345 {
6346         int i;
6347
6348         poptPrintHelp(pc, stdout, 0);
6349         printf("\nCommands:\n");
6350         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6351                 printf("  %-15s %-27s  %s\n",
6352                        ctdb_commands[i].name,
6353                        ctdb_commands[i].args ? ctdb_commands[i].args : "",
6354                        ctdb_commands[i].msg);
6355         }
6356 }
6357
6358 static void usage(const char *command)
6359 {
6360         const struct ctdb_cmd *cmd;
6361
6362         if (command == NULL) {
6363                 usage_full();
6364                 exit(1);
6365         }
6366
6367         cmd = match_command(command);
6368         if (cmd == NULL) {
6369                 usage_full();
6370         } else {
6371                 poptPrintUsage(pc, stdout, 0);
6372                 printf("\nCommands:\n");
6373                 printf("  %-15s %-27s  %s\n",
6374                        cmd->name, cmd->args ? cmd->args : "", cmd->msg);
6375         }
6376
6377         exit(1);
6378 }
6379
6380 struct poptOption cmdline_options[] = {
6381         POPT_AUTOHELP
6382         { "socket", 's', POPT_ARG_STRING, &options.socket, 0,
6383                 "CTDB socket path", "filename" },
6384         { "debug", 'd', POPT_ARG_STRING, &options.debuglevelstr, 0,
6385                 "debug level"},
6386         { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0,
6387                 "timelimit (in seconds)" },
6388         { "node", 'n', POPT_ARG_INT, &options.pnn, 0,
6389                 "node specification - integer" },
6390         { NULL, 'Y', POPT_ARG_NONE, &options.machinereadable, 0,
6391                 "enable machine readable output", NULL },
6392         { "separator", 'x', POPT_ARG_STRING, &options.sep, 0,
6393                 "specify separator for machine readable output", "CHAR" },
6394         { NULL, 'X', POPT_ARG_NONE, &options.machineparsable, 0,
6395                 "enable machine parsable output with separator |", NULL },
6396         { "verbose", 'v', POPT_ARG_NONE, &options.verbose, 0,
6397                 "enable verbose output", NULL },
6398         { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0,
6399                 "die if runtime exceeds this limit (in seconds)" },
6400         POPT_TABLEEND
6401 };
6402
6403 static int process_command(const struct ctdb_cmd *cmd, int argc,
6404                            const char **argv)
6405 {
6406         TALLOC_CTX *tmp_ctx;
6407         struct ctdb_context *ctdb;
6408         int ret;
6409         bool status;
6410         uint64_t srvid_offset;
6411
6412         tmp_ctx = talloc_new(NULL);
6413         if (tmp_ctx == NULL) {
6414                 fprintf(stderr, "Memory allocation error\n");
6415                 goto fail;
6416         }
6417
6418         if (cmd->without_daemon) {
6419                 if (options.pnn != -1) {
6420                         fprintf(stderr,
6421                                 "Cannot specify node for command %s\n",
6422                                 cmd->name);
6423                         goto fail;
6424                 }
6425
6426                 return cmd->fn(tmp_ctx, NULL, argc-1, argv+1);
6427         }
6428
6429         ctdb = talloc_zero(tmp_ctx, struct ctdb_context);
6430         if (ctdb == NULL) {
6431                 fprintf(stderr, "Memory allocation error\n");
6432                 goto fail;
6433         }
6434
6435         ctdb->ev = tevent_context_init(ctdb);
6436         if (ctdb->ev == NULL) {
6437                 fprintf(stderr, "Failed to initialize tevent\n");
6438                 goto fail;
6439         }
6440
6441         ret = ctdb_client_init(ctdb, ctdb->ev, options.socket, &ctdb->client);
6442         if (ret != 0) {
6443                 fprintf(stderr, "Failed to connect to CTDB daemon (%s)\n",
6444                         options.socket);
6445
6446                 if (!find_node_xpnn(ctdb, NULL)) {
6447                         fprintf(stderr, "Is this node part of CTDB cluster?\n");
6448                 }
6449                 goto fail;
6450         }
6451
6452         ctdb->pnn = ctdb_client_pnn(ctdb->client);
6453         srvid_offset = getpid() & 0xFFFF;
6454         ctdb->srvid = SRVID_CTDB_TOOL | (srvid_offset << 16);
6455
6456         if (options.pnn != -1) {
6457                 status = verify_pnn(ctdb, options.pnn);
6458                 if (! status) {
6459                         goto fail;
6460                 }
6461
6462                 ctdb->cmd_pnn = options.pnn;
6463         } else {
6464                 ctdb->cmd_pnn = ctdb->pnn;
6465         }
6466
6467         if (! cmd->remote && ctdb->pnn != ctdb->cmd_pnn) {
6468                 fprintf(stderr, "Node cannot be specified for command %s\n",
6469                         cmd->name);
6470                 goto fail;
6471         }
6472
6473         ret = cmd->fn(tmp_ctx, ctdb, argc-1, argv+1);
6474         talloc_free(tmp_ctx);
6475         return ret;
6476
6477 fail:
6478         talloc_free(tmp_ctx);
6479         return 1;
6480 }
6481
6482 static void signal_handler(int sig)
6483 {
6484         fprintf(stderr, "Maximum runtime exceeded - exiting\n");
6485 }
6486
6487 static void alarm_handler(int sig)
6488 {
6489         /* Kill any child processes */
6490         signal(SIGTERM, signal_handler);
6491         kill(0, SIGTERM);
6492
6493         _exit(1);
6494 }
6495
6496 int main(int argc, const char *argv[])
6497 {
6498         int opt;
6499         const char **extra_argv;
6500         int extra_argc;
6501         const struct ctdb_cmd *cmd;
6502         const char *ctdb_socket;
6503         enum debug_level loglevel;
6504         int ret;
6505
6506         setlinebuf(stdout);
6507
6508         /* Set default options */
6509         options.socket = CTDB_SOCKET;
6510         options.debuglevelstr = NULL;
6511         options.timelimit = 10;
6512         options.sep = "|";
6513         options.maxruntime = 0;
6514         options.pnn = -1;
6515
6516         ctdb_socket = getenv("CTDB_SOCKET");
6517         if (ctdb_socket != NULL) {
6518                 options.socket = ctdb_socket;
6519         }
6520
6521         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
6522                             POPT_CONTEXT_KEEP_FIRST);
6523         while ((opt = poptGetNextOpt(pc)) != -1) {
6524                 fprintf(stderr, "Invalid option %s: %s\n",
6525                         poptBadOption(pc, 0), poptStrerror(opt));
6526                 exit(1);
6527         }
6528
6529         if (options.maxruntime == 0) {
6530                 const char *ctdb_timeout;
6531
6532                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6533                 if (ctdb_timeout != NULL) {
6534                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6535                 } else {
6536                         options.maxruntime = 120;
6537                 }
6538         }
6539         if (options.maxruntime <= 120) {
6540                 /* default timeout is 120 seconds */
6541                 options.maxruntime = 120;
6542         }
6543
6544         if (options.machineparsable) {
6545                 options.machinereadable = 1;
6546         }
6547
6548         /* setup the remaining options for the commands */
6549         extra_argc = 0;
6550         extra_argv = poptGetArgs(pc);
6551         if (extra_argv) {
6552                 extra_argv++;
6553                 while (extra_argv[extra_argc]) extra_argc++;
6554         }
6555
6556         if (extra_argc < 1) {
6557                 usage(NULL);
6558         }
6559
6560         cmd = match_command(extra_argv[0]);
6561         if (cmd == NULL) {
6562                 fprintf(stderr, "Unknown command '%s'\n", extra_argv[0]);
6563                 exit(1);
6564         }
6565
6566         /* Enable logging */
6567         setup_logging("ctdb", DEBUG_STDERR);
6568         if (debug_level_parse(options.debuglevelstr, &loglevel)) {
6569                 DEBUGLEVEL = loglevel;
6570         } else {
6571                 DEBUGLEVEL = DEBUG_ERR;
6572         }
6573
6574         signal(SIGALRM, alarm_handler);
6575         alarm(options.maxruntime);
6576
6577         ret = process_command(cmd, extra_argc, extra_argv);
6578         if (ret == -1) {
6579                 ret = 1;
6580         }
6581
6582         (void)poptFreeContext(pc);
6583
6584         return ret;
6585 }