ctdb-tools: Drop "ctdb rebalancenode"
[vlendec/samba-autobuild/.git] / ctdb / tools / ctdb.c
1 /*
2    CTDB control tool
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/wait.h"
25
26 #include <ctype.h>
27 #include <popt.h>
28 #include <talloc.h>
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "ctdb_version.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35
36 #include "common/db_hash.h"
37 #include "common/logging.h"
38 #include "protocol/protocol.h"
39 #include "protocol/protocol_api.h"
40 #include "common/system.h"
41 #include "client/client.h"
42
/*
 * Timeout value passed to the ctdb_ctrl_*() calls in this file, derived
 * from options.timelimit via timeval_current_ofs() (presumably "now plus
 * timelimit seconds" -- confirm against timeval_current_ofs()).
 */
#define TIMEOUT()       timeval_current_ofs(options.timelimit, 0)

/*
 * Server IDs built on top of CTDB_SRVID_TOOL_RANGE.
 * NOTE(review): judging by the names these are for the tool in general
 * and for pushing databases respectively; their users are not visible
 * in this part of the file -- confirm.
 */
#define SRVID_CTDB_TOOL    (CTDB_SRVID_TOOL_RANGE | 0x0001000000000000LL)
#define SRVID_CTDB_PUSHDB  (CTDB_SRVID_TOOL_RANGE | 0x0002000000000000LL)
47
/*
 * Global settings for the tool.
 * NOTE(review): presumably filled in from the command line; the code
 * that populates this structure is not visible here -- confirm.
 */
static struct {
        const char *socket;
        const char *debuglevelstr;
        /* Feeds TIMEOUT(), i.e. the timeout of every control call */
        int timelimit;
        int pnn;
        /* When 1, control_status prints the machine format and returns */
        int machinereadable;
        /* Field separator used by print_nodemap_machine() */
        const char *sep;
        int machineparsable;
        int verbose;
        int maxruntime;
        int printemptyrecords;
        int printdatasize;
        int printlmaster;
        int printhash;
        int printrecordflags;
} options;
64
/* popt context; NOTE(review): not referenced in the visible part of the file */
static poptContext pc;

/*
 * State shared by all commands of the tool.
 */
struct ctdb_context {
        /* Event context and client handle passed to every ctdb_ctrl_*() call */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Node map cached by get_nodemap(); NULL until first fetched */
        struct ctdb_node_map *nodemap;
        /*
         * pnn: node that get_nodemap() and the db helpers query first.
         * cmd_pnn: node the command functions send their controls to.
         */
        uint32_t pnn, cmd_pnn;
        /* Last server id handed out by next_srvid() */
        uint64_t srvid;
};

/*
 * Forward declaration; defined outside the visible part of the file.
 * control_status() calls it when given unexpected arguments.
 */
static void usage(const char *command);
76
77 /*
78  * Utility Functions
79  */
80
/*
 * Return the interval from *tv to *tv2 in seconds.
 *
 * The tv_sec difference gives the whole seconds and the tv_usec
 * difference, scaled by 1e-6, the fractional part.  The result is
 * negative when *tv2 is earlier than *tv.
 */
static double timeval_delta(struct timeval *tv2, struct timeval *tv)
{
        double whole_secs = tv2->tv_sec - tv->tv_sec;
        double frac_secs = (tv2->tv_usec - tv->tv_usec) * 1.0e-6;

        return whole_secs + frac_secs;
}
86
87 static struct ctdb_node_and_flags *get_node_by_pnn(
88                                         struct ctdb_node_map *nodemap,
89                                         uint32_t pnn)
90 {
91         int i;
92
93         for (i=0; i<nodemap->num; i++) {
94                 if (nodemap->node[i].pnn == pnn) {
95                         return &nodemap->node[i];
96                 }
97         }
98         return NULL;
99 }
100
101 static const char *pretty_print_flags(TALLOC_CTX *mem_ctx, uint32_t flags)
102 {
103         static const struct {
104                 uint32_t flag;
105                 const char *name;
106         } flag_names[] = {
107                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
108                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
109                 { NODE_FLAGS_BANNED,                "BANNED" },
110                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
111                 { NODE_FLAGS_DELETED,               "DELETED" },
112                 { NODE_FLAGS_STOPPED,               "STOPPED" },
113                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
114         };
115         char *flags_str = NULL;
116         int i;
117
118         for (i=0; i<ARRAY_SIZE(flag_names); i++) {
119                 if (flags & flag_names[i].flag) {
120                         if (flags_str == NULL) {
121                                 flags_str = talloc_asprintf(mem_ctx,
122                                                 "%s", flag_names[i].name);
123                         } else {
124                                 flags_str = talloc_asprintf_append(flags_str,
125                                                 "|%s", flag_names[i].name);
126                         }
127                         if (flags_str == NULL) {
128                                 return "OUT-OF-MEMORY";
129                         }
130                 }
131         }
132         if (flags_str == NULL) {
133                 return "OK";
134         }
135
136         return flags_str;
137 }
138
139 static uint64_t next_srvid(struct ctdb_context *ctdb)
140 {
141         ctdb->srvid += 1;
142         return ctdb->srvid;
143 }
144
145 /*
146  * Get consistent nodemap information.
147  *
148  * If nodemap is already cached, use that. If not get it.
149  * If the current node is BANNED, then get nodemap from "better" node.
150  */
151 static struct ctdb_node_map *get_nodemap(struct ctdb_context *ctdb, bool force)
152 {
153         TALLOC_CTX *tmp_ctx;
154         struct ctdb_node_map *nodemap;
155         struct ctdb_node_and_flags *node;
156         uint32_t current_node;
157         int ret;
158
159         if (force) {
160                 TALLOC_FREE(ctdb->nodemap);
161         }
162
163         if (ctdb->nodemap != NULL) {
164                 return ctdb->nodemap;
165         }
166
167         tmp_ctx = talloc_new(ctdb);
168         if (tmp_ctx == NULL) {
169                 return false;
170         }
171
172         current_node = ctdb->pnn;
173 again:
174         ret = ctdb_ctrl_get_nodemap(tmp_ctx, ctdb->ev, ctdb->client,
175                                     current_node, TIMEOUT(), &nodemap);
176         if (ret != 0) {
177                 fprintf(stderr, "Failed to get nodemap from node %u\n",
178                         current_node);
179                 goto failed;
180         }
181
182         node = get_node_by_pnn(nodemap, current_node);
183         if (node->flags & NODE_FLAGS_BANNED) {
184                 /* Pick next node */
185                 do {
186                         current_node = (current_node + 1) % nodemap->num;
187                         node = get_node_by_pnn(nodemap, current_node);
188                         if (! (node->flags &
189                               (NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED))) {
190                                 break;
191                         }
192                 } while (current_node != ctdb->pnn);
193
194                 if (current_node == ctdb->pnn) {
195                         /* Tried all nodes in the cluster */
196                         fprintf(stderr, "Warning: All nodes are banned.\n");
197                         goto failed;
198                 }
199
200                 goto again;
201         }
202
203         ctdb->nodemap = talloc_steal(ctdb, nodemap);
204         return nodemap;
205
206 failed:
207         talloc_free(tmp_ctx);
208         return NULL;
209 }
210
211 static bool verify_pnn(struct ctdb_context *ctdb, int pnn)
212 {
213         struct ctdb_node_map *nodemap;
214         bool found;
215         int i;
216
217         if (pnn == -1) {
218                 return false;
219         }
220
221         nodemap = get_nodemap(ctdb, false);
222         if (nodemap == NULL) {
223                 return false;
224         }
225
226         found = false;
227         for (i=0; i<nodemap->num; i++) {
228                 if (nodemap->node[i].pnn == pnn) {
229                         found = true;
230                         break;
231                 }
232         }
233         if (! found) {
234                 fprintf(stderr, "Node %u does not exist\n", pnn);
235                 return false;
236         }
237
238         if (nodemap->node[i].flags &
239             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
240                 fprintf(stderr, "Node %u has status %s\n", pnn,
241                         pretty_print_flags(ctdb, nodemap->node[i].flags));
242                 return false;
243         }
244
245         return true;
246 }
247
248 static struct ctdb_node_map *talloc_nodemap(TALLOC_CTX *mem_ctx,
249                                             struct ctdb_node_map *nodemap)
250 {
251         struct ctdb_node_map *nodemap2;
252
253         nodemap2 = talloc_zero(mem_ctx, struct ctdb_node_map);
254         if (nodemap2 == NULL) {
255                 return NULL;
256         }
257
258         nodemap2->node = talloc_array(nodemap2, struct ctdb_node_and_flags,
259                                       nodemap->num);
260         if (nodemap2->node == NULL) {
261                 talloc_free(nodemap2);
262                 return NULL;
263         }
264
265         return nodemap2;
266 }
267
268 /*
269  * Get the number and the list of matching nodes
270  *
271  *   nodestring :=  NULL | all | pnn,[pnn,...]
272  *
273  * If nodestring is NULL, use the current node.
274  */
275 static bool parse_nodestring(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
276                              const char *nodestring,
277                              struct ctdb_node_map **out)
278 {
279         struct ctdb_node_map *nodemap, *nodemap2;
280         struct ctdb_node_and_flags *node;
281         int i;
282
283         nodemap = get_nodemap(ctdb, false);
284         if (nodemap == NULL) {
285                 return false;
286         }
287
288         nodemap2 = talloc_nodemap(mem_ctx, nodemap);
289         if (nodemap2 == NULL) {
290                 return false;
291         }
292
293         if (nodestring == NULL) {
294                 for (i=0; i<nodemap->num; i++) {
295                         if (nodemap->node[i].pnn == ctdb->cmd_pnn) {
296                                 nodemap2->node[0] = nodemap->node[i];
297                                 break;
298                         }
299                 }
300                 nodemap2->num = 1;
301
302                 goto done;
303         }
304
305         if (strcmp(nodestring, "all") == 0) {
306                 for (i=0; i<nodemap->num; i++) {
307                         nodemap2->node[i] = nodemap->node[i];
308                 }
309                 nodemap2->num = nodemap->num;
310
311                 goto done;
312         } else {
313                 char *ns, *tok;
314
315                 ns = talloc_strdup(mem_ctx, nodestring);
316                 if (ns == NULL) {
317                         return false;
318                 }
319
320                 tok = strtok(ns, ",");
321                 while (tok != NULL) {
322                         uint32_t pnn;
323                         char *endptr;
324
325                         pnn = (uint32_t)strtoul(tok, &endptr, 0);
326                         if (pnn == 0 && tok == endptr) {
327                                 fprintf(stderr, "Invalid node %s\n", tok);
328                                         return false;
329                         }
330
331                         node = get_node_by_pnn(nodemap, pnn);
332                         if (node == NULL) {
333                                 fprintf(stderr, "Node %u does not exist\n",
334                                         pnn);
335                                 return false;
336                         }
337
338                         nodemap2->node[nodemap2->num] = *node;
339                         nodemap2->num += 1;
340
341                         tok = strtok(NULL, ",");
342                 }
343         }
344
345 done:
346         *out = nodemap2;
347         return true;
348 }
349
350 /* Compare IP address */
351 static bool ctdb_same_ip(ctdb_sock_addr *ip1, ctdb_sock_addr *ip2)
352 {
353         bool ret = false;
354
355         if (ip1->sa.sa_family != ip2->sa.sa_family) {
356                 return false;
357         }
358
359         switch (ip1->sa.sa_family) {
360         case AF_INET:
361                 ret = (memcmp(&ip1->ip.sin_addr, &ip2->ip.sin_addr,
362                               sizeof(struct in_addr)) == 0);
363                 break;
364
365         case AF_INET6:
366                 ret = (memcmp(&ip1->ip6.sin6_addr, &ip2->ip6.sin6_addr,
367                               sizeof(struct in6_addr)) == 0);
368                 break;
369         }
370
371         return ret;
372 }
373
374 /* Append a node to a node map with given address and flags */
375 static bool node_map_add(struct ctdb_node_map *nodemap,
376                          const char *nstr, uint32_t flags)
377 {
378         ctdb_sock_addr addr;
379         uint32_t num;
380         struct ctdb_node_and_flags *n;
381
382         if (! parse_ip(nstr, NULL, 0, &addr)) {
383                 fprintf(stderr, "Invalid IP address %s\n", nstr);
384                 return false;
385         }
386
387         num = nodemap->num;
388         nodemap->node = talloc_realloc(nodemap, nodemap->node,
389                                        struct ctdb_node_and_flags, num+1);
390         if (nodemap->node == NULL) {
391                 return false;
392         }
393
394         n = &nodemap->node[num];
395         n->addr = addr;
396         n->pnn = num;
397         n->flags = flags;
398
399         nodemap->num = num+1;
400         return true;
401 }
402
403 /* Read a nodes file into a node map */
404 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
405                                                   const char *nlist)
406 {
407         char **lines;
408         int nlines;
409         int i;
410         struct ctdb_node_map *nodemap;
411
412         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
413         if (nodemap == NULL) {
414                 return NULL;
415         }
416
417         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
418         if (lines == NULL) {
419                 return NULL;
420         }
421
422         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
423                 nlines--;
424         }
425
426         for (i=0; i<nlines; i++) {
427                 char *node;
428                 uint32_t flags;
429                 size_t len;
430
431                 node = lines[i];
432                 /* strip leading spaces */
433                 while((*node == ' ') || (*node == '\t')) {
434                         node++;
435                 }
436
437                 len = strlen(node);
438
439                 /* strip trailing spaces */
440                 while ((len > 1) &&
441                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
442                 {
443                         node[len-1] = '\0';
444                         len--;
445                 }
446
447                 if (len == 0) {
448                         continue;
449                 }
450                 if (*node == '#') {
451                         /* A "deleted" node is a node that is
452                            commented out in the nodes file.  This is
453                            used instead of removing a line, which
454                            would cause subsequent nodes to change
455                            their PNN. */
456                         flags = NODE_FLAGS_DELETED;
457                         node = discard_const("0.0.0.0");
458                 } else {
459                         flags = 0;
460                 }
461                 if (! node_map_add(nodemap, node, flags)) {
462                         talloc_free(lines);
463                         TALLOC_FREE(nodemap);
464                         return NULL;
465                 }
466         }
467
468         talloc_free(lines);
469         return nodemap;
470 }
471
472 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx, uint32_t pnn)
473 {
474         struct ctdb_node_map *nodemap;
475         char *nodepath;
476         const char *nodes_list = NULL;
477
478         if (pnn != CTDB_UNKNOWN_PNN) {
479                 nodepath = talloc_asprintf(mem_ctx, "CTDB_NODES_%u", pnn);
480                 if (nodepath != NULL) {
481                         nodes_list = getenv(nodepath);
482                 }
483         }
484         if (nodes_list == NULL) {
485                 nodes_list = getenv("CTDB_NODES");
486         }
487         if (nodes_list == NULL) {
488                 const char *basedir = getenv("CTDB_BASE");
489                 if (basedir == NULL) {
490                         basedir = CTDB_ETCDIR;
491                 }
492                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes", basedir);
493                 if (nodes_list == NULL) {
494                         fprintf(stderr, "Memory allocation error\n");
495                         return NULL;
496                 }
497         }
498
499         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
500         if (nodemap == NULL) {
501                 fprintf(stderr, "Failed to read nodes file \"%s\"\n",
502                         nodes_list);
503                 return NULL;
504         }
505
506         return nodemap;
507 }
508
509 static struct ctdb_dbid *db_find(TALLOC_CTX *mem_ctx,
510                                  struct ctdb_context *ctdb,
511                                  struct ctdb_dbid_map *dbmap,
512                                  const char *db_name)
513 {
514         struct ctdb_dbid *db = NULL;
515         const char *name;
516         int ret, i;
517
518         for (i=0; i<dbmap->num; i++) {
519                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
520                                            ctdb->pnn, TIMEOUT(),
521                                            dbmap->dbs[i].db_id, &name);
522                 if (ret != 0) {
523                         return false;
524                 }
525
526                 if (strcmp(db_name, name) == 0) {
527                         talloc_free(discard_const(name));
528                         db = &dbmap->dbs[i];
529                         break;
530                 }
531         }
532
533         return db;
534 }
535
536 static bool db_exists(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
537                       const char *db_arg, uint32_t *db_id,
538                       const char **db_name, uint8_t *db_flags)
539 {
540         struct ctdb_dbid_map *dbmap;
541         struct ctdb_dbid *db = NULL;
542         uint32_t id = 0;
543         const char *name = NULL;
544         int ret, i;
545
546         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
547                                   ctdb->pnn, TIMEOUT(), &dbmap);
548         if (ret != 0) {
549                 return false;
550         }
551
552         if (strncmp(db_arg, "0x", 2) == 0) {
553                 id = strtoul(db_arg, NULL, 0);
554                 for (i=0; i<dbmap->num; i++) {
555                         if (id == dbmap->dbs[i].db_id) {
556                                 db = &dbmap->dbs[i];
557                                 break;
558                         }
559                 }
560         } else {
561                 name = db_arg;
562                 db = db_find(mem_ctx, ctdb, dbmap, name);
563         }
564
565         if (db == NULL) {
566                 fprintf(stderr, "No database matching '%s' found\n", db_arg);
567                 return false;
568         }
569
570         if (name == NULL) {
571                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
572                                            ctdb->pnn, TIMEOUT(), id, &name);
573                 if (ret != 0) {
574                         return false;
575                 }
576         }
577
578         if (db_id != NULL) {
579                 *db_id = db->db_id;
580         }
581         if (db_name != NULL) {
582                 *db_name = talloc_strdup(mem_ctx, name);
583         }
584         if (db_flags != NULL) {
585                 *db_flags = db->flags;
586         }
587         return true;
588 }
589
/*
 * Convert one hexadecimal digit to its value.
 *
 * 'a'..'f' and 'A'..'F' both map to 10..15.  Any other character is
 * treated as a decimal digit, i.e. the result is h - '0'; the caller is
 * responsible for passing only valid hex digits.
 */
static int h2i(char h)
{
        if (h >= 'a' && h <= 'f') {
                return h - 'a' + 10;
        }
        if (h >= 'A' && h <= 'F') {
                return h - 'A' + 10;
        }
        return h - '0';
}
600
601 static int hex_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
602                        TDB_DATA *out)
603 {
604         int i;
605         TDB_DATA data;
606
607         if (len & 0x01) {
608                 fprintf(stderr, "Key (%s) contains odd number of hex digits\n",
609                         str);
610                 return EINVAL;
611         }
612
613         data.dsize = len / 2;
614         data.dptr = talloc_size(mem_ctx, data.dsize);
615         if (data.dptr == NULL) {
616                 return ENOMEM;
617         }
618
619         for (i=0; i<data.dsize; i++) {
620                 data.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
621         }
622
623         *out = data;
624         return 0;
625 }
626
627 static int str_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
628                        TDB_DATA *out)
629 {
630         TDB_DATA data;
631         int ret = 0;
632
633         if (strncmp(str, "0x", 2) == 0) {
634                 ret = hex_to_data(str+2, len-2, mem_ctx, &data);
635         } else {
636                 data.dptr = talloc_memdup(mem_ctx, str, len);
637                 if (data.dptr == NULL) {
638                         return ENOMEM;
639                 }
640                 data.dsize = len;
641         }
642
643         *out = data;
644         return ret;
645 }
646
/*
 * Run a helper program and wait for it to finish.
 *
 * command is only used in messages; path is the program to execute and
 * arg1 its single argument (NULL for none).
 *
 * Returns the exit status of the helper.  If the helper cannot be
 * executed, the child exits with the errno of the failed execl(), so
 * that value is returned.  Returns the errno of fork() or waitpid() if
 * they fail, EINTR if the helper was killed by a signal, and 0 for any
 * other wait status.
 */
static int run_helper(const char *command, const char *path, const char *arg1)
{
        pid_t pid;
        int save_errno, status, ret;

        pid = fork();
        if (pid < 0) {
                save_errno = errno;
                fprintf(stderr, "Failed to fork %s (%s) - %s\n",
                        command, path, strerror(save_errno));
                return save_errno;
        }

        if (pid == 0) {
                /*
                 * The terminator of a variadic argument list must be a
                 * null pointer of type char *; a bare NULL may be a
                 * plain integer constant.
                 */
                ret = execl(path, path, arg1, (char *)NULL);
                if (ret == -1) {
                        _exit(errno);
                }
                /* Should not happen */
                _exit(ENOEXEC);
        }

        /* A signal arriving in the parent must not abandon the child */
        do {
                ret = waitpid(pid, &status, 0);
        } while (ret == -1 && errno == EINTR);

        if (ret == -1) {
                save_errno = errno;
                fprintf(stderr, "waitpid() failed for %s - %s\n",
                        command, strerror(save_errno));
                return save_errno;
        }

        if (WIFEXITED(status)) {
                ret = WEXITSTATUS(status);
                return ret;
        }

        if (WIFSIGNALED(status)) {
                fprintf(stderr, "%s terminated with signal %d\n",
                        command, WTERMSIG(status));
                return EINTR;
        }

        return 0;
}
690
691 /*
692  * Command Functions
693  */
694
695 static int control_version(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
696                            int argc, const char **argv)
697 {
698         printf("%s\n", CTDB_VERSION_STRING);
699         return 0;
700 }
701
702 static bool partially_online(TALLOC_CTX *mem_ctx,
703                              struct ctdb_context *ctdb,
704                              struct ctdb_node_and_flags *node)
705 {
706         struct ctdb_iface_list *iface_list;
707         int ret, i;
708         bool status = false;
709
710         if (node->flags != 0) {
711                 return false;
712         }
713
714         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
715                                    node->pnn, TIMEOUT(), &iface_list);
716         if (ret != 0) {
717                 return false;
718         }
719
720         status = false;
721         for (i=0; i < iface_list->num; i++) {
722                 if (iface_list->iface[i].link_state == 0) {
723                         status = true;
724                         break;
725                 }
726         }
727
728         return status;
729 }
730
731 static void print_nodemap_machine(TALLOC_CTX *mem_ctx,
732                                   struct ctdb_context *ctdb,
733                                   struct ctdb_node_map *nodemap,
734                                   uint32_t mypnn)
735 {
736         struct ctdb_node_and_flags *node;
737         int i;
738
739         printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
740                options.sep,
741                "Node", options.sep,
742                "IP", options.sep,
743                "Disconnected", options.sep,
744                "Banned", options.sep,
745                "Disabled", options.sep,
746                "Unhealthy", options.sep,
747                "Stopped", options.sep,
748                "Inactive", options.sep,
749                "PartiallyOnline", options.sep,
750                "ThisNode", options.sep);
751
752         for (i=0; i<nodemap->num; i++) {
753                 node = &nodemap->node[i];
754                 if (node->flags & NODE_FLAGS_DELETED) {
755                         continue;
756                 }
757
758                 printf("%s%u%s%s%s%d%s%d%s%d%s%d%s%d%s%d%s%d%s%c%s\n",
759                        options.sep,
760                        node->pnn, options.sep,
761                        ctdb_sock_addr_to_string(mem_ctx, &node->addr),
762                        options.sep,
763                        !! (node->flags & NODE_FLAGS_DISCONNECTED), options.sep,
764                        !! (node->flags & NODE_FLAGS_BANNED), options.sep,
765                        !! (node->flags & NODE_FLAGS_PERMANENTLY_DISABLED),
766                        options.sep,
767                        !! (node->flags & NODE_FLAGS_UNHEALTHY), options.sep,
768                        !! (node->flags & NODE_FLAGS_STOPPED), options.sep,
769                        !! (node->flags & NODE_FLAGS_INACTIVE), options.sep,
770                        partially_online(mem_ctx, ctdb, node), options.sep,
771                        (node->pnn == mypnn)?'Y':'N', options.sep);
772         }
773
774 }
775
776 static void print_nodemap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
777                           struct ctdb_node_map *nodemap, uint32_t mypnn)
778 {
779         struct ctdb_node_and_flags *node;
780         int num_deleted_nodes = 0;
781         int i;
782
783         for (i=0; i<nodemap->num; i++) {
784                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
785                         num_deleted_nodes++;
786                 }
787         }
788
789         if (num_deleted_nodes == 0) {
790                 printf("Number of nodes:%d\n", nodemap->num);
791         } else {
792                 printf("Number of nodes:%d (including %d deleted nodes)\n",
793                        nodemap->num, num_deleted_nodes);
794         }
795
796         for (i=0; i<nodemap->num; i++) {
797                 node = &nodemap->node[i];
798                 if (node->flags & NODE_FLAGS_DELETED) {
799                         continue;
800                 }
801
802                 printf("pnn:%u %-16s %s%s\n",
803                        node->pnn,
804                        ctdb_sock_addr_to_string(mem_ctx, &node->addr),
805                        partially_online(mem_ctx, ctdb, node) ?
806                                 "PARTIALLYONLINE" :
807                                 pretty_print_flags(mem_ctx, node->flags),
808                        node->pnn == mypnn ? " (THIS NODE)" : "");
809         }
810 }
811
812 static void print_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
813                          struct ctdb_node_map *nodemap, uint32_t mypnn,
814                          struct ctdb_vnn_map *vnnmap, int recmode,
815                          uint32_t recmaster)
816 {
817         int i;
818
819         print_nodemap(mem_ctx, ctdb, nodemap, mypnn);
820
821         if (vnnmap->generation == INVALID_GENERATION) {
822                 printf("Generation:INVALID\n");
823         } else {
824                 printf("Generation:%u\n", vnnmap->generation);
825         }
826         printf("Size:%d\n", vnnmap->size);
827         for (i=0; i<vnnmap->size; i++) {
828                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
829         }
830
831         printf("Recovery mode:%s (%d)\n",
832                recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "RECOVERY",
833                recmode);
834         printf("Recovery master:%d\n", recmaster);
835 }
836
837 static int control_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
838                           int argc, const char **argv)
839 {
840         struct ctdb_node_map *nodemap;
841         struct ctdb_vnn_map *vnnmap;
842         int recmode;
843         uint32_t recmaster;
844         int ret;
845
846         if (argc != 0) {
847                 usage("status");
848         }
849
850         nodemap = get_nodemap(ctdb, false);
851         if (nodemap == NULL) {
852                 return 1;
853         }
854
855         if (options.machinereadable == 1) {
856                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
857                 return 0;
858         }
859
860         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
861                                   ctdb->cmd_pnn, TIMEOUT(), &vnnmap);
862         if (ret != 0) {
863                 return ret;
864         }
865
866         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
867                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
868         if (ret != 0) {
869                 return ret;
870         }
871
872         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
873                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
874         if (ret != 0) {
875                 return ret;
876         }
877
878         print_status(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, vnnmap,
879                      recmode, recmaster);
880         return 0;
881 }
882
883 static int control_uptime(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
884                           int argc, const char **argv)
885 {
886         struct ctdb_uptime *uptime;
887         int ret, tmp, days, hours, minutes, seconds;
888
889         ret = ctdb_ctrl_uptime(mem_ctx, ctdb->ev, ctdb->client,
890                                ctdb->cmd_pnn, TIMEOUT(), &uptime);
891         if (ret != 0) {
892                 return ret;
893         }
894
895         printf("Current time of node %-4u     :                %s",
896                ctdb->cmd_pnn, ctime(&uptime->current_time.tv_sec));
897
898         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
899         seconds = tmp % 60; tmp /= 60;
900         minutes = tmp % 60; tmp /= 60;
901         hours = tmp % 24; tmp /= 24;
902         days = tmp;
903
904         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s",
905                days, hours, minutes, seconds,
906                ctime(&uptime->ctdbd_start_time.tv_sec));
907
908         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
909         seconds = tmp % 60; tmp /= 60;
910         minutes = tmp % 60; tmp /= 60;
911         hours = tmp % 24; tmp /= 24;
912         days = tmp;
913
914         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s",
915                days, hours, minutes, seconds,
916                ctime(&uptime->last_recovery_finished.tv_sec));
917
918         printf("Duration of last recovery/failover: %lf seconds\n",
919                timeval_delta(&uptime->last_recovery_finished,
920                              &uptime->last_recovery_started));
921
922         return 0;
923 }
924
925 static int control_ping(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
926                         int argc, const char **argv)
927 {
928         struct timeval tv;
929         int ret, num_clients;
930
931         tv = timeval_current();
932         ret = ctdb_ctrl_ping(mem_ctx, ctdb->ev, ctdb->client,
933                              ctdb->cmd_pnn, TIMEOUT(), &num_clients);
934         if (ret != 0) {
935                 return ret;
936         }
937
938         printf("response from %u time=%.6f sec  (%d clients)\n",
939                ctdb->cmd_pnn, timeval_elapsed(&tv), num_clients);
940         return 0;
941 }
942
943 const char *runstate_to_string(enum ctdb_runstate runstate);
944 enum ctdb_runstate runstate_from_string(const char *runstate_str);
945
946 static int control_runstate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
947                             int argc, const char **argv)
948 {
949         enum ctdb_runstate runstate;
950         bool found;
951         int ret, i;
952
953         ret = ctdb_ctrl_get_runstate(mem_ctx, ctdb->ev, ctdb->client,
954                                      ctdb->cmd_pnn, TIMEOUT(), &runstate);
955         if (ret != 0) {
956                 return ret;
957         }
958
959         found = true;
960         for (i=0; i<argc; i++) {
961                 enum ctdb_runstate t;
962
963                 found = false;
964                 t = ctdb_runstate_from_string(argv[i]);
965                 if (t == CTDB_RUNSTATE_UNKNOWN) {
966                         printf("Invalid run state (%s)\n", argv[i]);
967                         return 1;
968                 }
969
970                 if (t == runstate) {
971                         found = true;
972                         break;
973                 }
974         }
975
976         if (! found) {
977                 printf("CTDB not in required run state (got %s)\n",
978                        ctdb_runstate_to_string(runstate));
979                 return 1;
980         }
981
982         printf("%s\n", ctdb_runstate_to_string(runstate));
983         return 0;
984 }
985
986 static int control_getvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
987                           int argc, const char **argv)
988 {
989         struct ctdb_var_list *tun_var_list;
990         uint32_t value;
991         int ret, i;
992         bool found;
993
994         if (argc != 1) {
995                 usage("getvar");
996         }
997
998         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
999                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1000         if (ret != 0) {
1001                 fprintf(stderr,
1002                         "Failed to get list of variables from node %u\n",
1003                         ctdb->cmd_pnn);
1004                 return ret;
1005         }
1006
1007         found = false;
1008         for (i=0; i<tun_var_list->count; i++) {
1009                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1010                         found = true;
1011                         break;
1012                 }
1013         }
1014
1015         if (! found) {
1016                 printf("No such tunable %s\n", argv[0]);
1017                 return 1;
1018         }
1019
1020         ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
1021                                     ctdb->cmd_pnn, TIMEOUT(), argv[0], &value);
1022         if (ret != 0) {
1023                 return ret;
1024         }
1025
1026         printf("%-26s = %u\n", argv[0], value);
1027         return 0;
1028 }
1029
1030 static int control_setvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1031                           int argc, const char **argv)
1032 {
1033         struct ctdb_var_list *tun_var_list;
1034         struct ctdb_tunable tunable;
1035         int ret, i;
1036         bool found;
1037
1038         if (argc != 2) {
1039                 usage("setvar");
1040         }
1041
1042         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1043                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1044         if (ret != 0) {
1045                 fprintf(stderr,
1046                         "Failed to get list of variables from node %u\n",
1047                         ctdb->cmd_pnn);
1048                 return ret;
1049         }
1050
1051         found = false;
1052         for (i=0; i<tun_var_list->count; i++) {
1053                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1054                         found = true;
1055                         break;
1056                 }
1057         }
1058
1059         if (! found) {
1060                 printf("No such tunable %s\n", argv[0]);
1061                 return 1;
1062         }
1063
1064         tunable.name = argv[0];
1065         tunable.value = strtoul(argv[1], NULL, 0);
1066
1067         ret = ctdb_ctrl_set_tunable(mem_ctx, ctdb->ev, ctdb->client,
1068                                     ctdb->cmd_pnn, TIMEOUT(), &tunable);
1069         if (ret != 0) {
1070                 if (ret == 1) {
1071                         fprintf(stderr,
1072                                 "Setting obsolete tunable variable '%s'\n",
1073                                tunable.name);
1074                         return 0;
1075                 }
1076         }
1077
1078         return ret;
1079 }
1080
1081 static int control_listvars(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1082                             int argc, const char **argv)
1083 {
1084         struct ctdb_var_list *tun_var_list;
1085         int ret, i;
1086
1087         if (argc != 0) {
1088                 usage("listvars");
1089         }
1090
1091         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1092                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1093         if (ret != 0) {
1094                 return ret;
1095         }
1096
1097         for (i=0; i<tun_var_list->count; i++) {
1098                 control_getvar(mem_ctx, ctdb, 1, &tun_var_list->var[i]);
1099         }
1100
1101         return 0;
1102 }
1103
1104 const struct {
1105         const char *name;
1106         uint32_t offset;
1107 } stats_fields[] = {
1108 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
1109         STATISTICS_FIELD(num_clients),
1110         STATISTICS_FIELD(frozen),
1111         STATISTICS_FIELD(recovering),
1112         STATISTICS_FIELD(num_recoveries),
1113         STATISTICS_FIELD(client_packets_sent),
1114         STATISTICS_FIELD(client_packets_recv),
1115         STATISTICS_FIELD(node_packets_sent),
1116         STATISTICS_FIELD(node_packets_recv),
1117         STATISTICS_FIELD(keepalive_packets_sent),
1118         STATISTICS_FIELD(keepalive_packets_recv),
1119         STATISTICS_FIELD(node.req_call),
1120         STATISTICS_FIELD(node.reply_call),
1121         STATISTICS_FIELD(node.req_dmaster),
1122         STATISTICS_FIELD(node.reply_dmaster),
1123         STATISTICS_FIELD(node.reply_error),
1124         STATISTICS_FIELD(node.req_message),
1125         STATISTICS_FIELD(node.req_control),
1126         STATISTICS_FIELD(node.reply_control),
1127         STATISTICS_FIELD(client.req_call),
1128         STATISTICS_FIELD(client.req_message),
1129         STATISTICS_FIELD(client.req_control),
1130         STATISTICS_FIELD(timeouts.call),
1131         STATISTICS_FIELD(timeouts.control),
1132         STATISTICS_FIELD(timeouts.traverse),
1133         STATISTICS_FIELD(locks.num_calls),
1134         STATISTICS_FIELD(locks.num_current),
1135         STATISTICS_FIELD(locks.num_pending),
1136         STATISTICS_FIELD(locks.num_failed),
1137         STATISTICS_FIELD(total_calls),
1138         STATISTICS_FIELD(pending_calls),
1139         STATISTICS_FIELD(childwrite_calls),
1140         STATISTICS_FIELD(pending_childwrite_calls),
1141         STATISTICS_FIELD(memory_used),
1142         STATISTICS_FIELD(max_hop_count),
1143         STATISTICS_FIELD(total_ro_delegations),
1144         STATISTICS_FIELD(total_ro_revokes),
1145 };
1146
/*
 * Average of a latency counter: total divided by the number of samples,
 * or 0.0 when no samples were recorded.  The argument is evaluated more
 * than once, so it must not have side effects.
 */
#define LATENCY_AVG(v)  (((v).num != 0) ? ((v).total / (v).num) : 0.0)
1148
1149 static void print_statistics_machine(struct ctdb_statistics *s,
1150                                      bool show_header)
1151 {
1152         int i;
1153
1154         if (show_header) {
1155                 printf("CTDB version%s", options.sep);
1156                 printf("Current time of statistics%s", options.sep);
1157                 printf("Statistics collected since%s", options.sep);
1158                 for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1159                         printf("%s%s", stats_fields[i].name, options.sep);
1160                 }
1161                 printf("num_reclock_ctdbd_latency%s", options.sep);
1162                 printf("min_reclock_ctdbd_latency%s", options.sep);
1163                 printf("avg_reclock_ctdbd_latency%s", options.sep);
1164                 printf("max_reclock_ctdbd_latency%s", options.sep);
1165
1166                 printf("num_reclock_recd_latency%s", options.sep);
1167                 printf("min_reclock_recd_latency%s", options.sep);
1168                 printf("avg_reclock_recd_latency%s", options.sep);
1169                 printf("max_reclock_recd_latency%s", options.sep);
1170
1171                 printf("num_call_latency%s", options.sep);
1172                 printf("min_call_latency%s", options.sep);
1173                 printf("avg_call_latency%s", options.sep);
1174                 printf("max_call_latency%s", options.sep);
1175
1176                 printf("num_lockwait_latency%s", options.sep);
1177                 printf("min_lockwait_latency%s", options.sep);
1178                 printf("avg_lockwait_latency%s", options.sep);
1179                 printf("max_lockwait_latency%s", options.sep);
1180
1181                 printf("num_childwrite_latency%s", options.sep);
1182                 printf("min_childwrite_latency%s", options.sep);
1183                 printf("avg_childwrite_latency%s", options.sep);
1184                 printf("max_childwrite_latency%s", options.sep);
1185                 printf("\n");
1186         }
1187
1188         printf("%u%s", CTDB_PROTOCOL, options.sep);
1189         printf("%u%s", (uint32_t)s->statistics_current_time.tv_sec, options.sep);
1190         printf("%u%s", (uint32_t)s->statistics_start_time.tv_sec, options.sep);
1191         for (i=0;i<ARRAY_SIZE(stats_fields);i++) {
1192                 printf("%u%s",
1193                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s),
1194                        options.sep);
1195         }
1196         printf("%u%s", s->reclock.ctdbd.num, options.sep);
1197         printf("%.6f%s", s->reclock.ctdbd.min, options.sep);
1198         printf("%.6f%s", LATENCY_AVG(s->reclock.ctdbd), options.sep);
1199         printf("%.6f%s", s->reclock.ctdbd.max, options.sep);
1200
1201         printf("%u%s", s->reclock.recd.num, options.sep);
1202         printf("%.6f%s", s->reclock.recd.min, options.sep);
1203         printf("%.6f%s", LATENCY_AVG(s->reclock.recd), options.sep);
1204         printf("%.6f%s", s->reclock.recd.max, options.sep);
1205
1206         printf("%d%s", s->call_latency.num, options.sep);
1207         printf("%.6f%s", s->call_latency.min, options.sep);
1208         printf("%.6f%s", LATENCY_AVG(s->call_latency), options.sep);
1209         printf("%.6f%s", s->call_latency.max, options.sep);
1210
1211         printf("%d%s", s->childwrite_latency.num, options.sep);
1212         printf("%.6f%s", s->childwrite_latency.min, options.sep);
1213         printf("%.6f%s", LATENCY_AVG(s->childwrite_latency), options.sep);
1214         printf("%.6f%s", s->childwrite_latency.max, options.sep);
1215         printf("\n");
1216 }
1217
1218 static void print_statistics(struct ctdb_statistics *s)
1219 {
1220         int tmp, days, hours, minutes, seconds;
1221         int i;
1222         const char *prefix = NULL;
1223         int preflen = 0;
1224
1225         tmp = s->statistics_current_time.tv_sec -
1226               s->statistics_start_time.tv_sec;
1227         seconds = tmp % 60; tmp /= 60;
1228         minutes = tmp % 60; tmp /= 60;
1229         hours   = tmp % 24; tmp /= 24;
1230         days    = tmp;
1231
1232         printf("CTDB version %u\n", CTDB_PROTOCOL);
1233         printf("Current time of statistics  :                %s",
1234                ctime(&s->statistics_current_time.tv_sec));
1235         printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s",
1236                days, hours, minutes, seconds,
1237                ctime(&s->statistics_start_time.tv_sec));
1238
1239         for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1240                 if (strchr(stats_fields[i].name, '.') != NULL) {
1241                         preflen = strcspn(stats_fields[i].name, ".") + 1;
1242                         if (! prefix ||
1243                             strncmp(prefix, stats_fields[i].name, preflen) != 0) {
1244                                 prefix = stats_fields[i].name;
1245                                 printf(" %*.*s\n", preflen-1, preflen-1,
1246                                        stats_fields[i].name);
1247                         }
1248                 } else {
1249                         preflen = 0;
1250                 }
1251                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
1252                        stats_fields[i].name+preflen, preflen ? 0 : 4, "",
1253                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s));
1254         }
1255
1256         printf(" hop_count_buckets:");
1257         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1258                 printf(" %d", s->hop_count_bucket[i]);
1259         }
1260         printf("\n");
1261         printf(" lock_buckets:");
1262         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1263                 printf(" %d", s->locks.buckets[i]);
1264         }
1265         printf("\n");
1266         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1267                "locks_latency      MIN/AVG/MAX",
1268                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
1269                s->locks.latency.max, s->locks.latency.num);
1270
1271         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1272                "reclock_ctdbd      MIN/AVG/MAX",
1273                s->reclock.ctdbd.min, LATENCY_AVG(s->reclock.ctdbd),
1274                s->reclock.ctdbd.max, s->reclock.ctdbd.num);
1275
1276         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1277                "reclock_recd       MIN/AVG/MAX",
1278                s->reclock.recd.min, LATENCY_AVG(s->reclock.recd),
1279                s->reclock.recd.max, s->reclock.recd.num);
1280
1281         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1282                "call_latency       MIN/AVG/MAX",
1283                s->call_latency.min, LATENCY_AVG(s->call_latency),
1284                s->call_latency.max, s->call_latency.num);
1285
1286         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1287                "childwrite_latency MIN/AVG/MAX",
1288                s->childwrite_latency.min,
1289                LATENCY_AVG(s->childwrite_latency),
1290                s->childwrite_latency.max, s->childwrite_latency.num);
1291 }
1292
1293 static int control_statistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1294                               int argc, const char **argv)
1295 {
1296         struct ctdb_statistics *stats;
1297         int ret;
1298
1299         if (argc != 0) {
1300                 usage("statistics");
1301         }
1302
1303         ret = ctdb_ctrl_statistics(mem_ctx, ctdb->ev, ctdb->client,
1304                                    ctdb->cmd_pnn, TIMEOUT(), &stats);
1305         if (ret != 0) {
1306                 return ret;
1307         }
1308
1309         if (options.machinereadable) {
1310                 print_statistics_machine(stats, true);
1311         } else {
1312                 print_statistics(stats);
1313         }
1314
1315         return 0;
1316 }
1317
1318 static int control_statistics_reset(TALLOC_CTX *mem_ctx,
1319                                     struct ctdb_context *ctdb,
1320                                     int argc, const char **argv)
1321 {
1322         int ret;
1323
1324         if (argc != 0) {
1325                 usage("statisticsreset");
1326         }
1327
1328         ret = ctdb_ctrl_statistics_reset(mem_ctx, ctdb->ev, ctdb->client,
1329                                          ctdb->cmd_pnn, TIMEOUT());
1330         if (ret != 0) {
1331                 return ret;
1332         }
1333
1334         return 0;
1335 }
1336
1337 static int control_stats(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1338                          int argc, const char **argv)
1339 {
1340         struct ctdb_statistics_list *slist;
1341         int ret, count = 0, i;
1342         bool show_header = true;
1343
1344         if (argc > 1) {
1345                 usage("stats");
1346         }
1347
1348         if (argc == 1) {
1349                 count = atoi(argv[0]);
1350         }
1351
1352         ret = ctdb_ctrl_get_stat_history(mem_ctx, ctdb->ev, ctdb->client,
1353                                          ctdb->cmd_pnn, TIMEOUT(), &slist);
1354         if (ret != 0) {
1355                 return ret;
1356         }
1357
1358         for (i=0; i<slist->num; i++) {
1359                 if (slist->stats[i].statistics_start_time.tv_sec == 0) {
1360                         continue;
1361                 }
1362                 if (options.machinereadable == 1) {
1363                         print_statistics_machine(&slist->stats[i],
1364                                                  show_header);
1365                         show_header = false;
1366                 } else {
1367                         print_statistics(&slist->stats[i]);
1368                 }
1369                 if (count > 0 && i == count) {
1370                         break;
1371                 }
1372         }
1373
1374         return 0;
1375 }
1376
1377 static void print_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1378                      struct ctdb_public_ip_list *ips,
1379                      struct ctdb_public_ip_info **ipinfo,
1380                      bool all_nodes)
1381 {
1382         int i, j;
1383         char *conf, *avail, *active;
1384
1385         if (options.machinereadable == 1) {
1386                 printf("%s%s%s%s%s", options.sep,
1387                        "Public IP", options.sep,
1388                        "Node", options.sep);
1389                 if (options.verbose == 1) {
1390                         printf("%s%s%s%s%s%s\n",
1391                                "ActiveInterfaces", options.sep,
1392                                "AvailableInterfaces", options.sep,
1393                                "ConfiguredInterfaces", options.sep);
1394                 } else {
1395                         printf("\n");
1396                 }
1397         } else {
1398                 if (all_nodes) {
1399                         printf("Public IPs on ALL nodes\n");
1400                 } else {
1401                         printf("Public IPs on node %u\n", ctdb->cmd_pnn);
1402                 }
1403         }
1404
1405         /* IPs are reverse sorted */
1406         for (i=ips->num-1; i>=0; i--) {
1407
1408                 if (options.machinereadable == 1) {
1409                         printf("%s%s%s%d%s", options.sep,
1410                                ctdb_sock_addr_to_string(
1411                                         mem_ctx, &ips->ip[i].addr),
1412                                options.sep,
1413                                (int)ips->ip[i].pnn, options.sep);
1414                 } else {
1415                         printf("%s", ctdb_sock_addr_to_string(
1416                                                 mem_ctx, &ips->ip[i].addr));
1417                 }
1418
1419                 if (options.verbose == 0) {
1420                         if (options.machinereadable == 1) {
1421                                 printf("\n");
1422                         } else {
1423                                 printf(" %d\n", (int)ips->ip[i].pnn);
1424                         }
1425                         continue;
1426                 }
1427
1428                 conf = NULL;
1429                 avail = NULL;
1430                 active = NULL;
1431
1432                 for (j=0; j<ipinfo[i]->ifaces->num; j++) {
1433                         struct ctdb_iface *iface;
1434
1435                         iface = &ipinfo[i]->ifaces->iface[j];
1436                         if (conf == NULL) {
1437                                 conf = talloc_strdup(mem_ctx, iface->name);
1438                         } else {
1439                                 conf = talloc_asprintf_append(
1440                                                 mem_ctx, ",%s", iface->name);
1441                         }
1442
1443                         if (ipinfo[i]->active_idx == j) {
1444                                 active = iface->name;
1445                         }
1446
1447                         if (iface->link_state == 0) {
1448                                 continue;
1449                         }
1450
1451                         if (avail == NULL) {
1452                                 avail = talloc_strdup(mem_ctx, iface->name);
1453                         } else {
1454                                 avail = talloc_asprintf_append(
1455                                                 mem_ctx, ",%s", iface->name);
1456                         }
1457                 }
1458
1459                 if (options.machinereadable == 1) {
1460                         printf("%s%s%s%s%s%s\n",
1461                                active ? active : "", options.sep,
1462                                avail ? avail : "", options.sep,
1463                                conf ? conf : "", options.sep);
1464                 } else {
1465                         printf(" node[%u] active[%s] available[%s] configured[%s]\n",
1466                                ips->ip[i].pnn, active ? active : "",
1467                                avail ? avail : "", conf ? conf : "");
1468                 }
1469         }
1470 }
1471
1472 static int collect_ips(uint8_t *keybuf, size_t keylen, uint8_t *databuf,
1473                        size_t datalen, void *private_data)
1474 {
1475         struct ctdb_public_ip_list *ips = talloc_get_type_abort(
1476                 private_data, struct ctdb_public_ip_list);
1477         struct ctdb_public_ip *ip;
1478
1479         ip = (struct ctdb_public_ip *)databuf;
1480         ips->ip[ips->num] = *ip;
1481         ips->num += 1;
1482
1483         return 0;
1484 }
1485
1486 static int get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1487                               struct ctdb_public_ip_list **out)
1488 {
1489         struct ctdb_node_map *nodemap;
1490         struct ctdb_public_ip_list *ips;
1491         struct db_hash_context *ipdb;
1492         uint32_t *pnn_list;
1493         int ret, count, i, j;
1494
1495         nodemap = get_nodemap(ctdb, false);
1496         if (nodemap == NULL) {
1497                 return 1;
1498         }
1499
1500         ret = db_hash_init(mem_ctx, "ips", 101, DB_HASH_COMPLEX, &ipdb);
1501         if (ret != 0) {
1502                 goto failed;
1503         }
1504
1505         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
1506                                      &pnn_list);
1507         if (count <= 0) {
1508                 goto failed;
1509         }
1510
1511         for (i=0; i<count; i++) {
1512                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1513                                                pnn_list[i], TIMEOUT(), &ips);
1514                 if (ret != 0) {
1515                         goto failed;
1516                 }
1517
1518                 for (j=0; j<ips->num; j++) {
1519                         struct ctdb_public_ip ip;
1520
1521                         ip.pnn = ips->ip[j].pnn;
1522                         ip.addr = ips->ip[j].addr;
1523
1524                         ret = db_hash_add(ipdb, (uint8_t *)&ip.addr,
1525                                           sizeof(ip.addr),
1526                                           (uint8_t *)&ip, sizeof(ip));
1527                         if (ret != 0) {
1528                                 goto failed;
1529                         }
1530                 }
1531
1532                 TALLOC_FREE(ips);
1533         }
1534
1535         talloc_free(pnn_list);
1536
1537         ret = db_hash_traverse(ipdb, NULL, NULL, &count);
1538         if (ret != 0) {
1539                 goto failed;
1540         }
1541
1542         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);
1543         if (ips == NULL) {
1544                 goto failed;
1545         }
1546
1547         ips->ip = talloc_array(ips, struct ctdb_public_ip, count);
1548         if (ips->ip == NULL) {
1549                 goto failed;
1550         }
1551
1552         ret = db_hash_traverse(ipdb, collect_ips, ips, &count);
1553         if (ret != 0) {
1554                 goto failed;
1555         }
1556
1557         if (count != ips->num) {
1558                 goto failed;
1559         }
1560
1561         talloc_free(ipdb);
1562
1563         *out = ips;
1564         return 0;
1565
1566 failed:
1567         talloc_free(ipdb);
1568         return 1;
1569 }
1570
1571 static int control_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1572                       int argc, const char **argv)
1573 {
1574         struct ctdb_public_ip_list *ips;
1575         struct ctdb_public_ip_info **ipinfo;
1576         int ret, i;
1577         bool do_all = false;
1578
1579         if (argc > 1) {
1580                 usage("ip");
1581         }
1582
1583         if (argc == 1) {
1584                 if (strcmp(argv[0], "all") == 0) {
1585                         do_all = true;
1586                 } else {
1587                         usage("ip");
1588                 }
1589         }
1590
1591         if (do_all) {
1592                 ret = get_all_public_ips(ctdb, mem_ctx, &ips);
1593         } else {
1594                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1595                                                ctdb->cmd_pnn, TIMEOUT(), &ips);
1596         }
1597         if (ret != 0) {
1598                 return ret;
1599         }
1600
1601         ipinfo = talloc_array(mem_ctx, struct ctdb_public_ip_info *, ips->num);
1602         if (ipinfo == NULL) {
1603                 return 1;
1604         }
1605
1606         for (i=0; i<ips->num; i++) {
1607                 uint32_t pnn;
1608                 if (do_all) {
1609                         pnn = ips->ip[i].pnn;
1610                 } else {
1611                         pnn = ctdb->cmd_pnn;
1612                 }
1613                 ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev,
1614                                                    ctdb->client, pnn,
1615                                                    TIMEOUT(), &ips->ip[i].addr,
1616                                                    &ipinfo[i]);
1617                 if (ret != 0) {
1618                         return ret;
1619                 }
1620         }
1621
1622         print_ip(mem_ctx, ctdb, ips, ipinfo, do_all);
1623         return 0;
1624 }
1625
1626 static int control_ipinfo(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1627                           int argc, const char **argv)
1628 {
1629         struct ctdb_public_ip_info *ipinfo;
1630         ctdb_sock_addr addr;
1631         int ret, i;
1632
1633         if (argc != 1) {
1634                 usage("ipinfo");
1635         }
1636
1637         if (! parse_ip(argv[0], NULL, 0, &addr)) {
1638                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
1639                 return 1;
1640         }
1641
1642         ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev, ctdb->client,
1643                                            ctdb->cmd_pnn, TIMEOUT(), &addr,
1644                                            &ipinfo);
1645         if (ret != 0) {
1646                 if (ret == -1) {
1647                         printf("Node %u does not know about IP %s\n",
1648                                ctdb->cmd_pnn, argv[0]);
1649                 }
1650                 return ret;
1651         }
1652
1653         printf("Public IP[%s] info on node %u\n",
1654                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr),
1655                                         ctdb->cmd_pnn);
1656
1657         printf("IP:%s\nCurrentNode:%u\nNumInterfaces:%u\n",
1658                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr),
1659                ipinfo->ip.pnn, ipinfo->ifaces->num);
1660
1661         for (i=0; i<ipinfo->ifaces->num; i++) {
1662                 struct ctdb_iface *iface;
1663
1664                 iface = &ipinfo->ifaces->iface[i];
1665                 iface->name[CTDB_IFACE_SIZE] = '\0';
1666                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1667                        i+1, iface->name,
1668                        iface->link_state == 0 ? "down" : "up",
1669                        iface->references,
1670                        (i == ipinfo->active_idx) ? " (active)" : "");
1671         }
1672
1673         return 0;
1674 }
1675
1676 static int control_ifaces(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1677                           int argc, const char **argv)
1678 {
1679         struct ctdb_iface_list *ifaces;
1680         int ret, i;
1681
1682         if (argc != 0) {
1683                 usage("ifaces");
1684         }
1685
1686         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1687                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1688         if (ret != 0) {
1689                 return ret;
1690         }
1691
1692         if (ifaces->num == 0) {
1693                 printf("No interfaces configured on node %u\n",
1694                        ctdb->cmd_pnn);
1695                 return 0;
1696         }
1697
1698         if (options.machinereadable) {
1699                 printf("%s%s%s%s%s%s%s\n", options.sep,
1700                        "Name", options.sep,
1701                        "LinkStatus", options.sep,
1702                        "References", options.sep);
1703         } else {
1704                 printf("Interfaces on node %u\n", ctdb->cmd_pnn);
1705         }
1706
1707         for (i=0; i<ifaces->num; i++) {
1708                 if (options.machinereadable) {
1709                         printf("%s%s%s%u%s%u%s\n", options.sep,
1710                                ifaces->iface[i].name, options.sep,
1711                                ifaces->iface[i].link_state, options.sep,
1712                                ifaces->iface[i].references, options.sep);
1713                 } else {
1714                         printf("name:%s link:%s references:%u\n",
1715                                ifaces->iface[i].name,
1716                                ifaces->iface[i].link_state ? "up" : "down",
1717                                ifaces->iface[i].references);
1718                 }
1719         }
1720
1721         return 0;
1722 }
1723
1724 static int control_setifacelink(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1725                                 int argc, const char **argv)
1726 {
1727         struct ctdb_iface_list *ifaces;
1728         struct ctdb_iface *iface;
1729         int ret, i;
1730
1731         if (argc != 2) {
1732                 usage("setifacelink");
1733         }
1734
1735         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1736                 fprintf(stderr, "Interface name '%s' too long\n", argv[0]);
1737                 return 1;
1738         }
1739
1740         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1741                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1742         if (ret != 0) {
1743                 fprintf(stderr,
1744                         "Failed to get interface information from node %u\n",
1745                         ctdb->cmd_pnn);
1746                 return ret;
1747         }
1748
1749         iface = NULL;
1750         for (i=0; i<ifaces->num; i++) {
1751                 if (strcmp(ifaces->iface[i].name, argv[0]) == 0) {
1752                         iface = &ifaces->iface[i];
1753                         break;
1754                 }
1755         }
1756
1757         if (iface == NULL) {
1758                 printf("Interface %s not configured on node %u\n",
1759                        argv[0], ctdb->cmd_pnn);
1760                 return 1;
1761         }
1762
1763         if (strcmp(argv[1], "up") == 0) {
1764                 iface->link_state = 1;
1765         } else if (strcmp(argv[1], "down") == 0) {
1766                 iface->link_state = 0;
1767         } else {
1768                 usage("setifacelink");
1769                 return 1;
1770         }
1771
1772         iface->references = 0;
1773
1774         ret = ctdb_ctrl_set_iface_link_state(mem_ctx, ctdb->ev, ctdb->client,
1775                                              ctdb->cmd_pnn, TIMEOUT(), iface);
1776         if (ret != 0) {
1777                 return ret;
1778         }
1779
1780         return 0;
1781 }
1782
1783 static int control_process_exists(TALLOC_CTX *mem_ctx,
1784                                   struct ctdb_context *ctdb,
1785                                   int argc, const char **argv)
1786 {
1787         pid_t pid;
1788         int ret, status;
1789
1790         if (argc != 1) {
1791                 usage("process-exists");
1792         }
1793
1794         pid = atoi(argv[0]);
1795         ret = ctdb_ctrl_process_exists(mem_ctx, ctdb->ev, ctdb->client,
1796                                        ctdb->cmd_pnn, TIMEOUT(), pid, &status);
1797         if (ret != 0) {
1798                 return ret;
1799         }
1800
1801         if (status == 0) {
1802                 printf("PID %u exists\n", pid);
1803         } else {
1804                 printf("PID %u does not exist\n", pid);
1805         }
1806         return status;
1807 }
1808
1809 static int control_getdbmap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1810                             int argc, const char **argv)
1811 {
1812         struct ctdb_dbid_map *dbmap;
1813         int ret, i;
1814
1815         if (argc != 0) {
1816                 usage("getdbmap");
1817         }
1818
1819         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
1820                                   ctdb->cmd_pnn, TIMEOUT(), &dbmap);
1821         if (ret != 0) {
1822                 return ret;
1823         }
1824
1825         if (options.machinereadable == 1) {
1826                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
1827                        options.sep,
1828                        "ID", options.sep,
1829                        "Name", options.sep,
1830                        "Path", options.sep,
1831                        "Persistent", options.sep,
1832                        "Sticky", options.sep,
1833                        "Unhealthy", options.sep,
1834                        "Readonly", options.sep);
1835         } else {
1836                 printf("Number of databases:%d\n", dbmap->num);
1837         }
1838
1839         for (i=0; i<dbmap->num; i++) {
1840                 const char *name;
1841                 const char *path;
1842                 const char *health;
1843                 bool persistent;
1844                 bool readonly;
1845                 bool sticky;
1846                 uint32_t db_id;
1847
1848                 db_id = dbmap->dbs[i].db_id;
1849
1850                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
1851                                            ctdb->cmd_pnn, TIMEOUT(), db_id,
1852                                            &name);
1853                 if (ret != 0) {
1854                         return ret;
1855                 }
1856
1857                 ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1858                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
1859                                           &path);
1860                 if (ret != 0) {
1861                         return ret;
1862                 }
1863
1864                 ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1865                                               ctdb->cmd_pnn, TIMEOUT(), db_id,
1866                                               &health);
1867                 if (ret != 0) {
1868                         return ret;
1869                 }
1870
1871                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1872                 readonly = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
1873                 sticky = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
1874
1875                 if (options.machinereadable == 1) {
1876                         printf("%s0x%08X%s%s%s%s%s%d%s%d%s%d%s%d%s\n",
1877                                options.sep,
1878                                db_id, options.sep,
1879                                name, options.sep,
1880                                path, options.sep,
1881                                !! (persistent), options.sep,
1882                                !! (sticky), options.sep,
1883                                !! (health), options.sep,
1884                                !! (readonly), options.sep);
1885                 } else {
1886                         printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
1887                                db_id, name, path,
1888                                persistent ? " PERSISTENT" : "",
1889                                sticky ? " STICKY" : "",
1890                                readonly ? " READONLY" : "",
1891                                health ? " UNHEALTHY" : "");
1892                 }
1893
1894                 talloc_free(discard_const(name));
1895                 talloc_free(discard_const(path));
1896                 talloc_free(discard_const(health));
1897         }
1898
1899         return 0;
1900 }
1901
1902 static int control_getdbstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1903                                int argc, const char **argv)
1904 {
1905         uint32_t db_id;
1906         const char *db_name, *db_path, *db_health;
1907         uint8_t db_flags;
1908         int ret;
1909
1910         if (argc != 1) {
1911                 usage("getdbstatus");
1912         }
1913
1914         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
1915                 return 1;
1916         }
1917
1918         ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1919                                   ctdb->cmd_pnn, TIMEOUT(), db_id,
1920                                   &db_path);
1921         if (ret != 0) {
1922                 return ret;
1923         }
1924
1925         ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1926                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
1927                                       &db_health);
1928         if (ret != 0) {
1929                 return ret;
1930         }
1931
1932         printf("dbid: 0x%08x\nname: %s\npath: %s\n", db_id, db_name, db_path);
1933         printf("PERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
1934                (db_flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
1935                (db_flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
1936                (db_flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
1937                (db_health ? db_health : "OK"));
1938         return 0;
1939 }
1940
/* Running total kept by dump_record() while records are being dumped */
struct dump_record_state {
        uint32_t count;         /* incremented once per dump_record() call */
};
1944
/*
 * True when x can be emitted verbatim inside a double-quoted dump: it
 * must be printable and must be neither '"' nor '\'.  Note that x is
 * evaluated twice.
 */
#define ISASCII(x) (isprint(x) && ! strchr("\"\\", (x)))
1946
1947 static void dump_tdb_data(const char *name, TDB_DATA val)
1948 {
1949         int i;
1950
1951         fprintf(stdout, "%s(%zu) = \"", name, val.dsize);
1952         for (i=0; i<val.dsize; i++) {
1953                 if (ISASCII(val.dptr[i])) {
1954                         fprintf(stdout, "%c", val.dptr[i]);
1955                 } else {
1956                         fprintf(stdout, "\\%02X", val.dptr[i]);
1957                 }
1958         }
1959         fprintf(stdout, "\"\n");
1960 }
1961
1962 static void dump_ltdb_header(struct ctdb_ltdb_header *header)
1963 {
1964         fprintf(stdout, "dmaster: %u\n", header->dmaster);
1965         fprintf(stdout, "rsn: %" PRIu64 "\n", header->rsn);
1966         fprintf(stdout, "flags: 0x%08x", header->flags);
1967         if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
1968                 fprintf(stdout, " MIGRATED_WITH_DATA");
1969         }
1970         if (header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) {
1971                 fprintf(stdout, " VACUUM_MIGRATED");
1972         }
1973         if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
1974                 fprintf(stdout, " AUTOMATIC");
1975         }
1976         if (header->flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1977                 fprintf(stdout, " RO_HAVE_DELEGATIONS");
1978         }
1979         if (header->flags & CTDB_REC_RO_HAVE_READONLY) {
1980                 fprintf(stdout, " RO_HAVE_READONLY");
1981         }
1982         if (header->flags & CTDB_REC_RO_REVOKING_READONLY) {
1983                 fprintf(stdout, " RO_REVOKING_READONLY");
1984         }
1985         if (header->flags & CTDB_REC_RO_REVOKE_COMPLETE) {
1986                 fprintf(stdout, " RO_REVOKE_COMPLETE");
1987         }
1988         fprintf(stdout, "\n");
1989
1990 }
1991
1992 static int dump_record(uint32_t reqid, struct ctdb_ltdb_header *header,
1993                        TDB_DATA key, TDB_DATA data, void *private_data)
1994 {
1995         struct dump_record_state *state =
1996                 (struct dump_record_state *)private_data;
1997
1998         state->count += 1;
1999
2000         dump_tdb_data("key", key);
2001         dump_ltdb_header(header);
2002         dump_tdb_data("data", data);
2003         fprintf(stdout, "\n");
2004
2005         return 0;
2006 }
2007
/* State shared between control_catdb() and traverse_handler() */
struct traverse_state {
        TALLOC_CTX *mem_ctx;    /* context used when unpacking records */
        bool done;              /* set by the handler to end the wait */
        ctdb_rec_parser_func_t func;    /* called for each record */
        struct dump_record_state sub_state;     /* passed to func */
};
2014
2015 static void traverse_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2016 {
2017         struct traverse_state *state = (struct traverse_state *)private_data;
2018         struct ctdb_rec_data *rec;
2019         struct ctdb_ltdb_header header;
2020         int ret;
2021
2022         ret = ctdb_rec_data_pull(data.dptr, data.dsize, state->mem_ctx, &rec);
2023         if (ret != 0) {
2024                 return;
2025         }
2026
2027         if (rec->key.dsize == 0 && rec->data.dsize == 0) {
2028                 talloc_free(rec);
2029                 /* end of traverse */
2030                 state->done = true;
2031                 return;
2032         }
2033
2034         ret = ctdb_ltdb_header_extract(&rec->data, &header);
2035         if (ret != 0) {
2036                 talloc_free(rec);
2037                 return;
2038         }
2039
2040         if (rec->data.dsize == 0) {
2041                 talloc_free(rec);
2042                 return;
2043         }
2044
2045         ret = state->func(rec->reqid, &header, rec->key, rec->data,
2046                           &state->sub_state);
2047         talloc_free(rec);
2048         if (ret != 0) {
2049                 state->done = true;
2050         }
2051 }
2052
2053 static int control_catdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2054                          int argc, const char **argv)
2055 {
2056         struct ctdb_db_context *db;
2057         const char *db_name;
2058         uint32_t db_id;
2059         uint8_t db_flags;
2060         struct ctdb_traverse_start_ext traverse;
2061         struct traverse_state state;
2062         int ret;
2063
2064         if (argc != 1) {
2065                 usage("catdb");
2066         }
2067
2068         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2069                 return 1;
2070         }
2071
2072         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2073                           db_flags, &db);
2074         if (ret != 0) {
2075                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2076                 return ret;
2077         }
2078
2079         /* Valgrind fix */
2080         ZERO_STRUCT(traverse);
2081
2082         traverse.db_id = db_id;
2083         traverse.reqid = 0;
2084         traverse.srvid = next_srvid(ctdb);
2085         traverse.withemptyrecords = false;
2086
2087         state.mem_ctx = mem_ctx;
2088         state.done = false;
2089         state.func = dump_record;
2090         state.sub_state.count = 0;
2091
2092         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2093                                               traverse.srvid,
2094                                               traverse_handler, &state);
2095         if (ret != 0) {
2096                 return ret;
2097         }
2098
2099         ret = ctdb_ctrl_traverse_start_ext(mem_ctx, ctdb->ev, ctdb->client,
2100                                            ctdb->cmd_pnn, TIMEOUT(),
2101                                            &traverse);
2102         if (ret != 0) {
2103                 return ret;
2104         }
2105
2106         ctdb_client_wait(ctdb->ev, &state.done);
2107
2108         printf("Dumped %u records\n", state.sub_state.count);
2109
2110         ret = ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2111                                                  traverse.srvid, &state);
2112         if (ret != 0) {
2113                 return ret;
2114         }
2115
2116         return 0;
2117 }
2118
2119 static int control_cattdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2120                           int argc, const char **argv)
2121 {
2122         struct ctdb_db_context *db;
2123         const char *db_name;
2124         uint32_t db_id;
2125         uint8_t db_flags;
2126         struct dump_record_state state;
2127         int ret;
2128
2129         if (argc != 1) {
2130                 usage("catdb");
2131         }
2132
2133         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2134                 return 1;
2135         }
2136
2137         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2138                           db_flags, &db);
2139         if (ret != 0) {
2140                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2141                 return ret;
2142         }
2143
2144         state.count = 0;
2145         ret = ctdb_db_traverse(db, true, true, dump_record, &state);
2146
2147         printf("Dumped %u record(s)\n", state.count);
2148
2149         return ret;
2150 }
2151
2152 static int control_getmonmode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2153                               int argc, const char **argv)
2154 {
2155         int mode, ret;
2156
2157         if (argc != 0) {
2158                 usage("getmonmode");
2159         }
2160
2161         ret = ctdb_ctrl_get_monmode(mem_ctx, ctdb->ev, ctdb->client,
2162                                     ctdb->cmd_pnn, TIMEOUT(), &mode);
2163         if (ret != 0) {
2164                 return ret;
2165         }
2166
2167         printf("%s\n",
2168                (mode == CTDB_MONITORING_ENABLED) ? "ENABLED" : "DISABLED");
2169         return 0;
2170 }
2171
2172 static int control_getcapabilities(TALLOC_CTX *mem_ctx,
2173                                    struct ctdb_context *ctdb,
2174                                    int argc, const char **argv)
2175 {
2176         uint32_t caps;
2177         int ret;
2178
2179         if (argc != 0) {
2180                 usage("getcapabilities");
2181         }
2182
2183         ret = ctdb_ctrl_get_capabilities(mem_ctx, ctdb->ev, ctdb->client,
2184                                          ctdb->cmd_pnn, TIMEOUT(), &caps);
2185         if (ret != 0) {
2186                 return ret;
2187         }
2188
2189         if (options.machinereadable == 1) {
2190                 printf("%s%s%s%s%s\n",
2191                        options.sep,
2192                        "RECMASTER", options.sep,
2193                        "LMASTER", options.sep);
2194                 printf("%s%d%s%d%s\n", options.sep,
2195                        !! (caps & CTDB_CAP_RECMASTER), options.sep,
2196                        !! (caps & CTDB_CAP_LMASTER), options.sep);
2197         } else {
2198                 printf("RECMASTER: %s\n",
2199                        (caps & CTDB_CAP_RECMASTER) ? "YES" : "NO");
2200                 printf("LMASTER: %s\n",
2201                        (caps & CTDB_CAP_LMASTER) ? "YES" : "NO");
2202         }
2203
2204         return 0;
2205 }
2206
2207 static int control_pnn(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2208                        int argc, const char **argv)
2209 {
2210         printf("%u\n", ctdb_client_pnn(ctdb->client));
2211         return 0;
2212 }
2213
2214 static int control_lvs(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2215                        int argc, const char **argv)
2216 {
2217         char *t, *lvs_helper = NULL;
2218
2219         if (argc != 1) {
2220                 usage("lvs");
2221         }
2222
2223         t = getenv("CTDB_LVS_HELPER");
2224         if (t != NULL) {
2225                 lvs_helper = talloc_strdup(mem_ctx, t);
2226         } else {
2227                 lvs_helper = talloc_asprintf(mem_ctx, "%s/ctdb_lvs",
2228                                              CTDB_HELPER_BINDIR);
2229         }
2230
2231         if (lvs_helper == NULL) {
2232                 fprintf(stderr, "Unable to set LVS helper\n");
2233                 return 1;
2234         }
2235
2236         return run_helper("LVS helper", lvs_helper, argv[0]);
2237 }
2238
2239 static int control_disable_monitor(TALLOC_CTX *mem_ctx,
2240                                    struct ctdb_context *ctdb,
2241                                    int argc, const char **argv)
2242 {
2243         int ret;
2244
2245         if (argc != 0) {
2246                 usage("disablemonitor");
2247         }
2248
2249         ret = ctdb_ctrl_disable_monitor(mem_ctx, ctdb->ev, ctdb->client,
2250                                         ctdb->cmd_pnn, TIMEOUT());
2251         if (ret != 0) {
2252                 return ret;
2253         }
2254
2255         return 0;
2256 }
2257
2258 static int control_enable_monitor(TALLOC_CTX *mem_ctx,
2259                                   struct ctdb_context *ctdb,
2260                                   int argc, const char **argv)
2261 {
2262         int ret;
2263
2264         if (argc != 0) {
2265                 usage("enablemonitor");
2266         }
2267
2268         ret = ctdb_ctrl_enable_monitor(mem_ctx, ctdb->ev, ctdb->client,
2269                                        ctdb->cmd_pnn, TIMEOUT());
2270         if (ret != 0) {
2271                 return ret;
2272         }
2273
2274         return 0;
2275 }
2276
2277 static int control_setdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2278                             int argc, const char **argv)
2279 {
2280         enum debug_level log_level;
2281         int ret;
2282         bool found;
2283
2284         if (argc != 1) {
2285                 usage("setdebug");
2286         }
2287
2288         found = debug_level_parse(argv[0], &log_level);
2289         if (! found) {
2290                 fprintf(stderr,
2291                         "Invalid debug level '%s'. Valid levels are:\n",
2292                         argv[0]);
2293                 fprintf(stderr, "\tERROR | WARNING | NOTICE | INFO | DEBUG\n");
2294                 return 1;
2295         }
2296
2297         ret = ctdb_ctrl_setdebug(mem_ctx, ctdb->ev, ctdb->client,
2298                                  ctdb->cmd_pnn, TIMEOUT(), log_level);
2299         if (ret != 0) {
2300                 return ret;
2301         }
2302
2303         return 0;
2304 }
2305
2306 static int control_getdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2307                             int argc, const char **argv)
2308 {
2309         enum debug_level loglevel;
2310         const char *log_str;
2311         int ret;
2312
2313         if (argc != 0) {
2314                 usage("getdebug");
2315         }
2316
2317         ret = ctdb_ctrl_getdebug(mem_ctx, ctdb->ev, ctdb->client,
2318                                  ctdb->cmd_pnn, TIMEOUT(), &loglevel);
2319         if (ret != 0) {
2320                 return ret;
2321         }
2322
2323         log_str = debug_level_to_string(loglevel);
2324         printf("%s\n", log_str);
2325
2326         return 0;
2327 }
2328
2329 static int control_attach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2330                           int argc, const char **argv)
2331 {
2332         const char *db_name;
2333         uint8_t db_flags = 0;
2334         int ret;
2335
2336         if (argc < 1 || argc > 2) {
2337                 usage("attach");
2338         }
2339
2340         db_name = argv[0];
2341         if (argc == 2) {
2342                 if (strcmp(argv[1], "persistent") == 0) {
2343                         db_flags = CTDB_DB_FLAGS_PERSISTENT;
2344                 } else if (strcmp(argv[1], "readonly") == 0) {
2345                         db_flags = CTDB_DB_FLAGS_READONLY;
2346                 } else if (strcmp(argv[1], "sticky") == 0) {
2347                         db_flags = CTDB_DB_FLAGS_STICKY;
2348                 } else {
2349                         usage("attach");
2350                 }
2351         }
2352
2353         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2354                           db_flags, NULL);
2355         if (ret != 0) {
2356                 return ret;
2357         }
2358
2359         return 0;
2360 }
2361
2362 static int control_detach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2363                           int argc, const char **argv)
2364 {
2365         const char *db_name;
2366         uint32_t db_id;
2367         uint8_t db_flags;
2368         struct ctdb_node_map *nodemap;
2369         int recmode;
2370         int ret, ret2, i;
2371
2372         if (argc < 1) {
2373                 usage("detach");
2374         }
2375
2376         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2377                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
2378         if (ret != 0) {
2379                 return ret;
2380         }
2381
2382         if (recmode == CTDB_RECOVERY_ACTIVE) {
2383                 fprintf(stderr, "Database cannot be detached"
2384                                 " when recovery is active\n");
2385                 return 1;
2386         }
2387
2388         nodemap = get_nodemap(ctdb, false);
2389         if (nodemap == NULL) {
2390                 return 1;
2391         }
2392
2393         for (i=0; i<nodemap->num; i++) {
2394                 uint32_t value;
2395
2396                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2397                         continue;
2398                 }
2399                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2400                         continue;
2401                 }
2402                 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2403                         fprintf(stderr, "Database cannot be detached on"
2404                                 " inactive (stopped or banned) node %u\n",
2405                                 nodemap->node[i].pnn);
2406                         return 1;
2407                 }
2408
2409                 ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
2410                                             nodemap->node[i].pnn, TIMEOUT(),
2411                                             "AllowClientDBAttach", &value);
2412                 if (ret != 0) {
2413                         fprintf(stderr,
2414                                 "Unable to get tunable AllowClientDBAttach"
2415                                 " from node %u\n", nodemap->node[i].pnn);
2416                         return ret;
2417                 }
2418
2419                 if (value == 1) {
2420                         fprintf(stderr,
2421                                 "Database access is still active on node %u."
2422                                 " Set AllowclientDBAttach=0 on all nodes.\n",
2423                                 nodemap->node[i].pnn);
2424                         return 1;
2425                 }
2426         }
2427
2428         ret2 = 0;
2429         for (i=0; i<argc; i++) {
2430                 if (! db_exists(mem_ctx, ctdb, argv[i], &db_id, &db_name,
2431                                 &db_flags)) {
2432                         continue;
2433                 }
2434
2435                 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
2436                         fprintf(stderr,
2437                                 "Persistent database %s cannot be detached\n",
2438                                 argv[0]);
2439                         return 1;
2440                 }
2441
2442                 ret = ctdb_detach(mem_ctx, ctdb->ev, ctdb->client,
2443                                   TIMEOUT(), db_id);
2444                 if (ret != 0) {
2445                         fprintf(stderr, "Database %s detach failed\n", db_name);
2446                         ret2 = ret;
2447                 }
2448         }
2449
2450         return ret2;
2451 }
2452
2453 static int control_dumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2454                               int argc, const char **argv)
2455 {
2456         const char *mem_str;
2457         ssize_t n;
2458         int ret;
2459
2460         ret = ctdb_ctrl_dump_memory(mem_ctx, ctdb->ev, ctdb->client,
2461                                     ctdb->cmd_pnn, TIMEOUT(), &mem_str);
2462         if (ret != 0) {
2463                 return ret;
2464         }
2465
2466         n = write(1, mem_str, strlen(mem_str)+1);
2467         if (n < 0 || n != strlen(mem_str)+1) {
2468                 fprintf(stderr, "Failed to write talloc summary\n");
2469                 return 1;
2470         }
2471
2472         return 0;
2473 }
2474
2475 static void dump_memory(uint64_t srvid, TDB_DATA data, void *private_data)
2476 {
2477         bool *done = (bool *)private_data;
2478         ssize_t n;
2479
2480         n = write(1, data.dptr, data.dsize);
2481         if (n < 0 || n != data.dsize) {
2482                 fprintf(stderr, "Failed to write talloc summary\n");
2483         }
2484
2485         *done = true;
2486 }
2487
2488 static int control_rddumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2489                                 int argc, const char **argv)
2490 {
2491         struct ctdb_srvid_message msg = { 0 };
2492         int ret;
2493         bool done = false;
2494
2495         msg.pnn = ctdb->pnn;
2496         msg.srvid = next_srvid(ctdb);
2497
2498         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2499                                               msg.srvid, dump_memory, &done);
2500         if (ret != 0) {
2501                 return ret;
2502         }
2503
2504         ret = ctdb_message_mem_dump(mem_ctx, ctdb->ev, ctdb->client,
2505                                     ctdb->cmd_pnn, &msg);
2506         if (ret != 0) {
2507                 return ret;
2508         }
2509
2510         ctdb_client_wait(ctdb->ev, &done);
2511         return 0;
2512 }
2513
2514 static int control_getpid(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2515                           int argc, const char **argv)
2516 {
2517         pid_t pid;
2518         int ret;
2519
2520         ret = ctdb_ctrl_get_pid(mem_ctx, ctdb->ev, ctdb->client,
2521                                 ctdb->cmd_pnn, TIMEOUT(), &pid);
2522         if (ret != 0) {
2523                 return ret;
2524         }
2525
2526         printf("%u\n", pid);
2527         return 0;
2528 }
2529
2530 static int check_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2531                        const char *desc, uint32_t flag, bool set_flag)
2532 {
2533         struct ctdb_node_map *nodemap;
2534         bool flag_is_set;
2535
2536         nodemap = get_nodemap(ctdb, false);
2537         if (nodemap == NULL) {
2538                 return 1;
2539         }
2540
2541         flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2542         if (set_flag == flag_is_set) {
2543                 if (set_flag) {
2544                         fprintf(stderr, "Node %u is already %s\n",
2545                                 ctdb->cmd_pnn, desc);
2546                 } else {
2547                         fprintf(stderr, "Node %u is not %s\n",
2548                                 ctdb->cmd_pnn, desc);
2549                 }
2550                 return 0;
2551         }
2552
2553         return 1;
2554 }
2555
2556 static void wait_for_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2557                            uint32_t flag, bool set_flag)
2558 {
2559         struct ctdb_node_map *nodemap;
2560         bool flag_is_set;
2561
2562         while (1) {
2563                 nodemap = get_nodemap(ctdb, true);
2564                 if (nodemap == NULL) {
2565                         fprintf(stderr,
2566                                 "Failed to get nodemap, trying again\n");
2567                         sleep(1);
2568                         continue;
2569                 }
2570
2571                 flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2572                 if (flag_is_set == set_flag) {
2573                         break;
2574                 }
2575
2576                 sleep(1);
2577         }
2578 }
2579
/* Result of a takeover run, filled in by ipreallocate_handler() */
struct ipreallocate_state {
        int status;     /* value carried by the reply message */
        bool done;      /* set once a valid reply has been received */
};
2584
2585 static void ipreallocate_handler(uint64_t srvid, TDB_DATA data,
2586                                  void *private_data)
2587 {
2588         struct ipreallocate_state *state =
2589                 (struct ipreallocate_state *)private_data;
2590
2591         if (data.dsize != sizeof(int)) {
2592                 /* Ignore packet */
2593                 return;
2594         }
2595
2596         state->status = *(int *)data.dptr;
2597         state->done = true;
2598 }
2599
2600 static int ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb)
2601 {
2602         struct ctdb_srvid_message msg = { 0 };
2603         struct ipreallocate_state state;
2604         int ret;
2605
2606         msg.pnn = ctdb->pnn;
2607         msg.srvid = next_srvid(ctdb);
2608
2609         state.done = false;
2610         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2611                                               msg.srvid,
2612                                               ipreallocate_handler, &state);
2613         if (ret != 0) {
2614                 return ret;
2615         }
2616
2617         while (true) {
2618                 ret = ctdb_message_takeover_run(mem_ctx, ctdb->ev,
2619                                                 ctdb->client,
2620                                                 CTDB_BROADCAST_CONNECTED,
2621                                                 &msg);
2622                 if (ret != 0) {
2623                         goto fail;
2624                 }
2625
2626                 ret = ctdb_client_wait_timeout(ctdb->ev, &state.done,
2627                                                TIMEOUT());
2628                 if (ret != 0) {
2629                         continue;
2630                 }
2631
2632                 if (state.status >= 0) {
2633                         ret = 0;
2634                 } else {
2635                         ret = state.status;
2636                 }
2637                 break;
2638         }
2639
2640 fail:
2641         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2642                                            msg.srvid, &state);
2643         return ret;
2644 }
2645
2646 static int control_disable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2647                            int argc, const char **argv)
2648 {
2649         int ret;
2650
2651         if (argc != 0) {
2652                 usage("disable");
2653         }
2654
2655         ret = check_flags(mem_ctx, ctdb, "disabled",
2656                           NODE_FLAGS_PERMANENTLY_DISABLED, true);
2657         if (ret == 0) {
2658                 return 0;
2659         }
2660
2661         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2662                                  ctdb->cmd_pnn, TIMEOUT(),
2663                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2664         if (ret != 0) {
2665                 fprintf(stderr,
2666                         "Failed to set DISABLED flag on node %u\n",
2667                         ctdb->cmd_pnn);
2668                 return ret;
2669         }
2670
2671         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, true);
2672         return ipreallocate(mem_ctx, ctdb);
2673 }
2674
2675 static int control_enable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2676                           int argc, const char **argv)
2677 {
2678         int ret;
2679
2680         if (argc != 0) {
2681                 usage("enable");
2682         }
2683
2684         ret = check_flags(mem_ctx, ctdb, "disabled",
2685                           NODE_FLAGS_PERMANENTLY_DISABLED, false);
2686         if (ret == 0) {
2687                 return 0;
2688         }
2689
2690         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2691                                  ctdb->cmd_pnn, TIMEOUT(),
2692                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
2693         if (ret != 0) {
2694                 fprintf(stderr, "Failed to reset DISABLED flag on node %u\n",
2695                         ctdb->cmd_pnn);
2696                 return ret;
2697         }
2698
2699         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, false);
2700         return ipreallocate(mem_ctx, ctdb);
2701 }
2702
2703 static int control_stop(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2704                         int argc, const char **argv)
2705 {
2706         int ret;
2707
2708         if (argc != 0) {
2709                 usage("stop");
2710         }
2711
2712         ret = check_flags(mem_ctx, ctdb, "stopped",
2713                           NODE_FLAGS_STOPPED, true);
2714         if (ret == 0) {
2715                 return 0;
2716         }
2717
2718         ret = ctdb_ctrl_stop_node(mem_ctx, ctdb->ev, ctdb->client,
2719                                   ctdb->cmd_pnn, TIMEOUT());
2720         if (ret != 0) {
2721                 fprintf(stderr, "Failed to stop node %u\n", ctdb->cmd_pnn);
2722                 return ret;
2723         }
2724
2725         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, true);
2726         return ipreallocate(mem_ctx, ctdb);
2727 }
2728
2729 static int control_continue(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2730                             int argc, const char **argv)
2731 {
2732         int ret;
2733
2734         if (argc != 0) {
2735                 usage("continue");
2736         }
2737
2738         ret = check_flags(mem_ctx, ctdb, "stopped",
2739                           NODE_FLAGS_STOPPED, false);
2740         if (ret == 0) {
2741                 return 0;
2742         }
2743
2744         ret = ctdb_ctrl_continue_node(mem_ctx, ctdb->ev, ctdb->client,
2745                                       ctdb->cmd_pnn, TIMEOUT());
2746         if (ret != 0) {
2747                 fprintf(stderr, "Failed to continue stopped node %u\n",
2748                         ctdb->cmd_pnn);
2749                 return ret;
2750         }
2751
2752         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, false);
2753         return ipreallocate(mem_ctx, ctdb);
2754 }
2755
2756 static int control_ban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2757                        int argc, const char **argv)
2758 {
2759         struct ctdb_ban_state ban_state;
2760         int ret;
2761
2762         if (argc != 1) {
2763                 usage("ban");
2764         }
2765
2766         ret = check_flags(mem_ctx, ctdb, "banned",
2767                           NODE_FLAGS_BANNED, true);
2768         if (ret == 0) {
2769                 return 0;
2770         }
2771
2772         ban_state.pnn = ctdb->cmd_pnn;
2773         ban_state.time = strtoul(argv[0], NULL, 0);
2774
2775         if (ban_state.time == 0) {
2776                 fprintf(stderr, "Ban time cannot be zero\n");
2777                 return EINVAL;
2778         }
2779
2780         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2781                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2782         if (ret != 0) {
2783                 fprintf(stderr, "Failed to ban node %u\n", ctdb->cmd_pnn);
2784                 return ret;
2785         }
2786
2787         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, true);
2788         return ipreallocate(mem_ctx, ctdb);
2789
2790 }
2791
2792 static int control_unban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2793                          int argc, const char **argv)
2794 {
2795         struct ctdb_ban_state ban_state;
2796         int ret;
2797
2798         if (argc != 0) {
2799                 usage("unban");
2800         }
2801
2802         ret = check_flags(mem_ctx, ctdb, "banned",
2803                           NODE_FLAGS_BANNED, false);
2804         if (ret == 0) {
2805                 return 0;
2806         }
2807
2808         ban_state.pnn = ctdb->cmd_pnn;
2809         ban_state.time = 0;
2810
2811         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2812                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2813         if (ret != 0) {
2814                 fprintf(stderr, "Failed to unban node %u\n", ctdb->cmd_pnn);
2815                 return ret;
2816         }
2817
2818         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, false);
2819         return ipreallocate(mem_ctx, ctdb);
2820
2821 }
2822
2823 static int control_shutdown(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2824                             int argc, const char **argv)
2825 {
2826         int ret;
2827
2828         if (argc != 0) {
2829                 usage("shutdown");
2830         }
2831
2832         ret = ctdb_ctrl_shutdown(mem_ctx, ctdb->ev, ctdb->client,
2833                                  ctdb->cmd_pnn, TIMEOUT());
2834         if (ret != 0) {
2835                 fprintf(stderr, "Unable to shutdown node %u\n", ctdb->cmd_pnn);
2836                 return ret;
2837         }
2838
2839         return 0;
2840 }
2841
2842 static int get_generation(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2843                           uint32_t *generation)
2844 {
2845         uint32_t recmaster;
2846         int recmode;
2847         struct ctdb_vnn_map *vnnmap;
2848         int ret;
2849
2850 again:
2851         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2852                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
2853         if (ret != 0) {
2854                 fprintf(stderr, "Failed to find recovery master\n");
2855                 return ret;
2856         }
2857
2858         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2859                                     recmaster, TIMEOUT(), &recmode);
2860         if (ret != 0) {
2861                 fprintf(stderr, "Failed to get recovery mode from node %u\n",
2862                         recmaster);
2863                 return ret;
2864         }
2865
2866         if (recmode == CTDB_RECOVERY_ACTIVE) {
2867                 sleep(1);
2868                 goto again;
2869         }
2870
2871         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
2872                                   recmaster, TIMEOUT(), &vnnmap);
2873         if (ret != 0) {
2874                 fprintf(stderr, "Failed to get generation from node %u\n",
2875                         recmaster);
2876                 return ret;
2877         }
2878
2879         if (vnnmap->generation == INVALID_GENERATION) {
2880                 talloc_free(vnnmap);
2881                 sleep(1);
2882                 goto again;
2883         }
2884
2885         *generation = vnnmap->generation;
2886         talloc_free(vnnmap);
2887         return 0;
2888 }
2889
2890
2891 static int control_recover(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2892                            int argc, const char **argv)
2893 {
2894         uint32_t generation, next_generation;
2895         int ret;
2896
2897         if (argc != 0) {
2898                 usage("recover");
2899         }
2900
2901         ret = get_generation(mem_ctx, ctdb, &generation);
2902         if (ret != 0) {
2903                 return ret;
2904         }
2905
2906         ret = ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
2907                                     ctdb->cmd_pnn, TIMEOUT(),
2908                                     CTDB_RECOVERY_ACTIVE);
2909         if (ret != 0) {
2910                 fprintf(stderr, "Failed to set recovery mode active\n");
2911                 return ret;
2912         }
2913
2914         while (1) {
2915                 ret = get_generation(mem_ctx, ctdb, &next_generation);
2916                 if (ret != 0) {
2917                         fprintf(stderr,
2918                                 "Failed to confirm end of recovery\n");
2919                         return ret;
2920                 }
2921
2922                 if (next_generation != generation) {
2923                         break;
2924                 }
2925
2926                 sleep (1);
2927         }
2928
2929         return 0;
2930 }
2931
2932 static int control_ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2933                                 int argc, const char **argv)
2934 {
2935         if (argc != 0) {
2936                 usage("ipreallocate");
2937         }
2938
2939         return ipreallocate(mem_ctx, ctdb);
2940 }
2941
2942 static int control_isnotrecmaster(TALLOC_CTX *mem_ctx,
2943                                   struct ctdb_context *ctdb,
2944                                   int argc, const char **argv)
2945 {
2946         uint32_t recmaster;
2947         int ret;
2948
2949         if (argc != 0) {
2950                 usage("isnotrecmaster");
2951         }
2952
2953         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2954                                       ctdb->pnn, TIMEOUT(), &recmaster);
2955         if (ret != 0) {
2956                 fprintf(stderr, "Failed to get recmaster\n");
2957                 return ret;
2958         }
2959
2960         if (recmaster != ctdb->pnn) {
2961                 printf("this node is not the recmaster\n");
2962                 return 1;
2963         }
2964
2965         printf("this node is the recmaster\n");
2966         return 0;
2967 }
2968
2969 static int control_gratarp(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2970                            int argc, const char **argv)
2971 {
2972         struct ctdb_addr_info addr_info;
2973         int ret;
2974
2975         if (argc != 2) {
2976                 usage("gratarp");
2977         }
2978
2979         if (! parse_ip(argv[0], NULL, 0, &addr_info.addr)) {
2980                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
2981                 return 1;
2982         }
2983         addr_info.iface = argv[1];
2984
2985         ret = ctdb_ctrl_send_gratuitous_arp(mem_ctx, ctdb->ev, ctdb->client,
2986                                             ctdb->cmd_pnn, TIMEOUT(),
2987                                             &addr_info);
2988         if (ret != 0) {
2989                 fprintf(stderr, "Unable to send gratuitous arp from node %u\n",
2990                         ctdb->cmd_pnn);
2991                 return ret;
2992         }
2993
2994         return 0;
2995 }
2996
2997 static int control_tickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2998                            int argc, const char **argv)
2999 {
3000         ctdb_sock_addr src, dst;
3001         int ret;
3002
3003         if (argc != 0 && argc != 2) {
3004                 usage("tickle");
3005         }
3006
3007         if (argc == 0) {
3008                 struct ctdb_connection *clist;
3009                 int count;
3010                 int i, num_failed;
3011
3012                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3013                 if (ret != 0) {
3014                         return ret;
3015                 }
3016
3017                 num_failed = 0;
3018                 for (i=0; i<count; i++) {
3019                         ret = ctdb_sys_send_tcp(&clist[i].src,
3020                                                 &clist[i].dst,
3021                                                 0, 0, 0);
3022                         if (ret != 0) {
3023                                 num_failed += 1;
3024                         }
3025                 }
3026
3027                 TALLOC_FREE(clist);
3028
3029                 if (num_failed > 0) {
3030                         fprintf(stderr, "Failed to send %d tickles\n",
3031                                 num_failed);
3032                         return 1;
3033                 }
3034
3035                 return 0;
3036         }
3037
3038
3039         if (! parse_ip_port(argv[0], &src)) {
3040                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3041                 return 1;
3042         }
3043
3044         if (! parse_ip_port(argv[1], &dst)) {
3045                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3046                 return 1;
3047         }
3048
3049         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
3050         if (ret != 0) {
3051                 fprintf(stderr, "Failed to send tickle ack\n");
3052                 return ret;
3053         }
3054
3055         return 0;
3056 }
3057
3058 static int control_gettickles(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3059                               int argc, const char **argv)
3060 {
3061         ctdb_sock_addr addr;
3062         struct ctdb_tickle_list *tickles;
3063         unsigned port = 0;
3064         int ret, i;
3065
3066         if (argc < 1 || argc > 2) {
3067                 usage("gettickles");
3068         }
3069
3070         if (argc == 2) {
3071                 port = strtoul(argv[1], NULL, 10);
3072         }
3073
3074         if (! parse_ip(argv[0], NULL, port, &addr)) {
3075                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3076                 return 1;
3077         }
3078
3079         ret = ctdb_ctrl_get_tcp_tickle_list(mem_ctx, ctdb->ev, ctdb->client,
3080                                             ctdb->cmd_pnn, TIMEOUT(), &addr,
3081                                             &tickles);
3082         if (ret != 0) {
3083                 fprintf(stderr, "Failed to get list of connections\n");
3084                 return ret;
3085         }
3086
3087         if (options.machinereadable) {
3088                 printf("%s%s%s%s%s%s%s%s%s\n",
3089                        options.sep,
3090                        "Source IP", options.sep,
3091                        "Port", options.sep,
3092                        "Destiation IP", options.sep,
3093                        "Port", options.sep);
3094                 for (i=0; i<tickles->num; i++) {
3095                         printf("%s%s%s%u%s%s%s%u%s\n", options.sep,
3096                                ctdb_sock_addr_to_string(
3097                                        mem_ctx, &tickles->conn[i].src),
3098                                options.sep,
3099                                ntohs(tickles->conn[i].src.ip.sin_port),
3100                                options.sep,
3101                                ctdb_sock_addr_to_string(
3102                                        mem_ctx, &tickles->conn[i].dst),
3103                                options.sep,
3104                                ntohs(tickles->conn[i].dst.ip.sin_port),
3105                                options.sep);
3106                 }
3107         } else {
3108                 printf("Connections for IP: %s\n",
3109                        ctdb_sock_addr_to_string(mem_ctx, &tickles->addr));
3110                 printf("Num connections: %u\n", tickles->num);
3111                 for (i=0; i<tickles->num; i++) {
3112                         printf("SRC: %s:%u   DST: %s:%u\n",
3113                                ctdb_sock_addr_to_string(
3114                                        mem_ctx, &tickles->conn[i].src),
3115                                ntohs(tickles->conn[i].src.ip.sin_port),
3116                                ctdb_sock_addr_to_string(
3117                                        mem_ctx, &tickles->conn[i].dst),
3118                                ntohs(tickles->conn[i].dst.ip.sin_port));
3119                 }
3120         }
3121
3122         talloc_free(tickles);
3123         return 0;
3124 }
3125
/* Fills in the control request to be sent for one connection */
typedef void (*clist_request_func)(struct ctdb_req_control *request,
                                   struct ctdb_connection *conn);

/* Evaluates one control reply; a non-zero result counts as a failure */
typedef int (*clist_reply_func)(struct ctdb_reply_control *reply);

/*
 * State of one process_clist request, which sends one control per
 * connection in a list and counts the outcomes.
 */
struct process_clist_state {
        /* Connection list handed to process_clist_send() (not copied) */
        struct ctdb_connection *clist;
        /* Number of entries in clist, i.e. number of controls sent */
        int count;
        /* Controls that failed / controls that have completed so far */
        int num_failed, num_total;
        /* Callback used to evaluate each reply */
        clist_reply_func reply_func;
};

static void process_clist_done(struct tevent_req *subreq);
3139
3140 static struct tevent_req *process_clist_send(
3141                                         TALLOC_CTX *mem_ctx,
3142                                         struct ctdb_context *ctdb,
3143                                         struct ctdb_connection *clist,
3144                                         int count,
3145                                         clist_request_func request_func,
3146                                         clist_reply_func reply_func)
3147 {
3148         struct tevent_req *req, *subreq;
3149         struct process_clist_state *state;
3150         struct ctdb_req_control request;
3151         int i;
3152
3153         req = tevent_req_create(mem_ctx, &state, struct process_clist_state);
3154         if (req == NULL) {
3155                 return NULL;
3156         }
3157
3158         state->clist = clist;
3159         state->count = count;
3160         state->reply_func = reply_func;
3161
3162         for (i=0; i<count; i++) {
3163                 request_func(&request, &clist[i]);
3164                 subreq = ctdb_client_control_send(state, ctdb->ev,
3165                                                   ctdb->client, ctdb->cmd_pnn,
3166                                                   TIMEOUT(), &request);
3167                 if (tevent_req_nomem(subreq, req)) {
3168                         return tevent_req_post(req, ctdb->ev);
3169                 }
3170                 tevent_req_set_callback(subreq, process_clist_done, req);
3171         }
3172
3173         return req;
3174 }
3175
3176 static void process_clist_done(struct tevent_req *subreq)
3177 {
3178         struct tevent_req *req = tevent_req_callback_data(
3179                 subreq, struct tevent_req);
3180         struct process_clist_state *state = tevent_req_data(
3181                 req, struct process_clist_state);
3182         struct ctdb_reply_control *reply;
3183         int ret;
3184         bool status;
3185
3186         status = ctdb_client_control_recv(subreq, NULL, state, &reply);
3187         TALLOC_FREE(subreq);
3188         if (! status) {
3189                 state->num_failed += 1;
3190                 goto done;
3191         }
3192
3193         ret = state->reply_func(reply);
3194         if (ret != 0) {
3195                 state->num_failed += 1;
3196                 goto done;
3197         }
3198
3199 done:
3200         state->num_total += 1;
3201         if (state->num_total == state->count) {
3202                 tevent_req_done(req);
3203         }
3204 }
3205
3206 static int process_clist_recv(struct tevent_req *req)
3207 {
3208         struct process_clist_state *state = tevent_req_data(
3209                 req, struct process_clist_state);
3210
3211         return state->num_failed;
3212 }
3213
3214 static int control_addtickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3215                              int argc, const char **argv)
3216 {
3217         struct ctdb_connection conn;
3218         int ret;
3219
3220         if (argc != 0 && argc != 2) {
3221                 usage("addtickle");
3222         }
3223
3224         if (argc == 0) {
3225                 struct ctdb_connection *clist;
3226                 struct tevent_req *req;
3227                 int count;
3228
3229                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3230                 if (ret != 0) {
3231                         return ret;
3232                 }
3233                 if (count == 0) {
3234                         return 0;
3235                 }
3236
3237                 req = process_clist_send(mem_ctx, ctdb, clist, count,
3238                                  ctdb_req_control_tcp_add_delayed_update,
3239                                  ctdb_reply_control_tcp_add_delayed_update);
3240                 if (req == NULL) {
3241                         talloc_free(clist);
3242                         return ENOMEM;
3243                 }
3244
3245                 tevent_req_poll(req, ctdb->ev);
3246                 talloc_free(clist);
3247
3248                 ret = process_clist_recv(req);
3249                 if (ret != 0) {
3250                         fprintf(stderr, "Failed to add %d tickles\n", ret);
3251                         return 1;
3252                 }
3253
3254                 return 0;
3255         }
3256
3257         if (! parse_ip_port(argv[0], &conn.src)) {
3258                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3259                 return 1;
3260         }
3261         if (! parse_ip_port(argv[1], &conn.dst)) {
3262                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3263                 return 1;
3264         }
3265
3266         ret = ctdb_ctrl_tcp_add_delayed_update(mem_ctx, ctdb->ev,
3267                                                ctdb->client, ctdb->cmd_pnn,
3268                                                TIMEOUT(), &conn);
3269         if (ret != 0) {
3270                 fprintf(stderr, "Failed to register connection\n");
3271                 return ret;
3272         }
3273
3274         return 0;
3275 }
3276
3277 static int control_deltickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3278                              int argc, const char **argv)
3279 {
3280         struct ctdb_connection conn;
3281         int ret;
3282
3283         if (argc != 0 && argc != 2) {
3284                 usage("deltickle");
3285         }
3286
3287         if (argc == 0) {
3288                 struct ctdb_connection *clist;
3289                 struct tevent_req *req;
3290                 int count;
3291
3292                 ret = ctdb_parse_connections(stdin, mem_ctx, &count, &clist);
3293                 if (ret != 0) {
3294                         return ret;
3295                 }
3296                 if (count == 0) {
3297                         return 0;
3298                 }
3299
3300                 req = process_clist_send(mem_ctx, ctdb, clist, count,
3301                                          ctdb_req_control_tcp_remove,
3302                                          ctdb_reply_control_tcp_remove);
3303                 if (req == NULL) {
3304                         talloc_free(clist);
3305                         return ENOMEM;
3306                 }
3307
3308                 tevent_req_poll(req, ctdb->ev);
3309                 talloc_free(clist);
3310
3311                 ret = process_clist_recv(req);
3312                 if (ret != 0) {
3313                         fprintf(stderr, "Failed to remove %d tickles\n", ret);
3314                         return 1;
3315                 }
3316
3317                 return 0;
3318         }
3319
3320         if (! parse_ip_port(argv[0], &conn.src)) {
3321                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3322                 return 1;
3323         }
3324         if (! parse_ip_port(argv[1], &conn.dst)) {
3325                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3326                 return 1;
3327         }
3328
3329         ret = ctdb_ctrl_tcp_remove(mem_ctx, ctdb->ev, ctdb->client,
3330                                    ctdb->cmd_pnn, TIMEOUT(), &conn);
3331         if (ret != 0) {
3332                 fprintf(stderr, "Failed to unregister connection\n");
3333                 return ret;
3334         }
3335
3336         return 0;
3337 }
3338
3339 static int control_check_srvids(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3340                                 int argc, const char **argv)
3341 {
3342         uint64_t *srvid;
3343         uint8_t *result;
3344         int ret, i;
3345
3346         if (argc == 0) {
3347                 usage("check_srvids");
3348         }
3349
3350         srvid = talloc_array(mem_ctx, uint64_t, argc);
3351         if (srvid == NULL) {
3352                 fprintf(stderr, "Memory allocation error\n");
3353                 return 1;
3354         }
3355
3356         for (i=0; i<argc; i++) {
3357                 srvid[i] = strtoull(argv[i], NULL, 0);
3358         }
3359
3360         ret = ctdb_ctrl_check_srvids(mem_ctx, ctdb->ev, ctdb->client,
3361                                      ctdb->cmd_pnn, TIMEOUT(), srvid, argc,
3362                                      &result);
3363         if (ret != 0) {
3364                 fprintf(stderr, "Failed to check srvids on node %u\n",
3365                         ctdb->cmd_pnn);
3366                 return ret;
3367         }
3368
3369         for (i=0; i<argc; i++) {
3370                 printf("SRVID 0x%" PRIx64 " %s\n", srvid[i],
3371                        (result[i] ? "exists" : "does not exist"));
3372         }
3373
3374         return 0;
3375 }
3376
3377 static int control_listnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3378                              int argc, const char **argv)
3379 {
3380         struct ctdb_node_map *nodemap;
3381         int i;
3382
3383         if (argc != 0) {
3384                 usage("listnodes");
3385         }
3386
3387         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
3388         if (nodemap == NULL) {
3389                 return 1;
3390         }
3391
3392         for (i=0; i<nodemap->num; i++) {
3393                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
3394                         continue;
3395                 }
3396
3397                 if (options.machinereadable) {
3398                         printf("%s%u%s%s%s\n", options.sep,
3399                                nodemap->node[i].pnn, options.sep,
3400                                ctdb_sock_addr_to_string(
3401                                         mem_ctx, &nodemap->node[i].addr),
3402                                options.sep);
3403                 } else {
3404                         printf("%s\n",
3405                                ctdb_sock_addr_to_string(
3406                                         mem_ctx, &nodemap->node[i].addr));
3407                 }
3408         }
3409
3410         return 0;
3411 }
3412
3413 static bool nodemap_identical(struct ctdb_node_map *nodemap1,
3414                               struct ctdb_node_map *nodemap2)
3415 {
3416         int i;
3417
3418         if (nodemap1->num != nodemap2->num) {
3419                 return false;
3420         }
3421
3422         for (i=0; i<nodemap1->num; i++) {
3423                 struct ctdb_node_and_flags *n1, *n2;
3424
3425                 n1 = &nodemap1->node[i];
3426                 n2 = &nodemap2->node[i];
3427
3428                 if ((n1->pnn != n2->pnn) ||
3429                     (n1->flags != n2->flags) ||
3430                     ! ctdb_sock_addr_same_ip(&n1->addr, &n2->addr)) {
3431                         return false;
3432                 }
3433         }
3434
3435         return true;
3436 }
3437
3438 static int check_node_file_changes(TALLOC_CTX *mem_ctx,
3439                                    struct ctdb_node_map *nm,
3440                                    struct ctdb_node_map *fnm,
3441                                    bool *reload)
3442 {
3443         int i;
3444         bool check_failed = false;
3445
3446         *reload = false;
3447
3448         for (i=0; i<nm->num; i++) {
3449                 if (i >= fnm->num) {
3450                         fprintf(stderr,
3451                                 "Node %u (%s) missing from nodes file\n",
3452                                 nm->node[i].pnn,
3453                                 ctdb_sock_addr_to_string(
3454                                         mem_ctx, &nm->node[i].addr));
3455                         check_failed = true;
3456                         continue;
3457                 }
3458                 if (nm->node[i].flags & NODE_FLAGS_DELETED &&
3459                     fnm->node[i].flags & NODE_FLAGS_DELETED) {
3460                         /* Node remains deleted */
3461                         continue;
3462                 }
3463
3464                 if (! (nm->node[i].flags & NODE_FLAGS_DELETED) &&
3465                     ! (fnm->node[i].flags & NODE_FLAGS_DELETED)) {
3466                         /* Node not newly nor previously deleted */
3467                         if (! ctdb_same_ip(&nm->node[i].addr,
3468                                            &fnm->node[i].addr)) {
3469                                 fprintf(stderr,
3470                                         "Node %u has changed IP address"
3471                                         " (was %s, now %s)\n",
3472                                         nm->node[i].pnn,
3473                                         ctdb_sock_addr_to_string(
3474                                                 mem_ctx, &nm->node[i].addr),
3475                                         ctdb_sock_addr_to_string(
3476                                                 mem_ctx, &fnm->node[i].addr));
3477                                 check_failed = true;
3478                         } else {
3479                                 if (nm->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3480                                         fprintf(stderr,
3481                                                 "WARNING: Node %u is disconnected."
3482                                                 " You MUST fix this node manually!\n",
3483                                                 nm->node[i].pnn);
3484                                 }
3485                         }
3486                         continue;
3487                 }
3488
3489                 if (fnm->node[i].flags & NODE_FLAGS_DELETED) {
3490                         /* Node is being deleted */
3491                         printf("Node %u is DELETED\n", nm->node[i].pnn);
3492                         *reload = true;
3493                         if (! (nm->node[i].flags & NODE_FLAGS_DISCONNECTED)) {
3494                                 fprintf(stderr,
3495                                         "ERROR: Node %u is still connected\n",
3496                                         nm->node[i].pnn);
3497                                 check_failed = true;
3498                         }
3499                         continue;
3500                 }
3501
3502                 if (nm->node[i].flags & NODE_FLAGS_DELETED) {
3503                         /* Node was previously deleted */
3504                         printf("Node %u is UNDELETED\n", nm->node[i].pnn);
3505                         *reload = true;
3506                 }
3507         }
3508
3509         if (check_failed) {
3510                 fprintf(stderr,
3511                         "ERROR: Nodes will not be reloaded due to previous error\n");
3512                 return 1;
3513         }
3514
3515         /* Leftover nodes in file are NEW */
3516         for (; i < fnm->num; i++) {
3517                 printf("Node %u is NEW\n", fnm->node[i].pnn);
3518                 *reload = true;
3519         }
3520
3521         return 0;
3522 }
3523
/*
 * Bookkeeping shared between disable_recoveries() and
 * disable_recoveries_handler() while replies are being collected.
 */
struct disable_recoveries_state {
	/* PNNs of the nodes a reply is expected from */
	uint32_t *pnn_list;

	/* Number of entries in both pnn_list and reply */
	int node_count;

	/* reply[i] is set to true once pnn_list[i] has answered */
	bool *reply;

	/* Negative value received from a node, 0 if none was received */
	int status;

	/* True when every node has replied or an error was received */
	bool done;
};
3531
3532 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
3533                                        void *private_data)
3534 {
3535         struct disable_recoveries_state *state =
3536                 (struct disable_recoveries_state *)private_data;
3537         int ret, i;
3538
3539         if (data.dsize != sizeof(int)) {
3540                 /* Ignore packet */
3541                 return;
3542         }
3543
3544         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
3545         ret = *(int *)data.dptr;
3546         if (ret < 0) {
3547                 state->status = ret;
3548                 state->done = true;
3549                 return;
3550         }
3551         for (i=0; i<state->node_count; i++) {
3552                 if (state->pnn_list[i] == ret) {
3553                         state->reply[i] = true;
3554                         break;
3555                 }
3556         }
3557
3558         state->done = true;
3559         for (i=0; i<state->node_count; i++) {
3560                 if (! state->reply[i]) {
3561                         state->done = false;
3562                         break;
3563                 }
3564         }
3565 }
3566
3567 static int disable_recoveries(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3568                               uint32_t timeout, uint32_t *pnn_list, int count)
3569 {
3570         struct ctdb_disable_message disable = { 0 };
3571         struct disable_recoveries_state state;
3572         int ret, i;
3573
3574         disable.pnn = ctdb->pnn;
3575         disable.srvid = next_srvid(ctdb);
3576         disable.timeout = timeout;
3577
3578         state.pnn_list = pnn_list;
3579         state.node_count = count;
3580         state.done = false;
3581         state.status = 0;
3582         state.reply = talloc_zero_array(mem_ctx, bool, count);
3583         if (state.reply == NULL) {
3584                 return ENOMEM;
3585         }
3586
3587         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
3588                                               disable.srvid,
3589                                               disable_recoveries_handler,
3590                                               &state);
3591         if (ret != 0) {
3592                 return ret;
3593         }
3594
3595         for (i=0; i<count; i++) {
3596                 ret = ctdb_message_disable_recoveries(mem_ctx, ctdb->ev,
3597                                                       ctdb->client,
3598                                                       pnn_list[i],
3599                                                       &disable);
3600                 if (ret != 0) {
3601                         goto fail;
3602                 }
3603         }
3604
3605         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
3606         if (ret == ETIME) {
3607                 fprintf(stderr, "Timed out waiting to disable recoveries\n");
3608         } else {
3609                 ret = (state.status >= 0 ? 0 : 1);
3610         }
3611
3612 fail:
3613         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
3614                                            disable.srvid, &state);
3615         return ret;
3616 }
3617
3618 static int control_reloadnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3619                                int argc, const char **argv)
3620 {
3621         struct ctdb_node_map *nodemap = NULL;
3622         struct ctdb_node_map *file_nodemap;
3623         struct ctdb_node_map *remote_nodemap;
3624         struct ctdb_req_control request;
3625         struct ctdb_reply_control **reply;
3626         bool reload;
3627         int ret, i;
3628         uint32_t *pnn_list;
3629         int count;
3630
3631         nodemap = get_nodemap(ctdb, false);
3632         if (nodemap == NULL) {
3633                 return 1;
3634         }
3635
3636         file_nodemap = read_nodes_file(mem_ctx, ctdb->pnn);
3637         if (file_nodemap == NULL) {
3638                 return 1;
3639         }
3640
3641         for (i=0; i<nodemap->num; i++) {
3642                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3643                         continue;
3644                 }
3645
3646                 ret = ctdb_ctrl_get_nodes_file(mem_ctx, ctdb->ev, ctdb->client,
3647                                                nodemap->node[i].pnn, TIMEOUT(),
3648                                                &remote_nodemap);
3649                 if (ret != 0) {
3650                         fprintf(stderr,
3651                                 "ERROR: Failed to get nodes file from node %u\n",
3652                                 nodemap->node[i].pnn);
3653                         return ret;
3654                 }
3655
3656                 if (! nodemap_identical(file_nodemap, remote_nodemap)) {
3657                         fprintf(stderr,
3658                                 "ERROR: Nodes file on node %u differs"
3659                                  " from current node (%u)\n",
3660                                  nodemap->node[i].pnn, ctdb->pnn);
3661                         return 1;
3662                 }
3663         }
3664
3665         ret = check_node_file_changes(mem_ctx, nodemap, file_nodemap, &reload);
3666         if (ret != 0) {
3667                 return ret;
3668         }
3669
3670         if (! reload) {
3671                 fprintf(stderr, "No change in nodes file,"
3672                                 " skipping unnecessary reload\n");
3673                 return 0;
3674         }
3675
3676         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
3677                                         mem_ctx, &pnn_list);
3678         if (count <= 0) {
3679                 fprintf(stderr, "Memory allocation error\n");
3680                 return 1;
3681         }
3682
3683         ret = disable_recoveries(mem_ctx, ctdb, 2*options.timelimit,
3684                                  pnn_list, count);
3685         if (ret != 0) {
3686                 fprintf(stderr, "Failed to disable recoveries\n");
3687                 return ret;
3688         }
3689
3690         ctdb_req_control_reload_nodes_file(&request);
3691         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3692                                         pnn_list, count, TIMEOUT(),
3693                                         &request, NULL, &reply);
3694         if (ret != 0) {
3695                 bool failed = false;
3696
3697                 for (i=0; i<count; i++) {
3698                         ret = ctdb_reply_control_reload_nodes_file(reply[i]);
3699                         if (ret != 0) {
3700                                 fprintf(stderr,
3701                                         "Node %u failed to reload nodes\n",
3702                                         pnn_list[i]);
3703                                 failed = true;
3704                         }
3705                 }
3706                 if (failed) {
3707                         fprintf(stderr,
3708                                 "You MUST fix failed nodes manually!\n");
3709                 }
3710         }
3711
3712         ret = disable_recoveries(mem_ctx, ctdb, 0, pnn_list, count);
3713         if (ret != 0) {
3714                 fprintf(stderr, "Failed to enable recoveries\n");
3715                 return ret;
3716         }
3717
3718         return 0;
3719 }
3720
3721 static int moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3722                   ctdb_sock_addr *addr, uint32_t pnn)
3723 {
3724         struct ctdb_public_ip_list *pubip_list;
3725         struct ctdb_public_ip pubip;
3726         struct ctdb_node_map *nodemap;
3727         struct ctdb_req_control request;
3728         uint32_t *pnn_list;
3729         int ret, i, count;
3730
3731         ret = ctdb_message_disable_ip_check(mem_ctx, ctdb->ev, ctdb->client,
3732                                             CTDB_BROADCAST_CONNECTED,
3733                                             2*options.timelimit);
3734         if (ret != 0) {
3735                 fprintf(stderr, "Failed to disable IP check\n");
3736                 return ret;
3737         }
3738
3739         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3740                                        pnn, TIMEOUT(), &pubip_list);
3741         if (ret != 0) {
3742                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3743                         pnn);
3744                 return ret;
3745         }
3746
3747         for (i=0; i<pubip_list->num; i++) {
3748                 if (ctdb_same_ip(addr, &pubip_list->ip[i].addr)) {
3749                         break;
3750                 }
3751         }
3752
3753         if (i == pubip_list->num) {
3754                 fprintf(stderr, "Node %u CANNOT host IP address %s\n",
3755                         pnn, ctdb_sock_addr_to_string(mem_ctx, addr));
3756                 return 1;
3757         }
3758
3759         nodemap = get_nodemap(ctdb, false);
3760         if (nodemap == NULL) {
3761                 return 1;
3762         }
3763
3764         count = list_of_active_nodes(nodemap, pnn, mem_ctx, &pnn_list);
3765         if (count <= 0) {
3766                 fprintf(stderr, "Memory allocation error\n");
3767                 return 1;
3768         }
3769
3770         pubip.pnn = pnn;
3771         pubip.addr = *addr;
3772         ctdb_req_control_release_ip(&request, &pubip);
3773
3774         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3775                                         pnn_list, count, TIMEOUT(),
3776                                         &request, NULL, NULL);
3777         if (ret != 0) {
3778                 fprintf(stderr, "Failed to release IP on nodes\n");
3779                 return ret;
3780         }
3781
3782         ret = ctdb_ctrl_takeover_ip(mem_ctx, ctdb->ev, ctdb->client,
3783                                     pnn, TIMEOUT(), &pubip);
3784         if (ret != 0) {
3785                 fprintf(stderr, "Failed to takeover IP on node %u\n", pnn);
3786                 return ret;
3787         }
3788
3789         return 0;
3790 }
3791
3792 static int control_moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3793                           int argc, const char **argv)
3794 {
3795         ctdb_sock_addr addr;
3796         uint32_t pnn;
3797         int ret, retries = 0;
3798
3799         if (argc != 2) {
3800                 usage("moveip");
3801         }
3802
3803         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3804                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3805                 return 1;
3806         }
3807
3808         pnn = strtoul(argv[1], NULL, 10);
3809         if (pnn == CTDB_UNKNOWN_PNN) {
3810                 fprintf(stderr, "Invalid PNN %s\n", argv[1]);
3811                 return 1;
3812         }
3813
3814         while (retries < 5) {
3815                 ret = moveip(mem_ctx, ctdb, &addr, pnn);
3816                 if (ret == 0) {
3817                         break;
3818                 }
3819
3820                 sleep(3);
3821                 retries++;
3822         }
3823
3824         if (ret != 0) {
3825                 fprintf(stderr, "Failed to move IP %s to node %u\n",
3826                         argv[0], pnn);
3827                 return ret;
3828         }
3829
3830         return 0;
3831 }
3832
3833 static int rebalancenode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3834                          uint32_t pnn)
3835 {
3836         int ret;
3837
3838         ret = ctdb_message_rebalance_node(mem_ctx, ctdb->ev, ctdb->client,
3839                                           CTDB_BROADCAST_CONNECTED, pnn);
3840         if (ret != 0) {
3841                 fprintf(stderr,
3842                         "Failed to ask recovery master to distribute IPs\n");
3843                 return ret;
3844         }
3845
3846         return 0;
3847 }
3848
3849 static int control_addip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3850                          int argc, const char **argv)
3851 {
3852         ctdb_sock_addr addr;
3853         struct ctdb_public_ip_list *pubip_list;
3854         struct ctdb_addr_info addr_info;
3855         unsigned int mask;
3856         int ret, i, retries = 0;
3857
3858         if (argc != 2) {
3859                 usage("addip");
3860         }
3861
3862         if (! parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
3863                 fprintf(stderr, "Invalid IP/Mask %s\n", argv[0]);
3864                 return 1;
3865         }
3866
3867         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3868                                        ctdb->cmd_pnn, TIMEOUT(), &pubip_list);
3869         if (ret != 0) {
3870                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3871                         ctdb->cmd_pnn);
3872                 return 1;
3873         }
3874
3875         for (i=0; i<pubip_list->num; i++) {
3876                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3877                         fprintf(stderr, "Node already knows about IP %s\n",
3878                                 ctdb_sock_addr_to_string(mem_ctx, &addr));
3879                         return 0;
3880                 }
3881         }
3882
3883         addr_info.addr = addr;
3884         addr_info.mask = mask;
3885         addr_info.iface = argv[1];
3886
3887         while (retries < 5) {
3888                 ret = ctdb_ctrl_add_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3889                                               ctdb->cmd_pnn, TIMEOUT(),
3890                                               &addr_info);
3891                 if (ret == 0) {
3892                         break;
3893                 }
3894
3895                 sleep(3);
3896                 retries++;
3897         }
3898
3899         if (ret != 0) {
3900                 fprintf(stderr, "Failed to add public IP to node %u."
3901                                 " Giving up\n", ctdb->cmd_pnn);
3902                 return ret;
3903         }
3904
3905         ret = rebalancenode(mem_ctx, ctdb, ctdb->cmd_pnn);
3906         if (ret != 0) {
3907                 return ret;
3908         }
3909
3910         return 0;
3911 }
3912
3913 static int control_delip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3914                          int argc, const char **argv)
3915 {
3916         ctdb_sock_addr addr;
3917         struct ctdb_public_ip_list *pubip_list;
3918         struct ctdb_addr_info addr_info;
3919         int ret, i;
3920
3921         if (argc != 1) {
3922                 usage("delip");
3923         }
3924
3925         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3926                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3927                 return 1;
3928         }
3929
3930         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3931                                        ctdb->cmd_pnn, TIMEOUT(), &pubip_list);
3932         if (ret != 0) {
3933                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3934                         ctdb->cmd_pnn);
3935                 return 1;
3936         }
3937
3938         for (i=0; i<pubip_list->num; i++) {
3939                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3940                         break;
3941                 }
3942         }
3943
3944         if (i == pubip_list->num) {
3945                 fprintf(stderr, "Node does not know about IP address %s\n",
3946                         ctdb_sock_addr_to_string(mem_ctx, &addr));
3947                 return 0;
3948         }
3949
3950         addr_info.addr = addr;
3951         addr_info.mask = 0;
3952         addr_info.iface = NULL;
3953
3954         ret = ctdb_ctrl_del_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3955                                       ctdb->cmd_pnn, TIMEOUT(), &addr_info);
3956         if (ret != 0) {
3957                 fprintf(stderr, "Failed to delete public IP from node %u\n",
3958                         ctdb->cmd_pnn);
3959                 return ret;
3960         }
3961
3962         return 0;
3963 }
3964
3965 static int control_eventscript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3966                                int argc, const char **argv)
3967 {
3968         int ret;
3969
3970         if (argc != 1) {
3971                 usage("eventscript");
3972         }
3973
3974         if (strcmp(argv[0], "monitor") != 0) {
3975                 fprintf(stderr, "Only monitor event can be run\n");
3976                 return 1;
3977         }
3978
3979         ret = ctdb_ctrl_run_eventscripts(mem_ctx, ctdb->ev, ctdb->client,
3980                                          ctdb->cmd_pnn, TIMEOUT(), argv[0]);
3981         if (ret != 0) {
3982                 fprintf(stderr, "Failed to run monitor event on node %u\n",
3983                         ctdb->cmd_pnn);
3984                 return ret;
3985         }
3986
3987         return 0;
3988 }
3989
/* Version written to, and required from, the backup file header */
#define DB_VERSION      3
/* Size of the database name field in the backup file header */
#define MAX_DB_NAME     64
/* Record buffer length at which backup_handler() writes it to the file */
#define MAX_REC_BUFFER_SIZE     (100*1000)

/*
 * Header at the start of a database backup file.
 *
 * The structure is written to and read from the file as raw bytes, so
 * the order and types of the fields are part of the file format.
 */
struct db_header {
	/* Format version, DB_VERSION when written */
	unsigned long version;
	/* Time the backup was taken */
	time_t timestamp;
	/* Database flags recorded at backup time */
	unsigned long flags;
	/* Number of record buffers following the header */
	unsigned long nbuf;
	/* Total number of records in those buffers */
	unsigned long nrec;
	/* Name of the database that was backed up */
	char name[MAX_DB_NAME];
};
4002
4003 struct backup_state {
4004         TALLOC_CTX *mem_ctx;
4005         struct ctdb_rec_buffer *recbuf;
4006         uint32_t db_id;
4007         int fd;
4008         unsigned int nbuf, nrec;
4009 };
4010
4011 static int backup_handler(uint32_t reqid, struct ctdb_ltdb_header *header,
4012                           TDB_DATA key, TDB_DATA data, void *private_data)
4013 {
4014         struct backup_state *state = (struct backup_state *)private_data;
4015         size_t len;
4016         int ret;
4017
4018         if (state->recbuf == NULL) {
4019                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
4020                                                      state->db_id);
4021                 if (state->recbuf == NULL) {
4022                         return ENOMEM;
4023                 }
4024         }
4025
4026         ret = ctdb_rec_buffer_add(state->recbuf, state->recbuf, reqid,
4027                                   header, key, data);
4028         if (ret != 0) {
4029                 return ret;
4030         }
4031
4032         len = ctdb_rec_buffer_len(state->recbuf);
4033         if (len < MAX_REC_BUFFER_SIZE) {
4034                 return 0;
4035         }
4036
4037         ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
4038         if (ret != 0) {
4039                 fprintf(stderr, "Failed to write records to backup file\n");
4040                 return ret;
4041         }
4042
4043         state->nbuf += 1;
4044         state->nrec += state->recbuf->count;
4045         TALLOC_FREE(state->recbuf);
4046
4047         return 0;
4048 }
4049
4050 static int control_backupdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4051                             int argc, const char **argv)
4052 {
4053         const char *db_name;
4054         struct ctdb_db_context *db;
4055         uint32_t db_id;
4056         uint8_t db_flags;
4057         struct backup_state state;
4058         struct db_header db_hdr;
4059         int fd, ret;
4060
4061         if (argc != 2) {
4062                 usage("backupdb");
4063         }
4064
4065         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4066                 return 1;
4067         }
4068
4069         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4070                           db_flags, &db);
4071         if (ret != 0) {
4072                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4073                 return ret;
4074         }
4075
4076         fd = open(argv[1], O_RDWR|O_CREAT, 0600);
4077         if (fd == -1) {
4078                 ret = errno;
4079                 fprintf(stderr, "Failed to open file %s for writing\n",
4080                         argv[1]);
4081                 return ret;
4082         }
4083
4084         /* Write empty header first */
4085         ZERO_STRUCT(db_hdr);
4086         ret = write(fd, &db_hdr, sizeof(struct db_header));
4087         if (ret == -1) {
4088                 ret = errno;
4089                 close(fd);
4090                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4091                 return ret;
4092         }
4093
4094         state.mem_ctx = mem_ctx;
4095         state.recbuf = NULL;
4096         state.fd = fd;
4097         state.nbuf = 0;
4098         state.nrec = 0;
4099
4100         ret = ctdb_db_traverse(db, true, false, backup_handler, &state);
4101         if (ret != 0) {
4102                 fprintf(stderr, "Failed to collect records from DB %s\n",
4103                         db_name);
4104                 close(fd);
4105                 return ret;
4106         }
4107
4108         if (state.recbuf != NULL) {
4109                 ret = ctdb_rec_buffer_write(state.recbuf, state.fd);
4110                 if (ret != 0) {
4111                         fprintf(stderr,
4112                                 "Failed to write records to backup file\n");
4113                         close(fd);
4114                         return ret;
4115                 }
4116
4117                 state.nbuf += 1;
4118                 state.nrec += state.recbuf->count;
4119                 TALLOC_FREE(state.recbuf);
4120         }
4121
4122         db_hdr.version = DB_VERSION;
4123         db_hdr.timestamp = time(NULL);
4124         db_hdr.flags = db_flags;
4125         db_hdr.nbuf = state.nbuf;
4126         db_hdr.nrec = state.nrec;
4127         strncpy(db_hdr.name, db_name, MAX_DB_NAME-1);
4128
4129         lseek(fd, 0, SEEK_SET);
4130         ret = write(fd, &db_hdr, sizeof(struct db_header));
4131         if (ret == -1) {
4132                 ret = errno;
4133                 close(fd);
4134                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4135                 return ret;
4136         }
4137
4138         close(fd);
4139         printf("Database backed up to %s\n", argv[1]);
4140         return 0;
4141 }
4142
4143 static int control_restoredb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4144                              int argc, const char **argv)
4145 {
4146         const char *db_name = NULL;
4147         struct ctdb_db_context *db;
4148         struct db_header db_hdr;
4149         struct ctdb_node_map *nodemap;
4150         struct ctdb_req_control request;
4151         struct ctdb_reply_control **reply;
4152         struct ctdb_transdb wipedb;
4153         struct ctdb_pulldb_ext pulldb;
4154         struct ctdb_rec_buffer *recbuf;
4155         uint32_t generation;
4156         uint32_t *pnn_list;
4157         char timebuf[128];
4158         int fd, i;
4159         int count, ret;
4160
4161         if (argc < 1 || argc > 2) {
4162                 usage("restoredb");
4163         }
4164
4165         fd = open(argv[0], O_RDONLY, 0600);
4166         if (fd == -1) {
4167                 ret = errno;
4168                 fprintf(stderr, "Failed to open file %s for reading\n",
4169                         argv[0]);
4170                 return ret;
4171         }
4172
4173         if (argc == 2) {
4174                 db_name = argv[1];
4175         }
4176
4177         ret = read(fd, &db_hdr, sizeof(struct db_header));
4178         if (ret == -1) {
4179                 ret = errno;
4180                 close(fd);
4181                 fprintf(stderr, "Failed to read db header from file %s\n",
4182                         argv[0]);
4183                 return ret;
4184         }
4185
4186         if (db_hdr.version != DB_VERSION) {
4187                 fprintf(stderr,
4188                         "Wrong version of backup file, expected %u, got %lu\n",
4189                         DB_VERSION, db_hdr.version);
4190                 close(fd);
4191                 return EINVAL;
4192         }
4193
4194         if (db_name == NULL) {
4195                 db_name = db_hdr.name;
4196         }
4197
4198         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4199                  localtime(&db_hdr.timestamp));
4200         printf("Restoring database %s from backup @ %s\n", db_name, timebuf);
4201
4202         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4203                           db_hdr.flags, &db);
4204         if (ret != 0) {
4205                 fprintf(stderr, "Failed to attach to DB %s\n", db_hdr.name);
4206                 return ret;
4207         }
4208
4209         nodemap = get_nodemap(ctdb, false);
4210         if (nodemap == NULL) {
4211                 fprintf(stderr, "Failed to get nodemap\n");
4212                 close(fd);
4213                 return ENOMEM;
4214         }
4215
4216         ret = get_generation(mem_ctx, ctdb, &generation);
4217         if (ret != 0) {
4218                 fprintf(stderr, "Failed to get current generation\n");
4219                 close(fd);
4220                 return ret;
4221         }
4222
4223         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4224                                      &pnn_list);
4225         if (count <= 0) {
4226                 close(fd);
4227                 return ENOMEM;
4228         }
4229
4230         wipedb.db_id = ctdb_db_id(db);
4231         wipedb.tid = generation;
4232
4233         ctdb_req_control_db_freeze(&request, wipedb.db_id);
4234         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4235                                         ctdb->client, pnn_list, count,
4236                                         TIMEOUT(), &request, NULL, NULL);
4237         if (ret != 0) {
4238                 goto failed;
4239         }
4240
4241
4242         ctdb_req_control_db_transaction_start(&request, &wipedb);
4243         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4244                                         pnn_list, count, TIMEOUT(),
4245                                         &request, NULL, NULL);
4246         if (ret != 0) {
4247                 goto failed;
4248         }
4249
4250         ctdb_req_control_wipe_database(&request, &wipedb);
4251         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4252                                         pnn_list, count, TIMEOUT(),
4253                                         &request, NULL, NULL);
4254         if (ret != 0) {
4255                 goto failed;
4256         }
4257
4258         pulldb.db_id = ctdb_db_id(db);
4259         pulldb.lmaster = 0;
4260         pulldb.srvid = SRVID_CTDB_PUSHDB;
4261
4262         ctdb_req_control_db_push_start(&request, &pulldb);
4263         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4264                                         pnn_list, count, TIMEOUT(),
4265                                         &request, NULL, NULL);
4266         if (ret != 0) {
4267                 goto failed;
4268         }
4269
4270         for (i=0; i<db_hdr.nbuf; i++) {
4271                 struct ctdb_req_message message;
4272                 TDB_DATA data;
4273
4274                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4275                 if (ret != 0) {
4276                         goto failed;
4277                 }
4278
4279                 data.dsize = ctdb_rec_buffer_len(recbuf);
4280                 data.dptr = talloc_size(mem_ctx, data.dsize);
4281                 if (data.dptr == NULL) {
4282                         goto failed;
4283                 }
4284
4285                 ctdb_rec_buffer_push(recbuf, data.dptr);
4286
4287                 message.srvid = pulldb.srvid;
4288                 message.data.data = data;
4289
4290                 ret = ctdb_client_message_multi(mem_ctx, ctdb->ev,
4291                                                 ctdb->client,
4292                                                 pnn_list, count,
4293                                                 &message, NULL);
4294                 if (ret != 0) {
4295                         goto failed;
4296                 }
4297
4298                 talloc_free(recbuf);
4299                 talloc_free(data.dptr);
4300         }
4301
4302         ctdb_req_control_db_push_confirm(&request, pulldb.db_id);
4303         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4304                                         pnn_list, count, TIMEOUT(),
4305                                         &request, NULL, &reply);
4306         if (ret != 0) {
4307                 goto failed;
4308         }
4309
4310         for (i=0; i<count; i++) {
4311                 uint32_t num_records;
4312
4313                 ret = ctdb_reply_control_db_push_confirm(reply[i],
4314                                                          &num_records);
4315                 if (ret != 0) {
4316                         fprintf(stderr, "Invalid response from node %u\n",
4317                                 pnn_list[i]);
4318                         goto failed;
4319                 }
4320
4321                 if (num_records != db_hdr.nrec) {
4322                         fprintf(stderr, "Node %u received %u of %lu records\n",
4323                                 pnn_list[i], num_records, db_hdr.nrec);
4324                         goto failed;
4325                 }
4326         }
4327
4328         ctdb_req_control_db_set_healthy(&request, wipedb.db_id);
4329         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4330                                         pnn_list, count, TIMEOUT(),
4331                                         &request, NULL, NULL);
4332         if (ret != 0) {
4333                 goto failed;
4334         }
4335
4336         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4337         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4338                                         pnn_list, count, TIMEOUT(),
4339                                         &request, NULL, NULL);
4340         if (ret != 0) {
4341                 goto failed;
4342         }
4343
4344         ctdb_req_control_db_thaw(&request, wipedb.db_id);
4345         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4346                                         ctdb->client, pnn_list, count,
4347                                         TIMEOUT(), &request, NULL, NULL);
4348         if (ret != 0) {
4349                 goto failed;
4350         }
4351
4352         printf("Database %s restored\n", db_name);
4353         return 0;
4354
4355
4356 failed:
4357         close(fd);
4358         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4359                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4360         return ret;
4361 }
4362
4363 struct dumpdbbackup_state {
4364         ctdb_rec_parser_func_t parser;
4365         struct dump_record_state sub_state;
4366 };
4367
4368 static int dumpdbbackup_handler(uint32_t reqid,
4369                                 struct ctdb_ltdb_header *header,
4370                                 TDB_DATA key, TDB_DATA data,
4371                                 void *private_data)
4372 {
4373         struct dumpdbbackup_state *state =
4374                 (struct dumpdbbackup_state *)private_data;
4375         struct ctdb_ltdb_header hdr;
4376         int ret;
4377
4378         ret = ctdb_ltdb_header_extract(&data, &hdr);
4379         if (ret != 0) {
4380                 return ret;
4381         }
4382
4383         return state->parser(reqid, &hdr, key, data, &state->sub_state);
4384 }
4385
4386 static int control_dumpdbbackup(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4387                                 int argc, const char **argv)
4388 {
4389         struct db_header db_hdr;
4390         char timebuf[128];
4391         struct dumpdbbackup_state state;
4392         int fd, ret, i;
4393
4394         if (argc != 1) {
4395                 usage("dumpbackup");
4396         }
4397
4398         fd = open(argv[0], O_RDONLY, 0600);
4399         if (fd == -1) {
4400                 ret = errno;
4401                 fprintf(stderr, "Failed to open file %s for reading\n",
4402                         argv[0]);
4403                 return ret;
4404         }
4405
4406         ret = read(fd, &db_hdr, sizeof(struct db_header));
4407         if (ret == -1) {
4408                 ret = errno;
4409                 close(fd);
4410                 fprintf(stderr, "Failed to read db header from file %s\n",
4411                         argv[0]);
4412                 return ret;
4413         }
4414
4415         if (db_hdr.version != DB_VERSION) {
4416                 fprintf(stderr,
4417                         "Wrong version of backup file, expected %u, got %lu\n",
4418                         DB_VERSION, db_hdr.version);
4419                 close(fd);
4420                 return EINVAL;
4421         }
4422
4423         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4424                  localtime(&db_hdr.timestamp));
4425         printf("Dumping database %s from backup @ %s\n",
4426                db_hdr.name, timebuf);
4427
4428         state.parser = dump_record;
4429         state.sub_state.count = 0;
4430
4431         for (i=0; i<db_hdr.nbuf; i++) {
4432                 struct ctdb_rec_buffer *recbuf;
4433
4434                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4435                 if (ret != 0) {
4436                         fprintf(stderr, "Failed to read records\n");
4437                         close(fd);
4438                         return ret;
4439                 }
4440
4441                 ret = ctdb_rec_buffer_traverse(recbuf, dumpdbbackup_handler,
4442                                                &state);
4443                 if (ret != 0) {
4444                         fprintf(stderr, "Failed to dump records\n");
4445                         close(fd);
4446                         return ret;
4447                 }
4448         }
4449
4450         close(fd);
4451         printf("Dumped %u record(s)\n", state.sub_state.count);
4452         return 0;
4453 }
4454
4455 static int control_wipedb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4456                           int argc, const char **argv)
4457 {
4458         const char *db_name;
4459         struct ctdb_db_context *db;
4460         uint32_t db_id;
4461         uint8_t db_flags;
4462         struct ctdb_node_map *nodemap;
4463         struct ctdb_req_control request;
4464         struct ctdb_transdb wipedb;
4465         uint32_t generation;
4466         uint32_t *pnn_list;
4467         int count, ret;
4468
4469         if (argc != 1) {
4470                 usage("wipedb");
4471         }
4472
4473         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4474                 return 1;
4475         }
4476
4477         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4478                           db_flags, &db);
4479         if (ret != 0) {
4480                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4481                 return ret;
4482         }
4483
4484         nodemap = get_nodemap(ctdb, false);
4485         if (nodemap == NULL) {
4486                 fprintf(stderr, "Failed to get nodemap\n");
4487                 return ENOMEM;
4488         }
4489
4490         ret = get_generation(mem_ctx, ctdb, &generation);
4491         if (ret != 0) {
4492                 fprintf(stderr, "Failed to get current generation\n");
4493                 return ret;
4494         }
4495
4496         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4497                                      &pnn_list);
4498         if (count <= 0) {
4499                 return ENOMEM;
4500         }
4501
4502         ctdb_req_control_db_freeze(&request, db_id);
4503         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4504                                         ctdb->client, pnn_list, count,
4505                                         TIMEOUT(), &request, NULL, NULL);
4506         if (ret != 0) {
4507                 goto failed;
4508         }
4509
4510         wipedb.db_id = db_id;
4511         wipedb.tid = generation;
4512
4513         ctdb_req_control_db_transaction_start(&request, &wipedb);
4514         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4515                                         pnn_list, count, TIMEOUT(),
4516                                         &request, NULL, NULL);
4517         if (ret != 0) {
4518                 goto failed;
4519         }
4520
4521         ctdb_req_control_wipe_database(&request, &wipedb);
4522         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4523                                         pnn_list, count, TIMEOUT(),
4524                                         &request, NULL, NULL);
4525         if (ret != 0) {
4526                 goto failed;
4527         }
4528
4529         ctdb_req_control_db_set_healthy(&request, db_id);
4530         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4531                                         pnn_list, count, TIMEOUT(),
4532                                         &request, NULL, NULL);
4533         if (ret != 0) {
4534                 goto failed;
4535         }
4536
4537         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4538         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4539                                         pnn_list, count, TIMEOUT(),
4540                                         &request, NULL, NULL);
4541         if (ret != 0) {
4542                 goto failed;
4543         }
4544
4545         ctdb_req_control_db_thaw(&request, db_id);
4546         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4547                                         ctdb->client, pnn_list, count,
4548                                         TIMEOUT(), &request, NULL, NULL);
4549         if (ret != 0) {
4550                 goto failed;
4551         }
4552
4553         printf("Database %s wiped\n", db_name);
4554         return 0;
4555
4556
4557 failed:
4558         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4559                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4560         return ret;
4561 }
4562
4563 static int control_recmaster(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4564                              int argc, const char **argv)
4565 {
4566         uint32_t recmaster;
4567         int ret;
4568
4569         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
4570                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
4571         if (ret != 0) {
4572                 return ret;
4573         }
4574
4575         printf("%u\n", recmaster);
4576         return 0;
4577 }
4578
4579 static void print_scriptstatus_one(struct ctdb_script_list *slist,
4580                                    const char *event_str)
4581 {
4582         int i;
4583         int num_run = 0;
4584
4585         if (slist == NULL) {
4586                 if (! options.machinereadable) {
4587                         printf("%s cycle never run\n", event_str);
4588                 }
4589                 return;
4590         }
4591
4592         for (i=0; i<slist->num_scripts; i++) {
4593                 if (slist->script[i].status != -ENOEXEC) {
4594                         num_run++;
4595                 }
4596         }
4597
4598         if (! options.machinereadable) {
4599                 printf("%d scripts were executed last %s cycle\n",
4600                        num_run, event_str);
4601         }
4602
4603         for (i=0; i<slist->num_scripts; i++) {
4604                 const char *status = NULL;
4605
4606                 switch (slist->script[i].status) {
4607                 case -ETIME:
4608                         status = "TIMEDOUT";
4609                         break;
4610                 case -ENOEXEC:
4611                         status = "DISABLED";
4612                         break;
4613                 case 0:
4614                         status = "OK";
4615                         break;
4616                 default:
4617                         if (slist->script[i].status > 0) {
4618                                 status = "ERROR";
4619                         }
4620                         break;
4621                 }
4622
4623                 if (options.machinereadable) {
4624                         printf("%s%s%s%s%s%i%s%s%s%lu.%06lu%s%lu.%06lu%s%s%s\n",
4625                                options.sep,
4626                                event_str, options.sep,
4627                                slist->script[i].name, options.sep,
4628                                slist->script[i].status, options.sep,
4629                                status, options.sep,
4630                                slist->script[i].start.tv_sec,
4631                                slist->script[i].start.tv_usec, options.sep,
4632                                slist->script[i].finished.tv_sec,
4633                                slist->script[i].finished.tv_usec, options.sep,
4634                                slist->script[i].output, options.sep);
4635                         continue;
4636                 }
4637
4638                 if (status) {
4639                         printf("%-20s Status:%s    ",
4640                                slist->script[i].name, status);
4641                 } else {
4642                         /* Some other error, eg from stat. */
4643                         printf("%-20s Status:CANNOT RUN (%s)",
4644                                slist->script[i].name,
4645                                strerror(-slist->script[i].status));
4646                 }
4647
4648                 if (slist->script[i].status >= 0) {
4649                         printf("Duration:%.3lf ",
4650                                timeval_delta(&slist->script[i].finished,
4651                                              &slist->script[i].start));
4652                 }
4653                 if (slist->script[i].status != -ENOEXEC) {
4654                         printf("%s", ctime(&slist->script[i].start.tv_sec));
4655                         if (slist->script[i].status != 0) {
4656                                 printf("   OUTPUT:%s\n",
4657                                        slist->script[i].output);
4658                         }
4659                 } else {
4660                         printf("\n");
4661                 }
4662         }
4663 }
4664
4665 static void print_scriptstatus(struct ctdb_script_list **slist,
4666                                int count, const char **event_str)
4667 {
4668         int i;
4669
4670         if (options.machinereadable) {
4671                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
4672                        options.sep,
4673                        "Type", options.sep,
4674                        "Name", options.sep,
4675                        "Code", options.sep,
4676                        "Status", options.sep,
4677                        "Start", options.sep,
4678                        "End", options.sep,
4679                        "Error Output", options.sep);
4680         }
4681
4682         for (i=0; i<count; i++) {
4683                 print_scriptstatus_one(slist[i], event_str[i]);
4684         }
4685 }
4686
4687 static int control_scriptstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4688                                 int argc, const char **argv)
4689 {
4690         struct ctdb_script_list **slist;
4691         const char *event_str;
4692         enum ctdb_event event;
4693         const char *all_events[] = {
4694                 "init", "setup", "startup", "monitor",
4695                 "takeip", "releaseip", "updateip", "ipreallocated" };
4696         bool valid;
4697         int ret, i, j;
4698         int count, start, end, num;
4699
4700         if (argc > 1) {
4701                 usage("scriptstatus");
4702         }
4703
4704         if (argc == 0) {
4705                 event_str = "monitor";
4706         } else {
4707                 event_str = argv[0];
4708         }
4709
4710         valid = false;
4711         end = 0;
4712
4713         for (i=0; i<ARRAY_SIZE(all_events); i++) {
4714                 if (strcmp(event_str, all_events[i]) == 0) {
4715                         valid = true;
4716                         count = 1;
4717                         start = i;
4718                         end = i+1;
4719                         break;
4720                 }
4721         }
4722
4723         if (strcmp(event_str, "all") == 0) {
4724                 valid = true;
4725                 count = ARRAY_SIZE(all_events);
4726                 start = 0;
4727                 end = count-1;
4728         }
4729
4730         if (! valid) {
4731                 fprintf(stderr, "Unknown event name %s\n", argv[0]);
4732                 usage("scriptstatus");
4733         }
4734
4735         slist = talloc_array(mem_ctx, struct ctdb_script_list *, count);
4736         if (slist == NULL) {
4737                 fprintf(stderr, "Memory allocation error\n");
4738                 return 1;
4739         }
4740
4741         num = 0;
4742         for (i=start; i<end; i++) {
4743                 event = ctdb_event_from_string(all_events[i]);
4744
4745                 ret = ctdb_ctrl_get_event_script_status(mem_ctx, ctdb->ev,
4746                                                         ctdb->client,
4747                                                         ctdb->cmd_pnn,
4748                                                         TIMEOUT(), event,
4749                                                         &slist[num]);
4750                 if (ret != 0) {
4751                         fprintf(stderr,
4752                                 "failed to get script status for %s event\n",
4753                                 all_events[i]);
4754                         return 1;
4755                 }
4756
4757                 if (slist[num] == NULL) {
4758                         num++;
4759                         continue;
4760                 }
4761
4762                 /* The ETIME status is ignored for certain events.
4763                  * In that case the status is 0, but endtime is not set.
4764                  */
4765                 for (j=0; j<slist[num]->num_scripts; j++) {
4766                         if (slist[num]->script[j].status == 0 &&
4767                             timeval_is_zero(&slist[num]->script[j].finished)) {
4768                                 slist[num]->script[j].status = -ETIME;
4769                         }
4770                 }
4771
4772                 num++;
4773         }
4774
4775         print_scriptstatus(slist, count, &all_events[start]);
4776         return 0;
4777 }
4778
4779 static int control_enablescript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4780                                 int argc, const char **argv)
4781 {
4782         int ret;
4783
4784         if (argc != 1) {
4785                 usage("enablescript");
4786         }
4787
4788         ret = ctdb_ctrl_enable_script(mem_ctx, ctdb->ev, ctdb->client,
4789                                       ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4790         if (ret != 0) {
4791                 fprintf(stderr, "Failed to enable script %s\n", argv[0]);
4792                 return ret;
4793         }
4794
4795         return 0;
4796 }
4797
4798 static int control_disablescript(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4799                                  int argc, const char **argv)
4800 {
4801         int ret;
4802
4803         if (argc != 1) {
4804                 usage("disablescript");
4805         }
4806
4807         ret = ctdb_ctrl_disable_script(mem_ctx, ctdb->ev, ctdb->client,
4808                                        ctdb->cmd_pnn, TIMEOUT(), argv[0]);
4809         if (ret != 0) {
4810                 fprintf(stderr, "Failed to disable script %s\n", argv[0]);
4811                 return ret;
4812         }
4813
4814         return 0;
4815 }
4816
4817 static int control_natgw(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4818                          int argc, const char **argv)
4819 {
4820         char *t, *natgw_helper = NULL;
4821
4822         if (argc != 1) {
4823                 usage("natgw");
4824         }
4825
4826         t = getenv("CTDB_NATGW_HELPER");
4827         if (t != NULL) {
4828                 natgw_helper = talloc_strdup(mem_ctx, t);
4829         } else {
4830                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4831                                                CTDB_HELPER_BINDIR);
4832         }
4833
4834         if (natgw_helper == NULL) {
4835                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4836                 return 1;
4837         }
4838
4839         return run_helper("NAT gateway helper", natgw_helper, argv[0]);
4840 }
4841
4842 static int control_natgwlist(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4843                              int argc, const char **argv)
4844 {
4845         char *t, *natgw_helper = NULL;
4846
4847         if (argc != 0) {
4848                 usage("natgwlist");
4849         }
4850
4851         t = getenv("CTDB_NATGW_HELPER");
4852         if (t != NULL) {
4853                 natgw_helper = talloc_strdup(mem_ctx, t);
4854         } else {
4855                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4856                                                CTDB_HELPER_BINDIR);
4857         }
4858
4859         if (natgw_helper == NULL) {
4860                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4861                 return 1;
4862         }
4863
4864         return run_helper("NAT gateway helper", natgw_helper, "natgwlist");
4865 }
4866
4867 /*
4868  * Find the PNN of the current node
4869  * discover the pnn by loading the nodes file and try to bind
4870  * to all addresses one at a time until the ip address is found.
4871  */
4872 static bool find_node_xpnn(TALLOC_CTX *mem_ctx, uint32_t *pnn)
4873 {
4874         struct ctdb_node_map *nodemap;
4875         int i;
4876
4877         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
4878         if (nodemap == NULL) {
4879                 return false;
4880         }
4881
4882         for (i=0; i<nodemap->num; i++) {
4883                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
4884                         continue;
4885                 }
4886                 if (ctdb_sys_have_ip(&nodemap->node[i].addr)) {
4887                         if (pnn != NULL) {
4888                                 *pnn = nodemap->node[i].pnn;
4889                         }
4890                         talloc_free(nodemap);
4891                         return true;
4892                 }
4893         }
4894
4895         fprintf(stderr, "Failed to detect PNN of the current node.\n");
4896         talloc_free(nodemap);
4897         return false;
4898 }
4899
4900 static int control_getreclock(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4901                               int argc, const char **argv)
4902 {
4903         const char *reclock;
4904         int ret;
4905
4906         if (argc != 0) {
4907                 usage("getreclock");
4908         }
4909
4910         ret = ctdb_ctrl_get_reclock_file(mem_ctx, ctdb->ev, ctdb->client,
4911                                          ctdb->cmd_pnn, TIMEOUT(), &reclock);
4912         if (ret != 0) {
4913                 return ret;
4914         }
4915
4916         if (reclock != NULL) {
4917                 printf("%s\n", reclock);
4918         }
4919
4920         return 0;
4921 }
4922
4923 static int control_setlmasterrole(TALLOC_CTX *mem_ctx,
4924                                   struct ctdb_context *ctdb,
4925                                   int argc, const char **argv)
4926 {
4927         uint32_t lmasterrole = 0;
4928         int ret;
4929
4930         if (argc != 1) {
4931                 usage("setlmasterrole");
4932         }
4933
4934         if (strcmp(argv[0], "on") == 0) {
4935                 lmasterrole = 1;
4936         } else if (strcmp(argv[0], "off") == 0) {
4937                 lmasterrole = 0;
4938         } else {
4939                 usage("setlmasterrole");
4940         }
4941
4942         ret = ctdb_ctrl_set_lmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4943                                         ctdb->cmd_pnn, TIMEOUT(), lmasterrole);
4944         if (ret != 0) {
4945                 return ret;
4946         }
4947
4948         return 0;
4949 }
4950
4951 static int control_setrecmasterrole(TALLOC_CTX *mem_ctx,
4952                                     struct ctdb_context *ctdb,
4953                                     int argc, const char **argv)
4954 {
4955         uint32_t recmasterrole = 0;
4956         int ret;
4957
4958         if (argc != 1) {
4959                 usage("setrecmasterrole");
4960         }
4961
4962         if (strcmp(argv[0], "on") == 0) {
4963                 recmasterrole = 1;
4964         } else if (strcmp(argv[0], "off") == 0) {
4965                 recmasterrole = 0;
4966         } else {
4967                 usage("setrecmasterrole");
4968         }
4969
4970         ret = ctdb_ctrl_set_recmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4971                                           ctdb->cmd_pnn, TIMEOUT(),
4972                                           recmasterrole);
4973         if (ret != 0) {
4974                 return ret;
4975         }
4976
4977         return 0;
4978 }
4979
4980 static int control_setdbreadonly(TALLOC_CTX *mem_ctx,
4981                                  struct ctdb_context *ctdb,
4982                                  int argc, const char **argv)
4983 {
4984         uint32_t db_id;
4985         uint8_t db_flags;
4986         int ret;
4987
4988         if (argc != 1) {
4989                 usage("setdbreadonly");
4990         }
4991
4992         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
4993                 return 1;
4994         }
4995
4996         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
4997                 fprintf(stderr, "Cannot set READONLY on persistent DB\n");
4998                 return 1;
4999         }
5000
5001         ret = ctdb_ctrl_set_db_readonly(mem_ctx, ctdb->ev, ctdb->client,
5002                                         ctdb->cmd_pnn, TIMEOUT(), db_id);
5003         if (ret != 0) {
5004                 return ret;
5005         }
5006
5007         return 0;
5008 }
5009
5010 static int control_setdbsticky(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5011                                int argc, const char **argv)
5012 {
5013         uint32_t db_id;
5014         uint8_t db_flags;
5015         int ret;
5016
5017         if (argc != 1) {
5018                 usage("setdbsticky");
5019         }
5020
5021         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
5022                 return 1;
5023         }
5024
5025         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5026                 fprintf(stderr, "Cannot set STICKY on persistent DB\n");
5027                 return 1;
5028         }
5029
5030         ret = ctdb_ctrl_set_db_sticky(mem_ctx, ctdb->ev, ctdb->client,
5031                                       ctdb->cmd_pnn, TIMEOUT(), db_id);
5032         if (ret != 0) {
5033                 return ret;
5034         }
5035
5036         return 0;
5037 }
5038
5039 static int control_pfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5040                           int argc, const char **argv)
5041 {
5042         const char *db_name;
5043         struct ctdb_db_context *db;
5044         struct ctdb_transaction_handle *h;
5045         uint8_t db_flags;
5046         TDB_DATA key, data;
5047         int ret;
5048
5049         if (argc < 2 || argc > 3) {
5050                 usage("pfetch");
5051         }
5052
5053         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5054                 return 1;
5055         }
5056
5057         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5058                 fprintf(stderr, "DB %s is not a persistent database\n",
5059                         db_name);
5060                 return 1;
5061         }
5062
5063         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5064                           db_flags, &db);
5065         if (ret != 0) {
5066                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5067                 return ret;
5068         }
5069
5070         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5071         if (ret != 0) {
5072                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5073                 return ret;
5074         }
5075
5076         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5077                                      TIMEOUT(), db, true, &h);
5078         if (ret != 0) {
5079                 fprintf(stderr, "Failed to start transaction on db %s\n",
5080                         db_name);
5081                 return ret;
5082         }
5083
5084         ret = ctdb_transaction_fetch_record(h, key, mem_ctx, &data);
5085         if (ret != 0) {
5086                 fprintf(stderr, "Failed to read record for key %s\n",
5087                         argv[1]);
5088                 ctdb_transaction_cancel(h);
5089                 return ret;
5090         }
5091
5092         printf("%.*s\n", (int)data.dsize, data.dptr);
5093
5094         ctdb_transaction_cancel(h);
5095         return 0;
5096 }
5097
5098 static int control_pstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5099                           int argc, const char **argv)
5100 {
5101         const char *db_name;
5102         struct ctdb_db_context *db;
5103         struct ctdb_transaction_handle *h;
5104         uint8_t db_flags;
5105         TDB_DATA key, data;
5106         int ret;
5107
5108         if (argc != 3) {
5109                 usage("pstore");
5110         }
5111
5112         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5113                 return 1;
5114         }
5115
5116         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5117                 fprintf(stderr, "DB %s is not a persistent database\n",
5118                         db_name);
5119                 return 1;
5120         }
5121
5122         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5123                           db_flags, &db);
5124         if (ret != 0) {
5125                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5126                 return ret;
5127         }
5128
5129         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5130         if (ret != 0) {
5131                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5132                 return ret;
5133         }
5134
5135         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5136         if (ret != 0) {
5137                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5138                 return ret;
5139         }
5140
5141         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5142                                      TIMEOUT(), db, false, &h);
5143         if (ret != 0) {
5144                 fprintf(stderr, "Failed to start transaction on db %s\n",
5145                         db_name);
5146                 return ret;
5147         }
5148
5149         ret = ctdb_transaction_store_record(h, key, data);
5150         if (ret != 0) {
5151                 fprintf(stderr, "Failed to store record for key %s\n",
5152                         argv[1]);
5153                 ctdb_transaction_cancel(h);
5154                 return ret;
5155         }
5156
5157         ret = ctdb_transaction_commit(h);
5158         if (ret != 0) {
5159                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5160                         db_name);
5161                 ctdb_transaction_cancel(h);
5162                 return ret;
5163         }
5164
5165         return 0;
5166 }
5167
5168 static int control_pdelete(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5169                            int argc, const char **argv)
5170 {
5171         const char *db_name;
5172         struct ctdb_db_context *db;
5173         struct ctdb_transaction_handle *h;
5174         uint8_t db_flags;
5175         TDB_DATA key;
5176         int ret;
5177
5178         if (argc != 2) {
5179                 usage("pdelete");
5180         }
5181
5182         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5183                 return 1;
5184         }
5185
5186         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5187                 fprintf(stderr, "DB %s is not a persistent database\n",
5188                         db_name);
5189                 return 1;
5190         }
5191
5192         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5193                           db_flags, &db);
5194         if (ret != 0) {
5195                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5196                 return ret;
5197         }
5198
5199         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5200         if (ret != 0) {
5201                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5202                 return ret;
5203         }
5204
5205         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5206                                      TIMEOUT(), db, false, &h);
5207         if (ret != 0) {
5208                 fprintf(stderr, "Failed to start transaction on db %s\n",
5209                         db_name);
5210                 return ret;
5211         }
5212
5213         ret = ctdb_transaction_delete_record(h, key);
5214         if (ret != 0) {
5215                 fprintf(stderr, "Failed to delete record for key %s\n",
5216                         argv[1]);
5217                 ctdb_transaction_cancel(h);
5218                 return ret;
5219         }
5220
5221         ret = ctdb_transaction_commit(h);
5222         if (ret != 0) {
5223                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5224                         db_name);
5225                 ctdb_transaction_cancel(h);
5226                 return ret;
5227         }
5228
5229         return 0;
5230 }
5231
5232 static int ptrans_parse_string(TALLOC_CTX *mem_ctx, const char **ptr, TDB_DATA *data)
5233 {
5234         const char *t;
5235         size_t n;
5236         int ret;
5237
5238         *data = tdb_null;
5239
5240         /* Skip whitespace */
5241         n = strspn(*ptr, " \t");
5242         t = *ptr + n;
5243
5244         if (t[0] == '"') {
5245                 /* Quoted ASCII string - no wide characters! */
5246                 t++;
5247                 n = strcspn(t, "\"");
5248                 if (t[n] == '"') {
5249                         if (n > 0) {
5250                                 ret = str_to_data(t, n, mem_ctx, data);
5251                                 if (ret != 0) {
5252                                         return ret;
5253                                 }
5254                         }
5255                         *ptr = t + n + 1;
5256                 } else {
5257                         fprintf(stderr, "Unmatched \" in input %s\n", *ptr);
5258                         return 1;
5259                 }
5260         } else {
5261                 fprintf(stderr, "Unsupported input format in %s\n", *ptr);
5262                 return 1;
5263         }
5264
5265         return 0;
5266 }
5267
5268 #define MAX_LINE_SIZE   1024
5269
5270 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
5271                                  TDB_DATA *key, TDB_DATA *value)
5272 {
5273         char line [MAX_LINE_SIZE]; /* FIXME: make this more flexible? */
5274         const char *ptr;
5275         int ret;
5276
5277         ptr = fgets(line, MAX_LINE_SIZE, file);
5278         if (ptr == NULL) {
5279                 return false;
5280         }
5281
5282         /* Get key */
5283         ret = ptrans_parse_string(mem_ctx, &ptr, key);
5284         if (ret != 0 || ptr == NULL || key->dptr == NULL) {
5285                 /* Line Ignored but not EOF */
5286                 *key = tdb_null;
5287                 return true;
5288         }
5289
5290         /* Get value */
5291         ret = ptrans_parse_string(mem_ctx, &ptr, value);
5292         if (ret != 0) {
5293                 /* Line Ignored but not EOF */
5294                 talloc_free(key->dptr);
5295                 *key = tdb_null;
5296                 return true;
5297         }
5298
5299         return true;
5300 }
5301
5302 static int control_ptrans(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5303                           int argc, const char **argv)
5304 {
5305         const char *db_name;
5306         struct ctdb_db_context *db;
5307         struct ctdb_transaction_handle *h;
5308         uint8_t db_flags;
5309         FILE *file;
5310         TDB_DATA key, value;
5311         int ret;
5312
5313         if (argc < 1 || argc > 2) {
5314                 usage("ptrans");
5315         }
5316
5317         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5318                 return 1;
5319         }
5320
5321         if (! (db_flags & CTDB_DB_FLAGS_PERSISTENT)) {
5322                 fprintf(stderr, "DB %s is not a persistent database\n",
5323                         db_name);
5324                 return 1;
5325         }
5326
5327         if (argc == 2) {
5328                 file = fopen(argv[1], "r");
5329                 if (file == NULL) {
5330                         fprintf(stderr, "Failed to open file %s\n", argv[1]);
5331                         return 1;
5332                 }
5333         } else {
5334                 file = stdin;
5335         }
5336
5337         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5338                           db_flags, &db);
5339         if (ret != 0) {
5340                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5341                 goto done;
5342         }
5343
5344         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5345                                      TIMEOUT(), db, false, &h);
5346         if (ret != 0) {
5347                 fprintf(stderr, "Failed to start transaction on db %s\n",
5348                         db_name);
5349                 goto done;
5350         }
5351
5352         while (ptrans_get_key_value(mem_ctx, file, &key, &value)) {
5353                 if (key.dsize != 0) {
5354                         ret = ctdb_transaction_store_record(h, key, value);
5355                         if (ret != 0) {
5356                                 fprintf(stderr, "Failed to store record\n");
5357                                 ctdb_transaction_cancel(h);
5358                                 goto done;
5359                         }
5360                         talloc_free(key.dptr);
5361                         talloc_free(value.dptr);
5362                 }
5363         }
5364
5365         ret = ctdb_transaction_commit(h);
5366         if (ret != 0) {
5367                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5368                         db_name);
5369                 ctdb_transaction_cancel(h);
5370         }
5371
5372 done:
5373         if (file != stdin) {
5374                 fclose(file);
5375         }
5376         return ret;
5377 }
5378
5379 static int control_tfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5380                           int argc, const char **argv)
5381 {
5382         struct tdb_context *tdb;
5383         TDB_DATA key, data;
5384         struct ctdb_ltdb_header header;
5385         int ret;
5386
5387         if (argc < 2 || argc > 3) {
5388                 usage("tfetch");
5389         }
5390
5391         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5392         if (tdb == NULL) {
5393                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5394                 return 1;
5395         }
5396
5397         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5398         if (ret != 0) {
5399                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5400                 tdb_close(tdb);
5401                 return ret;
5402         }
5403
5404         data = tdb_fetch(tdb, key);
5405         if (data.dptr == NULL) {
5406                 fprintf(stderr, "No record for key %s\n", argv[1]);
5407                 tdb_close(tdb);
5408                 return 1;
5409         }
5410
5411         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
5412                 fprintf(stderr, "Invalid record for key %s\n", argv[1]);
5413                 tdb_close(tdb);
5414                 return 1;
5415         }
5416
5417         tdb_close(tdb);
5418
5419         if (argc == 3) {
5420                 int fd;
5421                 ssize_t nwritten;
5422
5423                 fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
5424                 if (fd == -1) {
5425                         fprintf(stderr, "Failed to open output file %s\n",
5426                                 argv[2]);
5427                         goto fail;
5428                 }
5429
5430                 nwritten = sys_write(fd, data.dptr, data.dsize);
5431                 if (nwritten != data.dsize) {
5432                         fprintf(stderr, "Failed to write record to file\n");
5433                         close(fd);
5434                         goto fail;
5435                 }
5436
5437                 close(fd);
5438         }
5439
5440 fail:
5441         ret = ctdb_ltdb_header_extract(&data, &header);
5442         if (ret != 0) {
5443                 fprintf(stderr, "Failed to parse header from data\n");
5444                 return 1;
5445         }
5446
5447         dump_ltdb_header(&header);
5448         dump_tdb_data("data", data);
5449
5450         return 0;
5451 }
5452
5453 static int control_tstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5454                           int argc, const char **argv)
5455 {
5456         struct tdb_context *tdb;
5457         TDB_DATA key, data, value;
5458         struct ctdb_ltdb_header header;
5459         size_t offset;
5460         int ret;
5461
5462         if (argc < 3 || argc > 5) {
5463                 usage("tstore");
5464         }
5465
5466         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5467         if (tdb == NULL) {
5468                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5469                 return 1;
5470         }
5471
5472         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5473         if (ret != 0) {
5474                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5475                 tdb_close(tdb);
5476                 return ret;
5477         }
5478
5479         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &value);
5480         if (ret != 0) {
5481                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5482                 tdb_close(tdb);
5483                 return ret;
5484         }
5485
5486         ZERO_STRUCT(header);
5487
5488         if (argc > 3) {
5489                 header.rsn = (uint64_t)strtoull(argv[3], NULL, 0);
5490         }
5491         if (argc > 4) {
5492                 header.dmaster = (uint32_t)atol(argv[4]);
5493         }
5494         if (argc > 5) {
5495                 header.flags = (uint32_t)atol(argv[5]);
5496         }
5497
5498         offset = ctdb_ltdb_header_len(&header);
5499         data.dsize = offset + value.dsize;
5500         data.dptr = talloc_size(mem_ctx, data.dsize);
5501         if (data.dptr == NULL) {
5502                 fprintf(stderr, "Memory allocation error\n");
5503                 tdb_close(tdb);
5504                 return 1;
5505         }
5506
5507         ctdb_ltdb_header_push(&header, data.dptr);
5508         memcpy(data.dptr + offset, value.dptr, value.dsize);
5509
5510         ret = tdb_store(tdb, key, data, TDB_REPLACE);
5511         if (ret != 0) {
5512                 fprintf(stderr, "Failed to write record %s to file %s\n",
5513                         argv[1], argv[0]);
5514         }
5515
5516         tdb_close(tdb);
5517
5518         return ret;
5519 }
5520
5521 static int control_readkey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5522                            int argc, const char **argv)
5523 {
5524         const char *db_name;
5525         struct ctdb_db_context *db;
5526         struct ctdb_record_handle *h;
5527         uint8_t db_flags;
5528         TDB_DATA key, data;
5529         bool readonly = false;
5530         int ret;
5531
5532         if (argc < 2 || argc > 3) {
5533                 usage("readkey");
5534         }
5535
5536         if (argc == 3) {
5537                 if (strcmp(argv[2], "readonly") == 0) {
5538                         readonly = true;
5539                 } else {
5540                         usage("readkey");
5541                 }
5542         }
5543
5544         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5545                 return 1;
5546         }
5547
5548         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5549                 fprintf(stderr, "DB %s is not a volatile database\n",
5550                         db_name);
5551                 return 1;
5552         }
5553
5554         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5555                           db_flags, &db);
5556         if (ret != 0) {
5557                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5558                 return ret;
5559         }
5560
5561         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5562         if (ret != 0) {
5563                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5564                 return ret;
5565         }
5566
5567         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5568                               db, key, readonly, &h, NULL, &data);
5569         if (ret != 0) {
5570                 fprintf(stderr, "Failed to read record for key %s\n",
5571                         argv[1]);
5572         } else {
5573                 printf("Data: size:%zu ptr:[%.*s]\n", data.dsize,
5574                        (int)data.dsize, data.dptr);
5575         }
5576
5577         talloc_free(h);
5578         return ret;
5579 }
5580
5581 static int control_writekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5582                             int argc, const char **argv)
5583 {
5584         const char *db_name;
5585         struct ctdb_db_context *db;
5586         struct ctdb_record_handle *h;
5587         uint8_t db_flags;
5588         TDB_DATA key, data;
5589         int ret;
5590
5591         if (argc != 3) {
5592                 usage("writekey");
5593         }
5594
5595         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5596                 return 1;
5597         }
5598
5599         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5600                 fprintf(stderr, "DB %s is not a volatile database\n",
5601                         db_name);
5602                 return 1;
5603         }
5604
5605         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5606                           db_flags, &db);
5607         if (ret != 0) {
5608                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5609                 return ret;
5610         }
5611
5612         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5613         if (ret != 0) {
5614                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5615                 return ret;
5616         }
5617
5618         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5619         if (ret != 0) {
5620                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5621                 return ret;
5622         }
5623
5624         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5625                               db, key, false, &h, NULL, NULL);
5626         if (ret != 0) {
5627                 fprintf(stderr, "Failed to lock record for key %s\n", argv[0]);
5628                 return ret;
5629         }
5630
5631         ret = ctdb_store_record(h, data);
5632         if (ret != 0) {
5633                 fprintf(stderr, "Failed to store record for key %s\n",
5634                         argv[1]);
5635         }
5636
5637         talloc_free(h);
5638         return ret;
5639 }
5640
5641 static int control_deletekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5642                              int argc, const char **argv)
5643 {
5644         const char *db_name;
5645         struct ctdb_db_context *db;
5646         struct ctdb_record_handle *h;
5647         uint8_t db_flags;
5648         TDB_DATA key, data;
5649         int ret;
5650
5651         if (argc != 2) {
5652                 usage("deletekey");
5653         }
5654
5655         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5656                 return 1;
5657         }
5658
5659         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
5660                 fprintf(stderr, "DB %s is not a volatile database\n",
5661                         db_name);
5662                 return 1;
5663         }
5664
5665         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5666                           db_flags, &db);
5667         if (ret != 0) {
5668                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5669                 return ret;
5670         }
5671
5672         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5673         if (ret != 0) {
5674                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5675                 return ret;
5676         }
5677
5678         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5679                               db, key, false, &h, NULL, &data);
5680         if (ret != 0) {
5681                 fprintf(stderr, "Failed to fetch record for key %s\n",
5682                         argv[1]);
5683                 return ret;
5684         }
5685
5686         ret = ctdb_delete_record(h);
5687         if (ret != 0) {
5688                 fprintf(stderr, "Failed to delete record for key %s\n",
5689                         argv[1]);
5690         }
5691
5692         talloc_free(h);
5693         return ret;
5694 }
5695
5696 static int control_checktcpport(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5697                                 int argc, const char **argv)
5698 {
5699         struct sockaddr_in sin;
5700         unsigned int port;
5701         int s, v;
5702         int ret;
5703
5704         if (argc != 1) {
5705                 usage("chktcpport");
5706         }
5707
5708         port = atoi(argv[0]);
5709
5710         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
5711         if (s == -1) {
5712                 fprintf(stderr, "Failed to open local socket\n");
5713                 return errno;
5714         }
5715
5716         v = fcntl(s, F_GETFL, 0);
5717         if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK)) {
5718                 fprintf(stderr, "Unable to set socket non-blocking\n");
5719                 close(s);
5720                 return errno;
5721         }
5722
5723         bzero(&sin, sizeof(sin));
5724         sin.sin_family = AF_INET;
5725         sin.sin_port = htons(port);
5726         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
5727         close(s);
5728         if (ret == -1) {
5729                 fprintf(stderr, "Failed to bind to TCP port %u\n", port);
5730                 return errno;
5731         }
5732
5733         return 0;
5734 }
5735
5736 static int control_getdbseqnum(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5737                                int argc, const char **argv)
5738 {
5739         uint32_t db_id;
5740         const char *db_name;
5741         uint64_t seqnum;
5742         int ret;
5743
5744         if (argc != 1) {
5745                 usage("getdbseqnum");
5746         }
5747
5748         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5749                 return 1;
5750         }
5751
5752         ret = ctdb_ctrl_get_db_seqnum(mem_ctx, ctdb->ev, ctdb->client,
5753                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
5754                                       &seqnum);
5755         if (ret != 0) {
5756                 fprintf(stderr, "Failed to get sequence number for DB %s\n",
5757                         db_name);
5758                 return ret;
5759         }
5760
5761         printf("0x%"PRIx64"\n", seqnum);
5762         return 0;
5763 }
5764
5765 static int control_nodestatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5766                               int argc, const char **argv)
5767 {
5768         const char *nodestring = NULL;
5769         struct ctdb_node_map *nodemap;
5770         int ret, i;
5771
5772         if (argc > 1) {
5773                 usage("nodestatus");
5774         }
5775
5776         if (argc == 1) {
5777                 nodestring = argv[0];
5778         }
5779
5780         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap)) {
5781                 return 1;
5782         }
5783
5784         nodemap = get_nodemap(ctdb, false);
5785         if (nodemap == NULL) {
5786                 return 1;
5787         }
5788
5789         if (options.machinereadable) {
5790                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5791         } else {
5792                 print_nodemap(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5793         }
5794
5795         ret = 0;
5796         for (i=0; i<nodemap->num; i++) {
5797                 ret |= nodemap->node[i].flags;
5798         }
5799
5800         return ret;
5801 }
5802
5803 const struct {
5804         const char *name;
5805         uint32_t offset;
5806 } db_stats_fields[] = {
5807 #define DBSTATISTICS_FIELD(n) { #n, offsetof(struct ctdb_db_statistics, n) }
5808         DBSTATISTICS_FIELD(db_ro_delegations),
5809         DBSTATISTICS_FIELD(db_ro_revokes),
5810         DBSTATISTICS_FIELD(locks.num_calls),
5811         DBSTATISTICS_FIELD(locks.num_current),
5812         DBSTATISTICS_FIELD(locks.num_pending),
5813         DBSTATISTICS_FIELD(locks.num_failed),
5814         DBSTATISTICS_FIELD(db_ro_delegations),
5815 };
5816
5817 static void print_dbstatistics(const char *db_name,
5818                                struct ctdb_db_statistics *s)
5819 {
5820         int i;
5821         const char *prefix = NULL;
5822         int preflen = 0;
5823
5824         printf("DB Statistics %s\n", db_name);
5825
5826         for (i=0; i<ARRAY_SIZE(db_stats_fields); i++) {
5827                 if (strchr(db_stats_fields[i].name, '.') != NULL) {
5828                         preflen = strcspn(db_stats_fields[i].name, ".") + 1;
5829                         if (! prefix ||
5830                             strncmp(prefix, db_stats_fields[i].name, preflen) != 0) {
5831                                 prefix = db_stats_fields[i].name;
5832                                 printf(" %*.*s\n", preflen-1, preflen-1,
5833                                        db_stats_fields[i].name);
5834                         }
5835                 } else {
5836                         preflen = 0;
5837                 }
5838                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
5839                        db_stats_fields[i].name+preflen, preflen ? 0 : 4, "",
5840                        *(uint32_t *)(db_stats_fields[i].offset+(uint8_t *)s));
5841         }
5842
5843         printf(" hop_count_buckets:");
5844         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5845                 printf(" %d", s->hop_count_bucket[i]);
5846         }
5847         printf("\n");
5848
5849         printf(" lock_buckets:");
5850         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5851                 printf(" %d", s->locks.buckets[i]);
5852         }
5853         printf("\n");
5854
5855         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5856                "locks_latency      MIN/AVG/MAX",
5857                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
5858                s->locks.latency.max, s->locks.latency.num);
5859
5860         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5861                "vacuum_latency     MIN/AVG/MAX",
5862                s->vacuum.latency.min, LATENCY_AVG(s->vacuum.latency),
5863                s->vacuum.latency.max, s->vacuum.latency.num);
5864
5865         printf(" Num Hot Keys:     %d\n", s->num_hot_keys);
5866         for (i=0; i<s->num_hot_keys; i++) {
5867                 int j;
5868                 printf("     Count:%d Key:", s->hot_keys[i].count);
5869                 for (j=0; j<s->hot_keys[i].key.dsize; j++) {
5870                         printf("%02x", s->hot_keys[i].key.dptr[j] & 0xff);
5871                 }
5872                 printf("\n");
5873         }
5874 }
5875
5876 static int control_dbstatistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5877                                 int argc, const char **argv)
5878 {
5879         uint32_t db_id;
5880         const char *db_name;
5881         struct ctdb_db_statistics *dbstats;
5882         int ret;
5883
5884         if (argc != 1) {
5885                 usage("dbstatistics");
5886         }
5887
5888         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5889                 return 1;
5890         }
5891
5892         ret = ctdb_ctrl_get_db_statistics(mem_ctx, ctdb->ev, ctdb->client,
5893                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
5894                                           &dbstats);
5895         if (ret != 0) {
5896                 fprintf(stderr, "Failed to get statistics for DB %s\n",
5897                         db_name);
5898                 return ret;
5899         }
5900
5901         print_dbstatistics(db_name, dbstats);
5902         return 0;
5903 }
5904
/*
 * Bookkeeping shared between disable_takeover_runs() and its message
 * handler while replies from the nodes are being collected.
 */
struct disable_takeover_runs_state {
        /* PNNs of the nodes a reply is expected from */
        uint32_t *pnn_list;
        /* Number of entries in pnn_list and in reply */
        int node_count;
        /* reply[i] becomes true once pnn_list[i] has answered */
        bool *reply;
        /* Negative value reported by a node, otherwise left as set by
         * the caller
         */
        int status;
        /* Set when an error was reported or every node has answered */
        bool done;
};
5912
5913 static void disable_takeover_run_handler(uint64_t srvid, TDB_DATA data,
5914                                          void *private_data)
5915 {
5916         struct disable_takeover_runs_state *state =
5917                 (struct disable_takeover_runs_state *)private_data;
5918         int ret, i;
5919
5920         if (data.dsize != sizeof(int)) {
5921                 /* Ignore packet */
5922                 return;
5923         }
5924
5925         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
5926         ret = *(int *)data.dptr;
5927         if (ret < 0) {
5928                 state->status = ret;
5929                 state->done = true;
5930                 return;
5931         }
5932         for (i=0; i<state->node_count; i++) {
5933                 if (state->pnn_list[i] == ret) {
5934                         state->reply[i] = true;
5935                         break;
5936                 }
5937         }
5938
5939         state->done = true;
5940         for (i=0; i<state->node_count; i++) {
5941                 if (! state->reply[i]) {
5942                         state->done = false;
5943                         break;
5944                 }
5945         }
5946 }
5947
5948 static int disable_takeover_runs(TALLOC_CTX *mem_ctx,
5949                                  struct ctdb_context *ctdb, uint32_t timeout,
5950                                  uint32_t *pnn_list, int count)
5951 {
5952         struct ctdb_disable_message disable = { 0 };
5953         struct disable_takeover_runs_state state;
5954         int ret, i;
5955
5956         disable.pnn = ctdb->pnn;
5957         disable.srvid = next_srvid(ctdb);
5958         disable.timeout = timeout;
5959
5960         state.pnn_list = pnn_list;
5961         state.node_count = count;
5962         state.done = false;
5963         state.status = 0;
5964         state.reply = talloc_zero_array(mem_ctx, bool, count);
5965         if (state.reply == NULL) {
5966                 return ENOMEM;
5967         }
5968
5969         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
5970                                               disable.srvid,
5971                                               disable_takeover_run_handler,
5972                                               &state);
5973         if (ret != 0) {
5974                 return ret;
5975         }
5976
5977         for (i=0; i<count; i++) {
5978                 ret = ctdb_message_disable_takeover_runs(mem_ctx, ctdb->ev,
5979                                                          ctdb->client,
5980                                                          pnn_list[i],
5981                                                          &disable);
5982                 if (ret != 0) {
5983                         goto fail;
5984                 }
5985         }
5986
5987         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
5988         if (ret == ETIME) {
5989                 fprintf(stderr, "Timed out waiting to disable takeover runs\n");
5990         } else {
5991                 ret = (state.status >= 0 ? 0 : 1);
5992         }
5993
5994 fail:
5995         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
5996                                            disable.srvid, &state);
5997         return ret;
5998 }
5999
6000 static int control_reloadips(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
6001                              int argc, const char **argv)
6002 {
6003         const char *nodestring = NULL;
6004         struct ctdb_node_map *nodemap, *nodemap2;
6005         struct ctdb_req_control request;
6006         uint32_t *pnn_list, *pnn_list2;
6007         int ret, count, count2;
6008
6009         if (argc > 1) {
6010                 usage("reloadips");
6011         }
6012
6013         if (argc == 1) {
6014                 nodestring = argv[0];
6015         }
6016
6017         nodemap = get_nodemap(ctdb, false);
6018         if (nodemap == NULL) {
6019                 return 1;
6020         }
6021
6022         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap2)) {
6023                 return 1;
6024         }
6025
6026         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
6027                                         mem_ctx, &pnn_list);
6028         if (count <= 0) {
6029                 fprintf(stderr, "Memory allocation error\n");
6030                 return 1;
6031         }
6032
6033         count2 = list_of_active_nodes(nodemap2, CTDB_UNKNOWN_PNN,
6034                                       mem_ctx, &pnn_list2);
6035         if (count2 <= 0) {
6036                 fprintf(stderr, "Memory allocation error\n");
6037                 return 1;
6038         }
6039
6040         /* Disable takeover runs on all connected nodes.  A reply
6041          * indicating success is needed from each node so all nodes
6042          * will need to be active.
6043          *
6044          * A check could be added to not allow reloading of IPs when
6045          * there are disconnected nodes.  However, this should
6046          * probably be left up to the administrator.
6047          */
6048         ret = disable_takeover_runs(mem_ctx, ctdb, 2*options.timelimit,
6049                                     pnn_list, count);
6050         if (ret != 0) {
6051                 fprintf(stderr, "Failed to disable takeover runs\n");
6052                 return ret;
6053         }
6054
6055         /* Now tell all the desired nodes to reload their public IPs.
6056          * Keep trying this until it succeeds.  This assumes all
6057          * failures are transient, which might not be true...
6058          */
6059         ctdb_req_control_reload_public_ips(&request);
6060         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
6061                                         pnn_list2, count2, TIMEOUT(),
6062                                         &request, NULL, NULL);
6063         if (ret != 0) {
6064                 fprintf(stderr, "Failed to reload IPs on some nodes.\n");
6065         }
6066
6067         /* It isn't strictly necessary to wait until takeover runs are
6068          * re-enabled but doing so can't hurt.
6069          */
6070         ret = disable_takeover_runs(mem_ctx, ctdb, 0, pnn_list, count);
6071         if (ret != 0) {
6072                 fprintf(stderr, "Failed to enable takeover runs\n");
6073                 return ret;
6074         }
6075
6076         return ipreallocate(mem_ctx, ctdb);
6077 }
6078
6079 static int control_ipiface(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
6080                            int argc, const char **argv)
6081 {
6082         ctdb_sock_addr addr;
6083         char *iface;
6084
6085         if (argc != 1) {
6086                 usage("ipiface");
6087         }
6088
6089         if (! parse_ip(argv[0], NULL, 0, &addr)) {
6090                 fprintf(stderr, "Failed to Parse IP %s\n", argv[0]);
6091                 return 1;
6092         }
6093
6094         iface = ctdb_sys_find_ifname(&addr);
6095         if (iface == NULL) {
6096                 fprintf(stderr, "Failed to find interface for IP %s\n",
6097                         argv[0]);
6098                 return 1;
6099         }
6100         free(iface);
6101
6102         return 0;
6103 }
6104
6105
6106 static const struct ctdb_cmd {
6107         const char *name;
6108         int (*fn)(TALLOC_CTX *, struct ctdb_context *, int, const char **);
6109         bool without_daemon; /* can be run without daemon running ? */
6110         bool remote; /* can be run on remote nodes */
6111         const char *msg;
6112         const char *args;
6113 } ctdb_commands[] = {
6114         { "version", control_version, true, false,
6115                 "show version of ctdb", NULL },
6116         { "status", control_status, false, true,
6117                 "show node status", NULL },
6118         { "uptime", control_uptime, false, true,
6119                 "show node uptime", NULL },
6120         { "ping", control_ping, false, true,
6121                 "ping all nodes", NULL },
6122         { "runstate", control_runstate, false, true,
6123                 "get/check runstate of a node",
6124                 "[setup|first_recovery|startup|running]" },
6125         { "getvar", control_getvar, false, true,
6126                 "get a tunable variable", "<name>" },
6127         { "setvar", control_setvar, false, true,
6128                 "set a tunable variable", "<name> <value>" },
6129         { "listvars", control_listvars, false, true,
6130                 "list tunable variables", NULL },
6131         { "statistics", control_statistics, false, true,
6132                 "show ctdb statistics", NULL },
6133         { "statisticsreset", control_statistics_reset, false, true,
6134                 "reset ctdb statistics", NULL },
6135         { "stats", control_stats, false, true,
6136                 "show rolling statistics", "[count]" },
6137         { "ip", control_ip, false, true,
6138                 "show public ips", "[all]" },
6139         { "ipinfo", control_ipinfo, false, true,
6140                 "show public ip details", "<ip>" },
6141         { "ifaces", control_ifaces, false, true,
6142                 "show interfaces", NULL },
6143         { "setifacelink", control_setifacelink, false, true,
6144                 "set interface link status", "<iface> up|down" },
6145         { "process-exists", control_process_exists, false, true,
6146                 "check if a process exists on a node",  "<pid>" },
6147         { "getdbmap", control_getdbmap, false, true,
6148                 "show attached databases", NULL },
6149         { "getdbstatus", control_getdbstatus, false, true,
6150                 "show database status", "<dbname|dbid>" },
6151         { "catdb", control_catdb, false, false,
6152                 "dump cluster-wide ctdb database", "<dbname|dbid>" },
6153         { "cattdb", control_cattdb, false, false,
6154                 "dump local ctdb database", "<dbname|dbid>" },
6155         { "getmonmode", control_getmonmode, false, true,
6156                 "show monitoring mode", NULL },
6157         { "getcapabilities", control_getcapabilities, false, true,
6158                 "show node capabilities", NULL },
6159         { "pnn", control_pnn, false, false,
6160                 "show the pnn of the currnet node", NULL },
6161         { "lvs", control_lvs, false, false,
6162                 "show lvs configuration", "master|list|status" },
6163         { "disablemonitor", control_disable_monitor, false, true,
6164                 "disable monitoring", NULL },
6165         { "enablemonitor", control_enable_monitor, false, true,
6166                 "enable monitoring", NULL },
6167         { "setdebug", control_setdebug, false, true,
6168                 "set debug level", "ERROR|WARNING|NOTICE|INFO|DEBUG" },
6169         { "getdebug", control_getdebug, false, true,
6170                 "get debug level", NULL },
6171         { "attach", control_attach, false, false,
6172                 "attach a database", "<dbname> [persistent]" },
6173         { "detach", control_detach, false, false,
6174                 "detach database(s)", "<dbname|dbid> ..." },
6175         { "dumpmemory", control_dumpmemory, false, true,
6176                 "dump ctdbd memory map", NULL },
6177         { "rddumpmemory", control_rddumpmemory, false, true,
6178                 "dump recoverd memory map", NULL },
6179         { "getpid", control_getpid, false, true,
6180                 "get ctdbd process ID", NULL },
6181         { "disable", control_disable, false, true,
6182                 "disable a node", NULL },
6183         { "enable", control_enable, false, true,
6184                 "enable a node", NULL },
6185         { "stop", control_stop, false, true,
6186                 "stop a node", NULL },
6187         { "continue", control_continue, false, true,
6188                 "continue a stopped node", NULL },
6189         { "ban", control_ban, false, true,
6190                 "ban a node", "<bantime>"},
6191         { "unban", control_unban, false, true,
6192                 "unban a node", NULL },
6193         { "shutdown", control_shutdown, false, true,
6194                 "shutdown ctdb daemon", NULL },
6195         { "recover", control_recover, false, true,
6196                 "force recovery", NULL },
6197         { "sync", control_ipreallocate, false, true,
6198                 "run ip reallocation (deprecated)", NULL },
6199         { "ipreallocate", control_ipreallocate, false, true,
6200                 "run ip reallocation", NULL },
6201         { "isnotrecmaster", control_isnotrecmaster, false, false,
6202                 "check if local node is the recmaster", NULL },
6203         { "gratarp", control_gratarp, false, true,
6204                 "send a gratuitous arp", "<ip> <interface>" },
6205         { "tickle", control_tickle, true, false,
6206                 "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
6207         { "gettickles", control_gettickles, false, true,
6208                 "get the list of tickles", "<ip> [<port>]" },
6209         { "addtickle", control_addtickle, false, true,
6210                 "add a tickle", "<ip>:<port> <ip>:<port>" },
6211         { "deltickle", control_deltickle, false, true,
6212                 "delete a tickle", "<ip>:<port> <ip>:<port>" },
6213         { "check_srvids", control_check_srvids, false, true,
6214                 "check if srvid is registered", "<id> [<id> ...]" },
6215         { "listnodes", control_listnodes, true, true,
6216                 "list nodes in the cluster", NULL },
6217         { "reloadnodes", control_reloadnodes, false, false,
6218                 "reload the nodes file all nodes", NULL },
6219         { "moveip", control_moveip, false, false,
6220                 "move an ip address to another node", "<ip> <node>" },
6221         { "addip", control_addip, false, true,
6222                 "add an ip address to a node", "<ip/mask> <iface>" },
6223         { "delip", control_delip, false, true,
6224                 "delete an ip address from a node", "<ip>" },
6225         { "eventscript", control_eventscript, false, true,
6226                 "run an event", "monitor" },
6227         { "backupdb", control_backupdb, false, false,
6228                 "backup a database into a file", "<dbname|dbid> <file>" },
6229         { "restoredb", control_restoredb, false, false,
6230                 "restore a database from a file", "<file> [dbname]" },
6231         { "dumpdbbackup", control_dumpdbbackup, true, false,
6232                 "dump database from a backup file", "<file>" },
6233         { "wipedb", control_wipedb, false, false,
6234                 "wipe the contents of a database.", "<dbname|dbid>"},
6235         { "recmaster", control_recmaster, false, true,
6236                 "show the pnn for the recovery master", NULL },
6237         { "scriptstatus", control_scriptstatus, false, true,
6238                 "show event script status",
6239                 "[init|setup|startup|monitor|takeip|releaseip|ipreallocated]" },
6240         { "enablescript", control_enablescript, false, true,
6241                 "enable an eventscript", "<script>"},
6242         { "disablescript", control_disablescript, false, true,
6243                 "disable an eventscript", "<script>"},
6244         { "natgw", control_natgw, false, false,
6245                 "show natgw configuration", "master|list|status" },
6246         { "natgwlist", control_natgwlist, false, false,
6247                 "show the nodes belonging to this natgw configuration", NULL },
6248         { "getreclock", control_getreclock, false, true,
6249                 "get recovery lock file", NULL },
6250         { "setlmasterrole", control_setlmasterrole, false, true,
6251                 "set LMASTER role", "on|off" },
6252         { "setrecmasterrole", control_setrecmasterrole, false, true,
6253                 "set RECMASTER role", "on|off"},
6254         { "setdbreadonly", control_setdbreadonly, false, true,
6255                 "enable readonly records", "<dbname|dbid>" },
6256         { "setdbsticky", control_setdbsticky, false, true,
6257                 "enable sticky records", "<dbname|dbid>"},
6258         { "pfetch", control_pfetch, false, false,
6259                 "fetch record from persistent database", "<dbname|dbid> <key> [<file>]" },
6260         { "pstore", control_pstore, false, false,
6261                 "write record to persistent database", "<dbname|dbid> <key> <value>" },
6262         { "pdelete", control_pdelete, false, false,
6263                 "delete record from persistent database", "<dbname|dbid> <key>" },
6264         { "ptrans", control_ptrans, false, false,
6265                 "update a persistent database (from file or stdin)", "<dbname|dbid> [<file>]" },
6266         { "tfetch", control_tfetch, false, true,
6267                 "fetch a record", "<tdb-file> <key> [<file>]" },
6268         { "tstore", control_tstore, false, true,
6269                 "store a record", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6270         { "readkey", control_readkey, false, false,
6271                 "read value of a database key", "<dbname|dbid> <key> [readonly]" },
6272         { "writekey", control_writekey, false, false,
6273                 "write value for a database key", "<dbname|dbid> <key> <value>" },
6274         { "deletekey", control_deletekey, false, false,
6275                 "delete a database key", "<dbname|dbid> <key>" },
6276         { "checktcpport", control_checktcpport, true, false,
6277                 "check if a service is bound to a specific tcp port or not", "<port>" },
6278         { "getdbseqnum", control_getdbseqnum, false, false,
6279                 "get database sequence number", "<dbname|dbid>" },
6280         { "nodestatus", control_nodestatus, false, true,
6281                 "show and return node status", "[all|<pnn-list>]" },
6282         { "dbstatistics", control_dbstatistics, false, true,
6283                 "show database statistics", "<dbname|dbid>" },
6284         { "reloadips", control_reloadips, false, false,
6285                 "reload the public addresses file", "[all|<pnn-list>]" },
6286         { "ipiface", control_ipiface, true, false,
6287                 "Find the interface an ip address is hosted on", "<ip>" },
6288 };
6289
6290 static const struct ctdb_cmd *match_command(const char *command)
6291 {
6292         const struct ctdb_cmd *cmd;
6293         int i;
6294
6295         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6296                 cmd = &ctdb_commands[i];
6297                 if (strlen(command) == strlen(cmd->name) &&
6298                     strncmp(command, cmd->name, strlen(command)) == 0) {
6299                         return cmd;
6300                 }
6301         }
6302
6303         return NULL;
6304 }
6305
6306
6307 /**
6308  * Show usage message
6309  */
6310 static void usage_full(void)
6311 {
6312         int i;
6313
6314         poptPrintHelp(pc, stdout, 0);
6315         printf("\nCommands:\n");
6316         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6317                 printf("  %-15s %-27s  %s\n",
6318                        ctdb_commands[i].name,
6319                        ctdb_commands[i].args ? ctdb_commands[i].args : "",
6320                        ctdb_commands[i].msg);
6321         }
6322 }
6323
6324 static void usage(const char *command)
6325 {
6326         const struct ctdb_cmd *cmd;
6327
6328         if (command == NULL) {
6329                 usage_full();
6330                 exit(1);
6331         }
6332
6333         cmd = match_command(command);
6334         if (cmd == NULL) {
6335                 usage_full();
6336         } else {
6337                 poptPrintUsage(pc, stdout, 0);
6338                 printf("\nCommands:\n");
6339                 printf("  %-15s %-27s  %s\n",
6340                        cmd->name, cmd->args ? cmd->args : "", cmd->msg);
6341         }
6342
6343         exit(1);
6344 }
6345
6346 struct poptOption cmdline_options[] = {
6347         POPT_AUTOHELP
6348         { "socket", 's', POPT_ARG_STRING, &options.socket, 0,
6349                 "CTDB socket path", "filename" },
6350         { "debug", 'd', POPT_ARG_STRING, &options.debuglevelstr, 0,
6351                 "debug level"},
6352         { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0,
6353                 "timelimit (in seconds)" },
6354         { "node", 'n', POPT_ARG_INT, &options.pnn, 0,
6355                 "node specification - integer" },
6356         { NULL, 'Y', POPT_ARG_NONE, &options.machinereadable, 0,
6357                 "enable machine readable output", NULL },
6358         { "separator", 'x', POPT_ARG_STRING, &options.sep, 0,
6359                 "specify separator for machine readable output", "CHAR" },
6360         { NULL, 'X', POPT_ARG_NONE, &options.machineparsable, 0,
6361                 "enable machine parsable output with separator |", NULL },
6362         { "verbose", 'v', POPT_ARG_NONE, &options.verbose, 0,
6363                 "enable verbose output", NULL },
6364         { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0,
6365                 "die if runtime exceeds this limit (in seconds)" },
6366         POPT_TABLEEND
6367 };
6368
6369 static int process_command(const struct ctdb_cmd *cmd, int argc,
6370                            const char **argv)
6371 {
6372         TALLOC_CTX *tmp_ctx;
6373         struct ctdb_context *ctdb;
6374         int ret;
6375         bool status;
6376         uint64_t srvid_offset;
6377
6378         tmp_ctx = talloc_new(NULL);
6379         if (tmp_ctx == NULL) {
6380                 fprintf(stderr, "Memory allocation error\n");
6381                 goto fail;
6382         }
6383
6384         if (cmd->without_daemon) {
6385                 if (options.pnn != -1) {
6386                         fprintf(stderr,
6387                                 "Cannot specify node for command %s\n",
6388                                 cmd->name);
6389                         goto fail;
6390                 }
6391
6392                 ret = cmd->fn(tmp_ctx, NULL, argc-1, argv+1);
6393                 talloc_free(tmp_ctx);
6394                 return ret;
6395         }
6396
6397         ctdb = talloc_zero(tmp_ctx, struct ctdb_context);
6398         if (ctdb == NULL) {
6399                 fprintf(stderr, "Memory allocation error\n");
6400                 goto fail;
6401         }
6402
6403         ctdb->ev = tevent_context_init(ctdb);
6404         if (ctdb->ev == NULL) {
6405                 fprintf(stderr, "Failed to initialize tevent\n");
6406                 goto fail;
6407         }
6408
6409         ret = ctdb_client_init(ctdb, ctdb->ev, options.socket, &ctdb->client);
6410         if (ret != 0) {
6411                 fprintf(stderr, "Failed to connect to CTDB daemon (%s)\n",
6412                         options.socket);
6413
6414                 if (!find_node_xpnn(ctdb, NULL)) {
6415                         fprintf(stderr, "Is this node part of CTDB cluster?\n");
6416                 }
6417                 goto fail;
6418         }
6419
6420         ctdb->pnn = ctdb_client_pnn(ctdb->client);
6421         srvid_offset = getpid() & 0xFFFF;
6422         ctdb->srvid = SRVID_CTDB_TOOL | (srvid_offset << 16);
6423
6424         if (options.pnn != -1) {
6425                 status = verify_pnn(ctdb, options.pnn);
6426                 if (! status) {
6427                         goto fail;
6428                 }
6429
6430                 ctdb->cmd_pnn = options.pnn;
6431         } else {
6432                 ctdb->cmd_pnn = ctdb->pnn;
6433         }
6434
6435         if (! cmd->remote && ctdb->pnn != ctdb->cmd_pnn) {
6436                 fprintf(stderr, "Node cannot be specified for command %s\n",
6437                         cmd->name);
6438                 goto fail;
6439         }
6440
6441         ret = cmd->fn(tmp_ctx, ctdb, argc-1, argv+1);
6442         talloc_free(tmp_ctx);
6443         return ret;
6444
6445 fail:
6446         talloc_free(tmp_ctx);
6447         return 1;
6448 }
6449
/*
 * SIGTERM handler installed by alarm_handler(): report that the maximum
 * runtime has been exceeded.
 *
 * Only async-signal-safe calls may be made from a signal handler, so the
 * message is emitted with write(2) instead of stdio.
 */
static void signal_handler(int sig)
{
        static const char msg[] = "Maximum runtime exceeded - exiting\n";
        ssize_t n;

        (void)sig;

        /* Nothing useful can be done here if the write fails */
        n = write(STDERR_FILENO, msg, sizeof(msg) - 1);
        (void)n;
}
6454
6455 static void alarm_handler(int sig)
6456 {
6457         /* Kill any child processes */
6458         signal(SIGTERM, signal_handler);
6459         kill(0, SIGTERM);
6460
6461         _exit(1);
6462 }
6463
6464 int main(int argc, const char *argv[])
6465 {
6466         int opt;
6467         const char **extra_argv;
6468         int extra_argc;
6469         const struct ctdb_cmd *cmd;
6470         const char *ctdb_socket;
6471         enum debug_level loglevel;
6472         int ret;
6473
6474         setlinebuf(stdout);
6475
6476         /* Set default options */
6477         options.socket = CTDB_SOCKET;
6478         options.debuglevelstr = NULL;
6479         options.timelimit = 10;
6480         options.sep = "|";
6481         options.maxruntime = 0;
6482         options.pnn = -1;
6483
6484         ctdb_socket = getenv("CTDB_SOCKET");
6485         if (ctdb_socket != NULL) {
6486                 options.socket = ctdb_socket;
6487         }
6488
6489         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
6490                             POPT_CONTEXT_KEEP_FIRST);
6491         while ((opt = poptGetNextOpt(pc)) != -1) {
6492                 fprintf(stderr, "Invalid option %s: %s\n",
6493                         poptBadOption(pc, 0), poptStrerror(opt));
6494                 exit(1);
6495         }
6496
6497         if (options.maxruntime == 0) {
6498                 const char *ctdb_timeout;
6499
6500                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6501                 if (ctdb_timeout != NULL) {
6502                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6503                 } else {
6504                         options.maxruntime = 120;
6505                 }
6506         }
6507         if (options.maxruntime <= 120) {
6508                 /* default timeout is 120 seconds */
6509                 options.maxruntime = 120;
6510         }
6511
6512         if (options.machineparsable) {
6513                 options.machinereadable = 1;
6514         }
6515
6516         /* setup the remaining options for the commands */
6517         extra_argc = 0;
6518         extra_argv = poptGetArgs(pc);
6519         if (extra_argv) {
6520                 extra_argv++;
6521                 while (extra_argv[extra_argc]) extra_argc++;
6522         }
6523
6524         if (extra_argc < 1) {
6525                 usage(NULL);
6526         }
6527
6528         cmd = match_command(extra_argv[0]);
6529         if (cmd == NULL) {
6530                 fprintf(stderr, "Unknown command '%s'\n", extra_argv[0]);
6531                 exit(1);
6532         }
6533
6534         /* Enable logging */
6535         setup_logging("ctdb", DEBUG_STDERR);
6536         if (debug_level_parse(options.debuglevelstr, &loglevel)) {
6537                 DEBUGLEVEL = loglevel;
6538         } else {
6539                 DEBUGLEVEL = DEBUG_ERR;
6540         }
6541
6542         signal(SIGALRM, alarm_handler);
6543         alarm(options.maxruntime);
6544
6545         ret = process_command(cmd, extra_argc, extra_argv);
6546         if (ret == -1) {
6547                 ret = 1;
6548         }
6549
6550         (void)poptFreeContext(pc);
6551
6552         return ret;
6553 }