af341951eec3e097b26b82fe094a848e7541d404
[bbaumbach/samba-autobuild/.git] / ctdb / tools / ctdb.c
1 /*
2    CTDB control tool
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/wait.h"
25 #include "system/dir.h"
26
27 #include <ctype.h>
28 #include <popt.h>
29 #include <talloc.h>
30 #include <tevent.h>
31 #include <tdb.h>
32
33 #include "common/version.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/samba_util.h"
36 #include "lib/util/sys_rw.h"
37
38 #include "common/db_hash.h"
39 #include "common/logging.h"
40 #include "protocol/protocol.h"
41 #include "protocol/protocol_api.h"
42 #include "protocol/protocol_util.h"
43 #include "common/system.h"
44 #include "client/client.h"
45 #include "client/client_sync.h"
46
47 #define TIMEOUT()       timeval_current_ofs(options.timelimit, 0)
48
49 #define SRVID_CTDB_TOOL    (CTDB_SRVID_TOOL_RANGE | 0x0001000000000000LL)
50 #define SRVID_CTDB_PUSHDB  (CTDB_SRVID_TOOL_RANGE | 0x0002000000000000LL)
51
52 static struct {
53         const char *socket;
54         const char *debuglevelstr;
55         int timelimit;
56         int pnn;
57         int machinereadable;
58         const char *sep;
59         int machineparsable;
60         int verbose;
61         int maxruntime;
62         int printemptyrecords;
63         int printdatasize;
64         int printlmaster;
65         int printhash;
66         int printrecordflags;
67 } options;
68
69 static poptContext pc;
70
71 struct ctdb_context {
72         struct tevent_context *ev;
73         struct ctdb_client_context *client;
74         struct ctdb_node_map *nodemap;
75         uint32_t pnn, cmd_pnn;
76         uint64_t srvid;
77 };
78
79 static void usage(const char *command);
80
81 /*
82  * Utility Functions
83  */
84
85 static double timeval_delta(struct timeval *tv2, struct timeval *tv)
86 {
87         return (tv2->tv_sec - tv->tv_sec) +
88                (tv2->tv_usec - tv->tv_usec) * 1.0e-6;
89 }
90
91 static struct ctdb_node_and_flags *get_node_by_pnn(
92                                         struct ctdb_node_map *nodemap,
93                                         uint32_t pnn)
94 {
95         int i;
96
97         for (i=0; i<nodemap->num; i++) {
98                 if (nodemap->node[i].pnn == pnn) {
99                         return &nodemap->node[i];
100                 }
101         }
102         return NULL;
103 }
104
105 static const char *pretty_print_flags(TALLOC_CTX *mem_ctx, uint32_t flags)
106 {
107         static const struct {
108                 uint32_t flag;
109                 const char *name;
110         } flag_names[] = {
111                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
112                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
113                 { NODE_FLAGS_BANNED,                "BANNED" },
114                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
115                 { NODE_FLAGS_DELETED,               "DELETED" },
116                 { NODE_FLAGS_STOPPED,               "STOPPED" },
117                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
118         };
119         char *flags_str = NULL;
120         int i;
121
122         for (i=0; i<ARRAY_SIZE(flag_names); i++) {
123                 if (flags & flag_names[i].flag) {
124                         if (flags_str == NULL) {
125                                 flags_str = talloc_asprintf(mem_ctx,
126                                                 "%s", flag_names[i].name);
127                         } else {
128                                 flags_str = talloc_asprintf_append(flags_str,
129                                                 "|%s", flag_names[i].name);
130                         }
131                         if (flags_str == NULL) {
132                                 return "OUT-OF-MEMORY";
133                         }
134                 }
135         }
136         if (flags_str == NULL) {
137                 return "OK";
138         }
139
140         return flags_str;
141 }
142
143 static uint64_t next_srvid(struct ctdb_context *ctdb)
144 {
145         ctdb->srvid += 1;
146         return ctdb->srvid;
147 }
148
149 /*
150  * Get consistent nodemap information.
151  *
152  * If nodemap is already cached, use that. If not get it.
153  * If the current node is BANNED, then get nodemap from "better" node.
154  */
155 static struct ctdb_node_map *get_nodemap(struct ctdb_context *ctdb, bool force)
156 {
157         TALLOC_CTX *tmp_ctx;
158         struct ctdb_node_map *nodemap;
159         struct ctdb_node_and_flags *node;
160         uint32_t current_node;
161         int ret;
162
163         if (force) {
164                 TALLOC_FREE(ctdb->nodemap);
165         }
166
167         if (ctdb->nodemap != NULL) {
168                 return ctdb->nodemap;
169         }
170
171         tmp_ctx = talloc_new(ctdb);
172         if (tmp_ctx == NULL) {
173                 return false;
174         }
175
176         current_node = ctdb->pnn;
177 again:
178         ret = ctdb_ctrl_get_nodemap(tmp_ctx, ctdb->ev, ctdb->client,
179                                     current_node, TIMEOUT(), &nodemap);
180         if (ret != 0) {
181                 fprintf(stderr, "Failed to get nodemap from node %u\n",
182                         current_node);
183                 goto failed;
184         }
185
186         node = get_node_by_pnn(nodemap, current_node);
187         if (node->flags & NODE_FLAGS_BANNED) {
188                 /* Pick next node */
189                 do {
190                         current_node = (current_node + 1) % nodemap->num;
191                         node = get_node_by_pnn(nodemap, current_node);
192                         if (! (node->flags &
193                               (NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED))) {
194                                 break;
195                         }
196                 } while (current_node != ctdb->pnn);
197
198                 if (current_node == ctdb->pnn) {
199                         /* Tried all nodes in the cluster */
200                         fprintf(stderr, "Warning: All nodes are banned.\n");
201                         goto failed;
202                 }
203
204                 goto again;
205         }
206
207         ctdb->nodemap = talloc_steal(ctdb, nodemap);
208         return nodemap;
209
210 failed:
211         talloc_free(tmp_ctx);
212         return NULL;
213 }
214
215 static bool verify_pnn(struct ctdb_context *ctdb, int pnn)
216 {
217         struct ctdb_node_map *nodemap;
218         bool found;
219         int i;
220
221         if (pnn == -1) {
222                 return false;
223         }
224
225         nodemap = get_nodemap(ctdb, false);
226         if (nodemap == NULL) {
227                 return false;
228         }
229
230         found = false;
231         for (i=0; i<nodemap->num; i++) {
232                 if (nodemap->node[i].pnn == pnn) {
233                         found = true;
234                         break;
235                 }
236         }
237         if (! found) {
238                 fprintf(stderr, "Node %u does not exist\n", pnn);
239                 return false;
240         }
241
242         if (nodemap->node[i].flags &
243             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
244                 fprintf(stderr, "Node %u has status %s\n", pnn,
245                         pretty_print_flags(ctdb, nodemap->node[i].flags));
246                 return false;
247         }
248
249         return true;
250 }
251
252 static struct ctdb_node_map *talloc_nodemap(TALLOC_CTX *mem_ctx,
253                                             struct ctdb_node_map *nodemap)
254 {
255         struct ctdb_node_map *nodemap2;
256
257         nodemap2 = talloc_zero(mem_ctx, struct ctdb_node_map);
258         if (nodemap2 == NULL) {
259                 return NULL;
260         }
261
262         nodemap2->node = talloc_array(nodemap2, struct ctdb_node_and_flags,
263                                       nodemap->num);
264         if (nodemap2->node == NULL) {
265                 talloc_free(nodemap2);
266                 return NULL;
267         }
268
269         return nodemap2;
270 }
271
272 /*
273  * Get the number and the list of matching nodes
274  *
275  *   nodestring :=  NULL | all | pnn,[pnn,...]
276  *
277  * If nodestring is NULL, use the current node.
278  */
279 static bool parse_nodestring(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
280                              const char *nodestring,
281                              struct ctdb_node_map **out)
282 {
283         struct ctdb_node_map *nodemap, *nodemap2;
284         struct ctdb_node_and_flags *node;
285         int i;
286
287         nodemap = get_nodemap(ctdb, false);
288         if (nodemap == NULL) {
289                 return false;
290         }
291
292         nodemap2 = talloc_nodemap(mem_ctx, nodemap);
293         if (nodemap2 == NULL) {
294                 return false;
295         }
296
297         if (nodestring == NULL) {
298                 for (i=0; i<nodemap->num; i++) {
299                         if (nodemap->node[i].pnn == ctdb->cmd_pnn) {
300                                 nodemap2->node[0] = nodemap->node[i];
301                                 break;
302                         }
303                 }
304                 nodemap2->num = 1;
305
306                 goto done;
307         }
308
309         if (strcmp(nodestring, "all") == 0) {
310                 for (i=0; i<nodemap->num; i++) {
311                         nodemap2->node[i] = nodemap->node[i];
312                 }
313                 nodemap2->num = nodemap->num;
314
315                 goto done;
316         } else {
317                 char *ns, *tok;
318
319                 ns = talloc_strdup(mem_ctx, nodestring);
320                 if (ns == NULL) {
321                         return false;
322                 }
323
324                 tok = strtok(ns, ",");
325                 while (tok != NULL) {
326                         uint32_t pnn;
327                         char *endptr;
328
329                         pnn = (uint32_t)strtoul(tok, &endptr, 0);
330                         if (pnn == 0 && tok == endptr) {
331                                 fprintf(stderr, "Invalid node %s\n", tok);
332                                         return false;
333                         }
334
335                         node = get_node_by_pnn(nodemap, pnn);
336                         if (node == NULL) {
337                                 fprintf(stderr, "Node %u does not exist\n",
338                                         pnn);
339                                 return false;
340                         }
341
342                         nodemap2->node[nodemap2->num] = *node;
343                         nodemap2->num += 1;
344
345                         tok = strtok(NULL, ",");
346                 }
347         }
348
349 done:
350         *out = nodemap2;
351         return true;
352 }
353
354 /* Compare IP address */
355 static bool ctdb_same_ip(ctdb_sock_addr *ip1, ctdb_sock_addr *ip2)
356 {
357         bool ret = false;
358
359         if (ip1->sa.sa_family != ip2->sa.sa_family) {
360                 return false;
361         }
362
363         switch (ip1->sa.sa_family) {
364         case AF_INET:
365                 ret = (memcmp(&ip1->ip.sin_addr, &ip2->ip.sin_addr,
366                               sizeof(struct in_addr)) == 0);
367                 break;
368
369         case AF_INET6:
370                 ret = (memcmp(&ip1->ip6.sin6_addr, &ip2->ip6.sin6_addr,
371                               sizeof(struct in6_addr)) == 0);
372                 break;
373         }
374
375         return ret;
376 }
377
378 /* Append a node to a node map with given address and flags */
379 static bool node_map_add(struct ctdb_node_map *nodemap,
380                          const char *nstr, uint32_t flags)
381 {
382         ctdb_sock_addr addr;
383         uint32_t num;
384         struct ctdb_node_and_flags *n;
385
386         if (! parse_ip(nstr, NULL, 0, &addr)) {
387                 fprintf(stderr, "Invalid IP address %s\n", nstr);
388                 return false;
389         }
390
391         num = nodemap->num;
392         nodemap->node = talloc_realloc(nodemap, nodemap->node,
393                                        struct ctdb_node_and_flags, num+1);
394         if (nodemap->node == NULL) {
395                 return false;
396         }
397
398         n = &nodemap->node[num];
399         n->addr = addr;
400         n->pnn = num;
401         n->flags = flags;
402
403         nodemap->num = num+1;
404         return true;
405 }
406
407 /* Read a nodes file into a node map */
408 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
409                                                   const char *nlist)
410 {
411         char **lines;
412         int nlines;
413         int i;
414         struct ctdb_node_map *nodemap;
415
416         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
417         if (nodemap == NULL) {
418                 return NULL;
419         }
420
421         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
422         if (lines == NULL) {
423                 return NULL;
424         }
425
426         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
427                 nlines--;
428         }
429
430         for (i=0; i<nlines; i++) {
431                 char *node;
432                 uint32_t flags;
433                 size_t len;
434
435                 node = lines[i];
436                 /* strip leading spaces */
437                 while((*node == ' ') || (*node == '\t')) {
438                         node++;
439                 }
440
441                 len = strlen(node);
442
443                 /* strip trailing spaces */
444                 while ((len > 1) &&
445                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
446                 {
447                         node[len-1] = '\0';
448                         len--;
449                 }
450
451                 if (len == 0) {
452                         continue;
453                 }
454                 if (*node == '#') {
455                         /* A "deleted" node is a node that is
456                            commented out in the nodes file.  This is
457                            used instead of removing a line, which
458                            would cause subsequent nodes to change
459                            their PNN. */
460                         flags = NODE_FLAGS_DELETED;
461                         node = discard_const("0.0.0.0");
462                 } else {
463                         flags = 0;
464                 }
465                 if (! node_map_add(nodemap, node, flags)) {
466                         talloc_free(lines);
467                         TALLOC_FREE(nodemap);
468                         return NULL;
469                 }
470         }
471
472         talloc_free(lines);
473         return nodemap;
474 }
475
476 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx, uint32_t pnn)
477 {
478         struct ctdb_node_map *nodemap;
479         char *nodepath;
480         const char *nodes_list = NULL;
481
482         if (pnn != CTDB_UNKNOWN_PNN) {
483                 nodepath = talloc_asprintf(mem_ctx, "CTDB_NODES_%u", pnn);
484                 if (nodepath != NULL) {
485                         nodes_list = getenv(nodepath);
486                 }
487         }
488         if (nodes_list == NULL) {
489                 nodes_list = getenv("CTDB_NODES");
490         }
491         if (nodes_list == NULL) {
492                 const char *basedir = getenv("CTDB_BASE");
493                 if (basedir == NULL) {
494                         basedir = CTDB_ETCDIR;
495                 }
496                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes", basedir);
497                 if (nodes_list == NULL) {
498                         fprintf(stderr, "Memory allocation error\n");
499                         return NULL;
500                 }
501         }
502
503         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
504         if (nodemap == NULL) {
505                 fprintf(stderr, "Failed to read nodes file \"%s\"\n",
506                         nodes_list);
507                 return NULL;
508         }
509
510         return nodemap;
511 }
512
513 static struct ctdb_dbid *db_find(TALLOC_CTX *mem_ctx,
514                                  struct ctdb_context *ctdb,
515                                  struct ctdb_dbid_map *dbmap,
516                                  const char *db_name)
517 {
518         struct ctdb_dbid *db = NULL;
519         const char *name;
520         int ret, i;
521
522         for (i=0; i<dbmap->num; i++) {
523                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
524                                            ctdb->pnn, TIMEOUT(),
525                                            dbmap->dbs[i].db_id, &name);
526                 if (ret != 0) {
527                         return false;
528                 }
529
530                 if (strcmp(db_name, name) == 0) {
531                         talloc_free(discard_const(name));
532                         db = &dbmap->dbs[i];
533                         break;
534                 }
535         }
536
537         return db;
538 }
539
540 static bool db_exists(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
541                       const char *db_arg, uint32_t *db_id,
542                       const char **db_name, uint8_t *db_flags)
543 {
544         struct ctdb_dbid_map *dbmap;
545         struct ctdb_dbid *db = NULL;
546         uint32_t id = 0;
547         const char *name = NULL;
548         int ret, i;
549
550         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
551                                   ctdb->pnn, TIMEOUT(), &dbmap);
552         if (ret != 0) {
553                 return false;
554         }
555
556         if (strncmp(db_arg, "0x", 2) == 0) {
557                 id = strtoul(db_arg, NULL, 0);
558                 for (i=0; i<dbmap->num; i++) {
559                         if (id == dbmap->dbs[i].db_id) {
560                                 db = &dbmap->dbs[i];
561                                 break;
562                         }
563                 }
564         } else {
565                 name = db_arg;
566                 db = db_find(mem_ctx, ctdb, dbmap, name);
567         }
568
569         if (db == NULL) {
570                 fprintf(stderr, "No database matching '%s' found\n", db_arg);
571                 return false;
572         }
573
574         if (name == NULL) {
575                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
576                                            ctdb->pnn, TIMEOUT(), id, &name);
577                 if (ret != 0) {
578                         return false;
579                 }
580         }
581
582         if (db_id != NULL) {
583                 *db_id = db->db_id;
584         }
585         if (db_name != NULL) {
586                 *db_name = talloc_strdup(mem_ctx, name);
587         }
588         if (db_flags != NULL) {
589                 *db_flags = db->flags;
590         }
591         return true;
592 }
593
594 static int h2i(char h)
595 {
596         if (h >= 'a' && h <= 'f') {
597                 return h - 'a' + 10;
598         }
599         if (h >= 'A' && h <= 'F') {
600                 return h - 'f' + 10;
601         }
602         return h - '0';
603 }
604
605 static int hex_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
606                        TDB_DATA *out)
607 {
608         int i;
609         TDB_DATA data;
610
611         if (len & 0x01) {
612                 fprintf(stderr, "Key (%s) contains odd number of hex digits\n",
613                         str);
614                 return EINVAL;
615         }
616
617         data.dsize = len / 2;
618         data.dptr = talloc_size(mem_ctx, data.dsize);
619         if (data.dptr == NULL) {
620                 return ENOMEM;
621         }
622
623         for (i=0; i<data.dsize; i++) {
624                 data.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
625         }
626
627         *out = data;
628         return 0;
629 }
630
631 static int str_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
632                        TDB_DATA *out)
633 {
634         TDB_DATA data;
635         int ret = 0;
636
637         if (strncmp(str, "0x", 2) == 0) {
638                 ret = hex_to_data(str+2, len-2, mem_ctx, &data);
639         } else {
640                 data.dptr = talloc_memdup(mem_ctx, str, len);
641                 if (data.dptr == NULL) {
642                         return ENOMEM;
643                 }
644                 data.dsize = len;
645         }
646
647         *out = data;
648         return ret;
649 }
650
651 static int run_helper(TALLOC_CTX *mem_ctx, const char *command,
652                       const char *path, int argc, const char **argv)
653 {
654         pid_t pid;
655         int save_errno, status, ret;
656         const char **new_argv;
657         int i;
658
659         new_argv = talloc_array(mem_ctx, const char *, argc + 2);
660         if (new_argv == NULL) {
661                 return ENOMEM;
662         }
663
664         new_argv[0] = path;
665         for (i=0; i<argc; i++) {
666                 new_argv[i+1] = argv[i];
667         }
668         new_argv[argc+1] = NULL;
669
670         pid = fork();
671         if (pid < 0) {
672                 save_errno = errno;
673                 talloc_free(new_argv);
674                 fprintf(stderr, "Failed to fork %s (%s) - %s\n",
675                         command, path, strerror(save_errno));
676                 return save_errno;
677         }
678
679         if (pid == 0) {
680                 ret = execv(path, discard_const(new_argv));
681                 if (ret == -1) {
682                         _exit(64+errno);
683                 }
684                 /* Should not happen */
685                 _exit(64+ENOEXEC);
686         }
687
688         talloc_free(new_argv);
689
690         ret = waitpid(pid, &status, 0);
691         if (ret == -1) {
692                 save_errno = errno;
693                 fprintf(stderr, "waitpid() failed for %s - %s\n",
694                         command, strerror(save_errno));
695                 return save_errno;
696         }
697
698         if (WIFEXITED(status)) {
699                 int pstatus = WEXITSTATUS(status);
700                 if (WIFSIGNALED(status)) {
701                         fprintf(stderr, "%s terminated with signal %d\n",
702                                 command, WTERMSIG(status));
703                         ret = EINTR;
704                 } else if (pstatus >= 64 && pstatus < 255) {
705                         fprintf(stderr, "%s failed with error %d\n",
706                                 command, pstatus-64);
707                         ret = pstatus - 64;
708                 } else {
709                         ret = pstatus;
710                 }
711                 return ret;
712         } else if (WIFSIGNALED(status)) {
713                 fprintf(stderr, "%s terminated with signal %d\n",
714                         command, WTERMSIG(status));
715                 return EINTR;
716         }
717
718         return 0;
719 }
720
721 /*
722  * Command Functions
723  */
724
725 static int control_version(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
726                            int argc, const char **argv)
727 {
728         printf("%s\n", ctdb_version_string);
729         return 0;
730 }
731
732 static bool partially_online(TALLOC_CTX *mem_ctx,
733                              struct ctdb_context *ctdb,
734                              struct ctdb_node_and_flags *node)
735 {
736         struct ctdb_iface_list *iface_list;
737         int ret, i;
738         bool status = false;
739
740         if (node->flags != 0) {
741                 return false;
742         }
743
744         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
745                                    node->pnn, TIMEOUT(), &iface_list);
746         if (ret != 0) {
747                 return false;
748         }
749
750         status = false;
751         for (i=0; i < iface_list->num; i++) {
752                 if (iface_list->iface[i].link_state == 0) {
753                         status = true;
754                         break;
755                 }
756         }
757
758         return status;
759 }
760
761 static void print_nodemap_machine(TALLOC_CTX *mem_ctx,
762                                   struct ctdb_context *ctdb,
763                                   struct ctdb_node_map *nodemap,
764                                   uint32_t mypnn)
765 {
766         struct ctdb_node_and_flags *node;
767         int i;
768
769         printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
770                options.sep,
771                "Node", options.sep,
772                "IP", options.sep,
773                "Disconnected", options.sep,
774                "Banned", options.sep,
775                "Disabled", options.sep,
776                "Unhealthy", options.sep,
777                "Stopped", options.sep,
778                "Inactive", options.sep,
779                "PartiallyOnline", options.sep,
780                "ThisNode", options.sep);
781
782         for (i=0; i<nodemap->num; i++) {
783                 node = &nodemap->node[i];
784                 if (node->flags & NODE_FLAGS_DELETED) {
785                         continue;
786                 }
787
788                 printf("%s%u%s%s%s%d%s%d%s%d%s%d%s%d%s%d%s%d%s%c%s\n",
789                        options.sep,
790                        node->pnn, options.sep,
791                        ctdb_sock_addr_to_string(mem_ctx, &node->addr, false),
792                        options.sep,
793                        !! (node->flags & NODE_FLAGS_DISCONNECTED), options.sep,
794                        !! (node->flags & NODE_FLAGS_BANNED), options.sep,
795                        !! (node->flags & NODE_FLAGS_PERMANENTLY_DISABLED),
796                        options.sep,
797                        !! (node->flags & NODE_FLAGS_UNHEALTHY), options.sep,
798                        !! (node->flags & NODE_FLAGS_STOPPED), options.sep,
799                        !! (node->flags & NODE_FLAGS_INACTIVE), options.sep,
800                        partially_online(mem_ctx, ctdb, node), options.sep,
801                        (node->pnn == mypnn)?'Y':'N', options.sep);
802         }
803
804 }
805
806 static void print_nodemap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
807                           struct ctdb_node_map *nodemap, uint32_t mypnn,
808                           bool print_header)
809 {
810         struct ctdb_node_and_flags *node;
811         int num_deleted_nodes = 0;
812         int i;
813
814         for (i=0; i<nodemap->num; i++) {
815                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
816                         num_deleted_nodes++;
817                 }
818         }
819
820         if (print_header) {
821                 if (num_deleted_nodes == 0) {
822                         printf("Number of nodes:%d\n", nodemap->num);
823                 } else {
824                         printf("Number of nodes:%d "
825                                "(including %d deleted nodes)\n",
826                                nodemap->num, num_deleted_nodes);
827                 }
828         }
829
830         for (i=0; i<nodemap->num; i++) {
831                 node = &nodemap->node[i];
832                 if (node->flags & NODE_FLAGS_DELETED) {
833                         continue;
834                 }
835
836                 printf("pnn:%u %-16s %s%s\n",
837                        node->pnn,
838                        ctdb_sock_addr_to_string(mem_ctx, &node->addr, false),
839                        partially_online(mem_ctx, ctdb, node) ?
840                                 "PARTIALLYONLINE" :
841                                 pretty_print_flags(mem_ctx, node->flags),
842                        node->pnn == mypnn ? " (THIS NODE)" : "");
843         }
844 }
845
846 static void print_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
847                          struct ctdb_node_map *nodemap, uint32_t mypnn,
848                          struct ctdb_vnn_map *vnnmap, int recmode,
849                          uint32_t recmaster)
850 {
851         int i;
852
853         print_nodemap(mem_ctx, ctdb, nodemap, mypnn, true);
854
855         if (vnnmap->generation == INVALID_GENERATION) {
856                 printf("Generation:INVALID\n");
857         } else {
858                 printf("Generation:%u\n", vnnmap->generation);
859         }
860         printf("Size:%d\n", vnnmap->size);
861         for (i=0; i<vnnmap->size; i++) {
862                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
863         }
864
865         printf("Recovery mode:%s (%d)\n",
866                recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "RECOVERY",
867                recmode);
868         printf("Recovery master:%d\n", recmaster);
869 }
870
871 static int control_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
872                           int argc, const char **argv)
873 {
874         struct ctdb_node_map *nodemap;
875         struct ctdb_vnn_map *vnnmap;
876         int recmode;
877         uint32_t recmaster;
878         int ret;
879
880         if (argc != 0) {
881                 usage("status");
882         }
883
884         nodemap = get_nodemap(ctdb, false);
885         if (nodemap == NULL) {
886                 return 1;
887         }
888
889         if (options.machinereadable == 1) {
890                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
891                 return 0;
892         }
893
894         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
895                                   ctdb->cmd_pnn, TIMEOUT(), &vnnmap);
896         if (ret != 0) {
897                 return ret;
898         }
899
900         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
901                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
902         if (ret != 0) {
903                 return ret;
904         }
905
906         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
907                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
908         if (ret != 0) {
909                 return ret;
910         }
911
912         print_status(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, vnnmap,
913                      recmode, recmaster);
914         return 0;
915 }
916
917 static int control_uptime(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
918                           int argc, const char **argv)
919 {
920         struct ctdb_uptime *uptime;
921         int ret, tmp, days, hours, minutes, seconds;
922
923         ret = ctdb_ctrl_uptime(mem_ctx, ctdb->ev, ctdb->client,
924                                ctdb->cmd_pnn, TIMEOUT(), &uptime);
925         if (ret != 0) {
926                 return ret;
927         }
928
929         printf("Current time of node %-4u     :                %s",
930                ctdb->cmd_pnn, ctime(&uptime->current_time.tv_sec));
931
932         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
933         seconds = tmp % 60; tmp /= 60;
934         minutes = tmp % 60; tmp /= 60;
935         hours = tmp % 24; tmp /= 24;
936         days = tmp;
937
938         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s",
939                days, hours, minutes, seconds,
940                ctime(&uptime->ctdbd_start_time.tv_sec));
941
942         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
943         seconds = tmp % 60; tmp /= 60;
944         minutes = tmp % 60; tmp /= 60;
945         hours = tmp % 24; tmp /= 24;
946         days = tmp;
947
948         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s",
949                days, hours, minutes, seconds,
950                ctime(&uptime->last_recovery_finished.tv_sec));
951
952         printf("Duration of last recovery/failover: %lf seconds\n",
953                timeval_delta(&uptime->last_recovery_finished,
954                              &uptime->last_recovery_started));
955
956         return 0;
957 }
958
959 static int control_ping(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
960                         int argc, const char **argv)
961 {
962         struct timeval tv;
963         int ret, num_clients;
964
965         tv = timeval_current();
966         ret = ctdb_ctrl_ping(mem_ctx, ctdb->ev, ctdb->client,
967                              ctdb->cmd_pnn, TIMEOUT(), &num_clients);
968         if (ret != 0) {
969                 return ret;
970         }
971
972         printf("response from %u time=%.6f sec  (%d clients)\n",
973                ctdb->cmd_pnn, timeval_elapsed(&tv), num_clients);
974         return 0;
975 }
976
977 const char *runstate_to_string(enum ctdb_runstate runstate);
978 enum ctdb_runstate runstate_from_string(const char *runstate_str);
979
980 static int control_runstate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
981                             int argc, const char **argv)
982 {
983         enum ctdb_runstate runstate;
984         bool found;
985         int ret, i;
986
987         ret = ctdb_ctrl_get_runstate(mem_ctx, ctdb->ev, ctdb->client,
988                                      ctdb->cmd_pnn, TIMEOUT(), &runstate);
989         if (ret != 0) {
990                 return ret;
991         }
992
993         found = true;
994         for (i=0; i<argc; i++) {
995                 enum ctdb_runstate t;
996
997                 found = false;
998                 t = ctdb_runstate_from_string(argv[i]);
999                 if (t == CTDB_RUNSTATE_UNKNOWN) {
1000                         printf("Invalid run state (%s)\n", argv[i]);
1001                         return 1;
1002                 }
1003
1004                 if (t == runstate) {
1005                         found = true;
1006                         break;
1007                 }
1008         }
1009
1010         if (! found) {
1011                 printf("CTDB not in required run state (got %s)\n",
1012                        ctdb_runstate_to_string(runstate));
1013                 return 1;
1014         }
1015
1016         printf("%s\n", ctdb_runstate_to_string(runstate));
1017         return 0;
1018 }
1019
1020 static int control_getvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1021                           int argc, const char **argv)
1022 {
1023         struct ctdb_var_list *tun_var_list;
1024         uint32_t value;
1025         int ret, i;
1026         bool found;
1027
1028         if (argc != 1) {
1029                 usage("getvar");
1030         }
1031
1032         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1033                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1034         if (ret != 0) {
1035                 fprintf(stderr,
1036                         "Failed to get list of variables from node %u\n",
1037                         ctdb->cmd_pnn);
1038                 return ret;
1039         }
1040
1041         found = false;
1042         for (i=0; i<tun_var_list->count; i++) {
1043                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1044                         found = true;
1045                         break;
1046                 }
1047         }
1048
1049         if (! found) {
1050                 printf("No such tunable %s\n", argv[0]);
1051                 return 1;
1052         }
1053
1054         ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
1055                                     ctdb->cmd_pnn, TIMEOUT(), argv[0], &value);
1056         if (ret != 0) {
1057                 return ret;
1058         }
1059
1060         printf("%-26s = %u\n", argv[0], value);
1061         return 0;
1062 }
1063
1064 static int control_setvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1065                           int argc, const char **argv)
1066 {
1067         struct ctdb_var_list *tun_var_list;
1068         struct ctdb_tunable tunable;
1069         int ret, i;
1070         bool found;
1071
1072         if (argc != 2) {
1073                 usage("setvar");
1074         }
1075
1076         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1077                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1078         if (ret != 0) {
1079                 fprintf(stderr,
1080                         "Failed to get list of variables from node %u\n",
1081                         ctdb->cmd_pnn);
1082                 return ret;
1083         }
1084
1085         found = false;
1086         for (i=0; i<tun_var_list->count; i++) {
1087                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1088                         found = true;
1089                         break;
1090                 }
1091         }
1092
1093         if (! found) {
1094                 printf("No such tunable %s\n", argv[0]);
1095                 return 1;
1096         }
1097
1098         tunable.name = argv[0];
1099         tunable.value = strtoul(argv[1], NULL, 0);
1100
1101         ret = ctdb_ctrl_set_tunable(mem_ctx, ctdb->ev, ctdb->client,
1102                                     ctdb->cmd_pnn, TIMEOUT(), &tunable);
1103         if (ret != 0) {
1104                 if (ret == 1) {
1105                         fprintf(stderr,
1106                                 "Setting obsolete tunable variable '%s'\n",
1107                                tunable.name);
1108                         return 0;
1109                 }
1110         }
1111
1112         return ret;
1113 }
1114
1115 static int control_listvars(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1116                             int argc, const char **argv)
1117 {
1118         struct ctdb_var_list *tun_var_list;
1119         int ret, i;
1120
1121         if (argc != 0) {
1122                 usage("listvars");
1123         }
1124
1125         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1126                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1127         if (ret != 0) {
1128                 return ret;
1129         }
1130
1131         for (i=0; i<tun_var_list->count; i++) {
1132                 control_getvar(mem_ctx, ctdb, 1, &tun_var_list->var[i]);
1133         }
1134
1135         return 0;
1136 }
1137
1138 const struct {
1139         const char *name;
1140         uint32_t offset;
1141 } stats_fields[] = {
1142 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
1143         STATISTICS_FIELD(num_clients),
1144         STATISTICS_FIELD(frozen),
1145         STATISTICS_FIELD(recovering),
1146         STATISTICS_FIELD(num_recoveries),
1147         STATISTICS_FIELD(client_packets_sent),
1148         STATISTICS_FIELD(client_packets_recv),
1149         STATISTICS_FIELD(node_packets_sent),
1150         STATISTICS_FIELD(node_packets_recv),
1151         STATISTICS_FIELD(keepalive_packets_sent),
1152         STATISTICS_FIELD(keepalive_packets_recv),
1153         STATISTICS_FIELD(node.req_call),
1154         STATISTICS_FIELD(node.reply_call),
1155         STATISTICS_FIELD(node.req_dmaster),
1156         STATISTICS_FIELD(node.reply_dmaster),
1157         STATISTICS_FIELD(node.reply_error),
1158         STATISTICS_FIELD(node.req_message),
1159         STATISTICS_FIELD(node.req_control),
1160         STATISTICS_FIELD(node.reply_control),
1161         STATISTICS_FIELD(client.req_call),
1162         STATISTICS_FIELD(client.req_message),
1163         STATISTICS_FIELD(client.req_control),
1164         STATISTICS_FIELD(timeouts.call),
1165         STATISTICS_FIELD(timeouts.control),
1166         STATISTICS_FIELD(timeouts.traverse),
1167         STATISTICS_FIELD(locks.num_calls),
1168         STATISTICS_FIELD(locks.num_current),
1169         STATISTICS_FIELD(locks.num_pending),
1170         STATISTICS_FIELD(locks.num_failed),
1171         STATISTICS_FIELD(total_calls),
1172         STATISTICS_FIELD(pending_calls),
1173         STATISTICS_FIELD(childwrite_calls),
1174         STATISTICS_FIELD(pending_childwrite_calls),
1175         STATISTICS_FIELD(memory_used),
1176         STATISTICS_FIELD(max_hop_count),
1177         STATISTICS_FIELD(total_ro_delegations),
1178         STATISTICS_FIELD(total_ro_revokes),
1179 };
1180
1181 #define LATENCY_AVG(v)  ((v).num ? (v).total / (v).num : 0.0 )
1182
1183 static void print_statistics_machine(struct ctdb_statistics *s,
1184                                      bool show_header)
1185 {
1186         int i;
1187
1188         if (show_header) {
1189                 printf("CTDB version%s", options.sep);
1190                 printf("Current time of statistics%s", options.sep);
1191                 printf("Statistics collected since%s", options.sep);
1192                 for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1193                         printf("%s%s", stats_fields[i].name, options.sep);
1194                 }
1195                 printf("num_reclock_ctdbd_latency%s", options.sep);
1196                 printf("min_reclock_ctdbd_latency%s", options.sep);
1197                 printf("avg_reclock_ctdbd_latency%s", options.sep);
1198                 printf("max_reclock_ctdbd_latency%s", options.sep);
1199
1200                 printf("num_reclock_recd_latency%s", options.sep);
1201                 printf("min_reclock_recd_latency%s", options.sep);
1202                 printf("avg_reclock_recd_latency%s", options.sep);
1203                 printf("max_reclock_recd_latency%s", options.sep);
1204
1205                 printf("num_call_latency%s", options.sep);
1206                 printf("min_call_latency%s", options.sep);
1207                 printf("avg_call_latency%s", options.sep);
1208                 printf("max_call_latency%s", options.sep);
1209
1210                 printf("num_lockwait_latency%s", options.sep);
1211                 printf("min_lockwait_latency%s", options.sep);
1212                 printf("avg_lockwait_latency%s", options.sep);
1213                 printf("max_lockwait_latency%s", options.sep);
1214
1215                 printf("num_childwrite_latency%s", options.sep);
1216                 printf("min_childwrite_latency%s", options.sep);
1217                 printf("avg_childwrite_latency%s", options.sep);
1218                 printf("max_childwrite_latency%s", options.sep);
1219                 printf("\n");
1220         }
1221
1222         printf("%u%s", CTDB_PROTOCOL, options.sep);
1223         printf("%u%s", (uint32_t)s->statistics_current_time.tv_sec, options.sep);
1224         printf("%u%s", (uint32_t)s->statistics_start_time.tv_sec, options.sep);
1225         for (i=0;i<ARRAY_SIZE(stats_fields);i++) {
1226                 printf("%u%s",
1227                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s),
1228                        options.sep);
1229         }
1230         printf("%u%s", s->reclock.ctdbd.num, options.sep);
1231         printf("%.6f%s", s->reclock.ctdbd.min, options.sep);
1232         printf("%.6f%s", LATENCY_AVG(s->reclock.ctdbd), options.sep);
1233         printf("%.6f%s", s->reclock.ctdbd.max, options.sep);
1234
1235         printf("%u%s", s->reclock.recd.num, options.sep);
1236         printf("%.6f%s", s->reclock.recd.min, options.sep);
1237         printf("%.6f%s", LATENCY_AVG(s->reclock.recd), options.sep);
1238         printf("%.6f%s", s->reclock.recd.max, options.sep);
1239
1240         printf("%d%s", s->call_latency.num, options.sep);
1241         printf("%.6f%s", s->call_latency.min, options.sep);
1242         printf("%.6f%s", LATENCY_AVG(s->call_latency), options.sep);
1243         printf("%.6f%s", s->call_latency.max, options.sep);
1244
1245         printf("%d%s", s->childwrite_latency.num, options.sep);
1246         printf("%.6f%s", s->childwrite_latency.min, options.sep);
1247         printf("%.6f%s", LATENCY_AVG(s->childwrite_latency), options.sep);
1248         printf("%.6f%s", s->childwrite_latency.max, options.sep);
1249         printf("\n");
1250 }
1251
1252 static void print_statistics(struct ctdb_statistics *s)
1253 {
1254         int tmp, days, hours, minutes, seconds;
1255         int i;
1256         const char *prefix = NULL;
1257         int preflen = 0;
1258
1259         tmp = s->statistics_current_time.tv_sec -
1260               s->statistics_start_time.tv_sec;
1261         seconds = tmp % 60; tmp /= 60;
1262         minutes = tmp % 60; tmp /= 60;
1263         hours   = tmp % 24; tmp /= 24;
1264         days    = tmp;
1265
1266         printf("CTDB version %u\n", CTDB_PROTOCOL);
1267         printf("Current time of statistics  :                %s",
1268                ctime(&s->statistics_current_time.tv_sec));
1269         printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s",
1270                days, hours, minutes, seconds,
1271                ctime(&s->statistics_start_time.tv_sec));
1272
1273         for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1274                 if (strchr(stats_fields[i].name, '.') != NULL) {
1275                         preflen = strcspn(stats_fields[i].name, ".") + 1;
1276                         if (! prefix ||
1277                             strncmp(prefix, stats_fields[i].name, preflen) != 0) {
1278                                 prefix = stats_fields[i].name;
1279                                 printf(" %*.*s\n", preflen-1, preflen-1,
1280                                        stats_fields[i].name);
1281                         }
1282                 } else {
1283                         preflen = 0;
1284                 }
1285                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
1286                        stats_fields[i].name+preflen, preflen ? 0 : 4, "",
1287                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s));
1288         }
1289
1290         printf(" hop_count_buckets:");
1291         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1292                 printf(" %d", s->hop_count_bucket[i]);
1293         }
1294         printf("\n");
1295         printf(" lock_buckets:");
1296         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1297                 printf(" %d", s->locks.buckets[i]);
1298         }
1299         printf("\n");
1300         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1301                "locks_latency      MIN/AVG/MAX",
1302                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
1303                s->locks.latency.max, s->locks.latency.num);
1304
1305         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1306                "reclock_ctdbd      MIN/AVG/MAX",
1307                s->reclock.ctdbd.min, LATENCY_AVG(s->reclock.ctdbd),
1308                s->reclock.ctdbd.max, s->reclock.ctdbd.num);
1309
1310         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1311                "reclock_recd       MIN/AVG/MAX",
1312                s->reclock.recd.min, LATENCY_AVG(s->reclock.recd),
1313                s->reclock.recd.max, s->reclock.recd.num);
1314
1315         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1316                "call_latency       MIN/AVG/MAX",
1317                s->call_latency.min, LATENCY_AVG(s->call_latency),
1318                s->call_latency.max, s->call_latency.num);
1319
1320         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1321                "childwrite_latency MIN/AVG/MAX",
1322                s->childwrite_latency.min,
1323                LATENCY_AVG(s->childwrite_latency),
1324                s->childwrite_latency.max, s->childwrite_latency.num);
1325 }
1326
1327 static int control_statistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1328                               int argc, const char **argv)
1329 {
1330         struct ctdb_statistics *stats;
1331         int ret;
1332
1333         if (argc != 0) {
1334                 usage("statistics");
1335         }
1336
1337         ret = ctdb_ctrl_statistics(mem_ctx, ctdb->ev, ctdb->client,
1338                                    ctdb->cmd_pnn, TIMEOUT(), &stats);
1339         if (ret != 0) {
1340                 return ret;
1341         }
1342
1343         if (options.machinereadable) {
1344                 print_statistics_machine(stats, true);
1345         } else {
1346                 print_statistics(stats);
1347         }
1348
1349         return 0;
1350 }
1351
1352 static int control_statistics_reset(TALLOC_CTX *mem_ctx,
1353                                     struct ctdb_context *ctdb,
1354                                     int argc, const char **argv)
1355 {
1356         int ret;
1357
1358         if (argc != 0) {
1359                 usage("statisticsreset");
1360         }
1361
1362         ret = ctdb_ctrl_statistics_reset(mem_ctx, ctdb->ev, ctdb->client,
1363                                          ctdb->cmd_pnn, TIMEOUT());
1364         if (ret != 0) {
1365                 return ret;
1366         }
1367
1368         return 0;
1369 }
1370
1371 static int control_stats(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1372                          int argc, const char **argv)
1373 {
1374         struct ctdb_statistics_list *slist;
1375         int ret, count = 0, i;
1376         bool show_header = true;
1377
1378         if (argc > 1) {
1379                 usage("stats");
1380         }
1381
1382         if (argc == 1) {
1383                 count = atoi(argv[0]);
1384         }
1385
1386         ret = ctdb_ctrl_get_stat_history(mem_ctx, ctdb->ev, ctdb->client,
1387                                          ctdb->cmd_pnn, TIMEOUT(), &slist);
1388         if (ret != 0) {
1389                 return ret;
1390         }
1391
1392         for (i=0; i<slist->num; i++) {
1393                 if (slist->stats[i].statistics_start_time.tv_sec == 0) {
1394                         continue;
1395                 }
1396                 if (options.machinereadable == 1) {
1397                         print_statistics_machine(&slist->stats[i],
1398                                                  show_header);
1399                         show_header = false;
1400                 } else {
1401                         print_statistics(&slist->stats[i]);
1402                 }
1403                 if (count > 0 && i == count) {
1404                         break;
1405                 }
1406         }
1407
1408         return 0;
1409 }
1410
1411 static int ctdb_public_ip_cmp(const void *a, const void *b)
1412 {
1413         const struct ctdb_public_ip *ip_a = a;
1414         const struct ctdb_public_ip *ip_b = b;
1415
1416         return ctdb_sock_addr_cmp(&ip_a->addr, &ip_b->addr);
1417 }
1418
1419 static void print_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1420                      struct ctdb_public_ip_list *ips,
1421                      struct ctdb_public_ip_info **ipinfo,
1422                      bool all_nodes)
1423 {
1424         int i, j;
1425         char *conf, *avail, *active;
1426
1427         if (options.machinereadable == 1) {
1428                 printf("%s%s%s%s%s", options.sep,
1429                        "Public IP", options.sep,
1430                        "Node", options.sep);
1431                 if (options.verbose == 1) {
1432                         printf("%s%s%s%s%s%s\n",
1433                                "ActiveInterfaces", options.sep,
1434                                "AvailableInterfaces", options.sep,
1435                                "ConfiguredInterfaces", options.sep);
1436                 } else {
1437                         printf("\n");
1438                 }
1439         } else {
1440                 if (all_nodes) {
1441                         printf("Public IPs on ALL nodes\n");
1442                 } else {
1443                         printf("Public IPs on node %u\n", ctdb->cmd_pnn);
1444                 }
1445         }
1446
1447         for (i = 0; i < ips->num; i++) {
1448
1449                 if (options.machinereadable == 1) {
1450                         printf("%s%s%s%d%s", options.sep,
1451                                ctdb_sock_addr_to_string(
1452                                        mem_ctx, &ips->ip[i].addr, false),
1453                                options.sep,
1454                                (int)ips->ip[i].pnn, options.sep);
1455                 } else {
1456                         printf("%s", ctdb_sock_addr_to_string(
1457                                        mem_ctx, &ips->ip[i].addr, false));
1458                 }
1459
1460                 if (options.verbose == 0) {
1461                         if (options.machinereadable == 1) {
1462                                 printf("\n");
1463                         } else {
1464                                 printf(" %d\n", (int)ips->ip[i].pnn);
1465                         }
1466                         continue;
1467                 }
1468
1469                 conf = NULL;
1470                 avail = NULL;
1471                 active = NULL;
1472
1473                 if (ipinfo[i] == NULL) {
1474                         goto skip_ipinfo;
1475                 }
1476
1477                 for (j=0; j<ipinfo[i]->ifaces->num; j++) {
1478                         struct ctdb_iface *iface;
1479
1480                         iface = &ipinfo[i]->ifaces->iface[j];
1481                         if (conf == NULL) {
1482                                 conf = talloc_strdup(mem_ctx, iface->name);
1483                         } else {
1484                                 conf = talloc_asprintf_append(
1485                                                 conf, ",%s", iface->name);
1486                         }
1487
1488                         if (ipinfo[i]->active_idx == j) {
1489                                 active = iface->name;
1490                         }
1491
1492                         if (iface->link_state == 0) {
1493                                 continue;
1494                         }
1495
1496                         if (avail == NULL) {
1497                                 avail = talloc_strdup(mem_ctx, iface->name);
1498                         } else {
1499                                 avail = talloc_asprintf_append(
1500                                                 avail, ",%s", iface->name);
1501                         }
1502                 }
1503
1504         skip_ipinfo:
1505
1506                 if (options.machinereadable == 1) {
1507                         printf("%s%s%s%s%s%s\n",
1508                                active ? active : "", options.sep,
1509                                avail ? avail : "", options.sep,
1510                                conf ? conf : "", options.sep);
1511                 } else {
1512                         printf(" node[%d] active[%s] available[%s]"
1513                                " configured[%s]\n",
1514                                (int)ips->ip[i].pnn, active ? active : "",
1515                                avail ? avail : "", conf ? conf : "");
1516                 }
1517         }
1518 }
1519
1520 static int collect_ips(uint8_t *keybuf, size_t keylen, uint8_t *databuf,
1521                        size_t datalen, void *private_data)
1522 {
1523         struct ctdb_public_ip_list *ips = talloc_get_type_abort(
1524                 private_data, struct ctdb_public_ip_list);
1525         struct ctdb_public_ip *ip;
1526
1527         ip = (struct ctdb_public_ip *)databuf;
1528         ips->ip[ips->num] = *ip;
1529         ips->num += 1;
1530
1531         return 0;
1532 }
1533
1534 static int get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1535                               struct ctdb_public_ip_list **out)
1536 {
1537         struct ctdb_node_map *nodemap;
1538         struct ctdb_public_ip_list *ips;
1539         struct db_hash_context *ipdb;
1540         uint32_t *pnn_list;
1541         int ret, count, i, j;
1542
1543         nodemap = get_nodemap(ctdb, false);
1544         if (nodemap == NULL) {
1545                 return 1;
1546         }
1547
1548         ret = db_hash_init(mem_ctx, "ips", 101, DB_HASH_COMPLEX, &ipdb);
1549         if (ret != 0) {
1550                 goto failed;
1551         }
1552
1553         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
1554                                      &pnn_list);
1555         if (count <= 0) {
1556                 goto failed;
1557         }
1558
1559         for (i=0; i<count; i++) {
1560                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1561                                                pnn_list[i], TIMEOUT(),
1562                                                false, &ips);
1563                 if (ret != 0) {
1564                         goto failed;
1565                 }
1566
1567                 for (j=0; j<ips->num; j++) {
1568                         struct ctdb_public_ip ip;
1569
1570                         ip.pnn = ips->ip[j].pnn;
1571                         ip.addr = ips->ip[j].addr;
1572
1573                         if (pnn_list[i] == ip.pnn) {
1574                                 /* Node claims IP is hosted on it, so
1575                                  * save that information
1576                                  */
1577                                 ret = db_hash_add(ipdb, (uint8_t *)&ip.addr,
1578                                                   sizeof(ip.addr),
1579                                                   (uint8_t *)&ip, sizeof(ip));
1580                                 if (ret != 0) {
1581                                         goto failed;
1582                                 }
1583                         } else {
1584                                 /* Node thinks IP is hosted elsewhere,
1585                                  * so overwrite with CTDB_UNKNOWN_PNN
1586                                  * if there's no existing entry
1587                                  */
1588                                 ret = db_hash_exists(ipdb, (uint8_t *)&ip.addr,
1589                                                      sizeof(ip.addr));
1590                                 if (ret == ENOENT) {
1591                                         ip.pnn = CTDB_UNKNOWN_PNN;
1592                                         ret = db_hash_add(ipdb,
1593                                                           (uint8_t *)&ip.addr,
1594                                                           sizeof(ip.addr),
1595                                                           (uint8_t *)&ip,
1596                                                           sizeof(ip));
1597                                         if (ret != 0) {
1598                                                 goto failed;
1599                                         }
1600                                 }
1601                         }
1602                 }
1603
1604                 TALLOC_FREE(ips);
1605         }
1606
1607         talloc_free(pnn_list);
1608
1609         ret = db_hash_traverse(ipdb, NULL, NULL, &count);
1610         if (ret != 0) {
1611                 goto failed;
1612         }
1613
1614         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);
1615         if (ips == NULL) {
1616                 goto failed;
1617         }
1618
1619         ips->ip = talloc_array(ips, struct ctdb_public_ip, count);
1620         if (ips->ip == NULL) {
1621                 goto failed;
1622         }
1623
1624         ret = db_hash_traverse(ipdb, collect_ips, ips, &count);
1625         if (ret != 0) {
1626                 goto failed;
1627         }
1628
1629         if (count != ips->num) {
1630                 goto failed;
1631         }
1632
1633         talloc_free(ipdb);
1634
1635         *out = ips;
1636         return 0;
1637
1638 failed:
1639         talloc_free(ipdb);
1640         return 1;
1641 }
1642
1643 static int control_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1644                       int argc, const char **argv)
1645 {
1646         struct ctdb_public_ip_list *ips;
1647         struct ctdb_public_ip_info **ipinfo;
1648         int ret, i;
1649         bool do_all = false;
1650
1651         if (argc > 1) {
1652                 usage("ip");
1653         }
1654
1655         if (argc == 1) {
1656                 if (strcmp(argv[0], "all") == 0) {
1657                         do_all = true;
1658                 } else {
1659                         usage("ip");
1660                 }
1661         }
1662
1663         if (do_all) {
1664                 ret = get_all_public_ips(ctdb, mem_ctx, &ips);
1665         } else {
1666                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1667                                                ctdb->cmd_pnn, TIMEOUT(),
1668                                                false, &ips);
1669         }
1670         if (ret != 0) {
1671                 return ret;
1672         }
1673
1674         qsort(ips->ip, ips->num, sizeof(struct ctdb_public_ip),
1675               ctdb_public_ip_cmp);
1676
1677         ipinfo = talloc_array(mem_ctx, struct ctdb_public_ip_info *, ips->num);
1678         if (ipinfo == NULL) {
1679                 return 1;
1680         }
1681
1682         for (i=0; i<ips->num; i++) {
1683                 uint32_t pnn;
1684                 if (do_all) {
1685                         pnn = ips->ip[i].pnn;
1686                 } else {
1687                         pnn = ctdb->cmd_pnn;
1688                 }
1689                 if (pnn == CTDB_UNKNOWN_PNN) {
1690                         ipinfo[i] = NULL;
1691                         continue;
1692                 }
1693                 ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev,
1694                                                    ctdb->client, pnn,
1695                                                    TIMEOUT(), &ips->ip[i].addr,
1696                                                    &ipinfo[i]);
1697                 if (ret != 0) {
1698                         return ret;
1699                 }
1700         }
1701
1702         print_ip(mem_ctx, ctdb, ips, ipinfo, do_all);
1703         return 0;
1704 }
1705
1706 static int control_ipinfo(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1707                           int argc, const char **argv)
1708 {
1709         struct ctdb_public_ip_info *ipinfo;
1710         ctdb_sock_addr addr;
1711         int ret, i;
1712
1713         if (argc != 1) {
1714                 usage("ipinfo");
1715         }
1716
1717         if (! parse_ip(argv[0], NULL, 0, &addr)) {
1718                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
1719                 return 1;
1720         }
1721
1722         ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev, ctdb->client,
1723                                            ctdb->cmd_pnn, TIMEOUT(), &addr,
1724                                            &ipinfo);
1725         if (ret != 0) {
1726                 if (ret == -1) {
1727                         printf("Node %u does not know about IP %s\n",
1728                                ctdb->cmd_pnn, argv[0]);
1729                 }
1730                 return ret;
1731         }
1732
1733         printf("Public IP[%s] info on node %u\n",
1734                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr, false),
1735                                         ctdb->cmd_pnn);
1736
1737         printf("IP:%s\nCurrentNode:%u\nNumInterfaces:%u\n",
1738                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr, false),
1739                ipinfo->ip.pnn, ipinfo->ifaces->num);
1740
1741         for (i=0; i<ipinfo->ifaces->num; i++) {
1742                 struct ctdb_iface *iface;
1743
1744                 iface = &ipinfo->ifaces->iface[i];
1745                 iface->name[CTDB_IFACE_SIZE] = '\0';
1746                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1747                        i+1, iface->name,
1748                        iface->link_state == 0 ? "down" : "up",
1749                        iface->references,
1750                        (i == ipinfo->active_idx) ? " (active)" : "");
1751         }
1752
1753         return 0;
1754 }
1755
1756 static int control_ifaces(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1757                           int argc, const char **argv)
1758 {
1759         struct ctdb_iface_list *ifaces;
1760         int ret, i;
1761
1762         if (argc != 0) {
1763                 usage("ifaces");
1764         }
1765
1766         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1767                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1768         if (ret != 0) {
1769                 return ret;
1770         }
1771
1772         if (ifaces->num == 0) {
1773                 printf("No interfaces configured on node %u\n",
1774                        ctdb->cmd_pnn);
1775                 return 0;
1776         }
1777
1778         if (options.machinereadable) {
1779                 printf("%s%s%s%s%s%s%s\n", options.sep,
1780                        "Name", options.sep,
1781                        "LinkStatus", options.sep,
1782                        "References", options.sep);
1783         } else {
1784                 printf("Interfaces on node %u\n", ctdb->cmd_pnn);
1785         }
1786
1787         for (i=0; i<ifaces->num; i++) {
1788                 if (options.machinereadable) {
1789                         printf("%s%s%s%u%s%u%s\n", options.sep,
1790                                ifaces->iface[i].name, options.sep,
1791                                ifaces->iface[i].link_state, options.sep,
1792                                ifaces->iface[i].references, options.sep);
1793                 } else {
1794                         printf("name:%s link:%s references:%u\n",
1795                                ifaces->iface[i].name,
1796                                ifaces->iface[i].link_state ? "up" : "down",
1797                                ifaces->iface[i].references);
1798                 }
1799         }
1800
1801         return 0;
1802 }
1803
1804 static int control_setifacelink(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1805                                 int argc, const char **argv)
1806 {
1807         struct ctdb_iface_list *ifaces;
1808         struct ctdb_iface *iface;
1809         int ret, i;
1810
1811         if (argc != 2) {
1812                 usage("setifacelink");
1813         }
1814
1815         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1816                 fprintf(stderr, "Interface name '%s' too long\n", argv[0]);
1817                 return 1;
1818         }
1819
1820         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1821                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1822         if (ret != 0) {
1823                 fprintf(stderr,
1824                         "Failed to get interface information from node %u\n",
1825                         ctdb->cmd_pnn);
1826                 return ret;
1827         }
1828
1829         iface = NULL;
1830         for (i=0; i<ifaces->num; i++) {
1831                 if (strcmp(ifaces->iface[i].name, argv[0]) == 0) {
1832                         iface = &ifaces->iface[i];
1833                         break;
1834                 }
1835         }
1836
1837         if (iface == NULL) {
1838                 printf("Interface %s not configured on node %u\n",
1839                        argv[0], ctdb->cmd_pnn);
1840                 return 1;
1841         }
1842
1843         if (strcmp(argv[1], "up") == 0) {
1844                 iface->link_state = 1;
1845         } else if (strcmp(argv[1], "down") == 0) {
1846                 iface->link_state = 0;
1847         } else {
1848                 usage("setifacelink");
1849                 return 1;
1850         }
1851
1852         iface->references = 0;
1853
1854         ret = ctdb_ctrl_set_iface_link_state(mem_ctx, ctdb->ev, ctdb->client,
1855                                              ctdb->cmd_pnn, TIMEOUT(), iface);
1856         if (ret != 0) {
1857                 return ret;
1858         }
1859
1860         return 0;
1861 }
1862
1863 static int control_process_exists(TALLOC_CTX *mem_ctx,
1864                                   struct ctdb_context *ctdb,
1865                                   int argc, const char **argv)
1866 {
1867         pid_t pid;
1868         uint64_t srvid = 0;
1869         int ret, status;
1870
1871         if (argc != 1 && argc != 2) {
1872                 usage("process-exists");
1873         }
1874
1875         pid = atoi(argv[0]);
1876         if (argc == 2) {
1877                 srvid = strtoull(argv[1], NULL, 0);
1878         }
1879
1880         if (srvid == 0) {
1881                 ret = ctdb_ctrl_process_exists(mem_ctx, ctdb->ev, ctdb->client,
1882                                        ctdb->cmd_pnn, TIMEOUT(), pid, &status);
1883         } else {
1884                 struct ctdb_pid_srvid pid_srvid;
1885
1886                 pid_srvid.pid = pid;
1887                 pid_srvid.srvid = srvid;
1888
1889                 ret = ctdb_ctrl_check_pid_srvid(mem_ctx, ctdb->ev,
1890                                                 ctdb->client, ctdb->cmd_pnn,
1891                                                 TIMEOUT(), &pid_srvid,
1892                                                 &status);
1893         }
1894
1895         if (ret != 0) {
1896                 return ret;
1897         }
1898
1899         if (srvid == 0) {
1900                 printf("PID %d %s\n", pid,
1901                        (status == 0 ? "exists" : "does not exist"));
1902         } else {
1903                 printf("PID %d with SRVID 0x%"PRIx64" %s\n", pid, srvid,
1904                        (status == 0 ? "exists" : "does not exist"));
1905         }
1906         return status;
1907 }
1908
1909 static int control_getdbmap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1910                             int argc, const char **argv)
1911 {
1912         struct ctdb_dbid_map *dbmap;
1913         int ret, i;
1914
1915         if (argc != 0) {
1916                 usage("getdbmap");
1917         }
1918
1919         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
1920                                   ctdb->cmd_pnn, TIMEOUT(), &dbmap);
1921         if (ret != 0) {
1922                 return ret;
1923         }
1924
1925         if (options.machinereadable == 1) {
1926                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
1927                        options.sep,
1928                        "ID", options.sep,
1929                        "Name", options.sep,
1930                        "Path", options.sep,
1931                        "Persistent", options.sep,
1932                        "Sticky", options.sep,
1933                        "Unhealthy", options.sep,
1934                        "Readonly", options.sep,
1935                        "Replicated", options.sep);
1936         } else {
1937                 printf("Number of databases:%d\n", dbmap->num);
1938         }
1939
1940         for (i=0; i<dbmap->num; i++) {
1941                 const char *name;
1942                 const char *path;
1943                 const char *health;
1944                 bool persistent;
1945                 bool readonly;
1946                 bool sticky;
1947                 bool replicated;
1948                 uint32_t db_id;
1949
1950                 db_id = dbmap->dbs[i].db_id;
1951
1952                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
1953                                            ctdb->cmd_pnn, TIMEOUT(), db_id,
1954                                            &name);
1955                 if (ret != 0) {
1956                         return ret;
1957                 }
1958
1959                 ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1960                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
1961                                           &path);
1962                 if (ret != 0) {
1963                         return ret;
1964                 }
1965
1966                 ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1967                                               ctdb->cmd_pnn, TIMEOUT(), db_id,
1968                                               &health);
1969                 if (ret != 0) {
1970                         return ret;
1971                 }
1972
1973                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1974                 readonly = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
1975                 sticky = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
1976                 replicated = dbmap->dbs[i].flags & CTDB_DB_FLAGS_REPLICATED;
1977
1978                 if (options.machinereadable == 1) {
1979                         printf("%s0x%08X%s%s%s%s%s%d%s%d%s%d%s%d%s%d%s\n",
1980                                options.sep,
1981                                db_id, options.sep,
1982                                name, options.sep,
1983                                path, options.sep,
1984                                !! (persistent), options.sep,
1985                                !! (sticky), options.sep,
1986                                !! (health), options.sep,
1987                                !! (readonly), options.sep,
1988                                !! (replicated), options.sep);
1989                 } else {
1990                         printf("dbid:0x%08x name:%s path:%s%s%s%s%s%s\n",
1991                                db_id, name, path,
1992                                persistent ? " PERSISTENT" : "",
1993                                sticky ? " STICKY" : "",
1994                                readonly ? " READONLY" : "",
1995                                replicated ? " REPLICATED" : "",
1996                                health ? " UNHEALTHY" : "");
1997                 }
1998
1999                 talloc_free(discard_const(name));
2000                 talloc_free(discard_const(path));
2001                 talloc_free(discard_const(health));
2002         }
2003
2004         return 0;
2005 }
2006
2007 static int control_getdbstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2008                                int argc, const char **argv)
2009 {
2010         uint32_t db_id;
2011         const char *db_name, *db_path, *db_health;
2012         uint8_t db_flags;
2013         int ret;
2014
2015         if (argc != 1) {
2016                 usage("getdbstatus");
2017         }
2018
2019         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2020                 return 1;
2021         }
2022
2023         ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
2024                                   ctdb->cmd_pnn, TIMEOUT(), db_id,
2025                                   &db_path);
2026         if (ret != 0) {
2027                 return ret;
2028         }
2029
2030         ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
2031                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
2032                                       &db_health);
2033         if (ret != 0) {
2034                 return ret;
2035         }
2036
2037         printf("dbid: 0x%08x\nname: %s\npath: %s\n", db_id, db_name, db_path);
2038         printf("PERSISTENT: %s\nREPLICATED: %s\nSTICKY: %s\nREADONLY: %s\n",
2039                (db_flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
2040                (db_flags & CTDB_DB_FLAGS_REPLICATED ? "yes" : "no"),
2041                (db_flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
2042                (db_flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"));
2043         printf("HEALTH: %s\n", (db_health ? db_health : "OK"));
2044         return 0;
2045 }
2046
2047 struct dump_record_state {
2048         uint32_t count;
2049 };
2050
2051 #define ISASCII(x) (isprint(x) && ! strchr("\"\\", (x)))
2052
2053 static void dump_tdb_data(const char *name, TDB_DATA val)
2054 {
2055         int i;
2056
2057         fprintf(stdout, "%s(%zu) = \"", name, val.dsize);
2058         for (i=0; i<val.dsize; i++) {
2059                 if (ISASCII(val.dptr[i])) {
2060                         fprintf(stdout, "%c", val.dptr[i]);
2061                 } else {
2062                         fprintf(stdout, "\\%02X", val.dptr[i]);
2063                 }
2064         }
2065         fprintf(stdout, "\"\n");
2066 }
2067
2068 static void dump_ltdb_header(struct ctdb_ltdb_header *header)
2069 {
2070         fprintf(stdout, "dmaster: %u\n", header->dmaster);
2071         fprintf(stdout, "rsn: %" PRIu64 "\n", header->rsn);
2072         fprintf(stdout, "flags: 0x%08x", header->flags);
2073         if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
2074                 fprintf(stdout, " MIGRATED_WITH_DATA");
2075         }
2076         if (header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) {
2077                 fprintf(stdout, " VACUUM_MIGRATED");
2078         }
2079         if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
2080                 fprintf(stdout, " AUTOMATIC");
2081         }
2082         if (header->flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
2083                 fprintf(stdout, " RO_HAVE_DELEGATIONS");
2084         }
2085         if (header->flags & CTDB_REC_RO_HAVE_READONLY) {
2086                 fprintf(stdout, " RO_HAVE_READONLY");
2087         }
2088         if (header->flags & CTDB_REC_RO_REVOKING_READONLY) {
2089                 fprintf(stdout, " RO_REVOKING_READONLY");
2090         }
2091         if (header->flags & CTDB_REC_RO_REVOKE_COMPLETE) {
2092                 fprintf(stdout, " RO_REVOKE_COMPLETE");
2093         }
2094         fprintf(stdout, "\n");
2095
2096 }
2097
2098 static int dump_record(uint32_t reqid, struct ctdb_ltdb_header *header,
2099                        TDB_DATA key, TDB_DATA data, void *private_data)
2100 {
2101         struct dump_record_state *state =
2102                 (struct dump_record_state *)private_data;
2103
2104         state->count += 1;
2105
2106         dump_tdb_data("key", key);
2107         dump_ltdb_header(header);
2108         dump_tdb_data("data", data);
2109         fprintf(stdout, "\n");
2110
2111         return 0;
2112 }
2113
2114 static int control_catdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2115                          int argc, const char **argv)
2116 {
2117         struct ctdb_db_context *db;
2118         const char *db_name;
2119         uint32_t db_id;
2120         uint8_t db_flags;
2121         struct dump_record_state state;
2122         int ret;
2123
2124         if (argc != 1) {
2125                 usage("catdb");
2126         }
2127
2128         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2129                 return 1;
2130         }
2131
2132         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2133                           db_flags, &db);
2134         if (ret != 0) {
2135                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2136                 return ret;
2137         }
2138
2139         state.count = 0;
2140
2141         ret = ctdb_db_traverse(mem_ctx, ctdb->ev, ctdb->client, db,
2142                                ctdb->cmd_pnn, TIMEOUT(),
2143                                dump_record, &state);
2144
2145         printf("Dumped %u records\n", state.count);
2146
2147         return ret;
2148 }
2149
2150 static int control_cattdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2151                           int argc, const char **argv)
2152 {
2153         struct ctdb_db_context *db;
2154         const char *db_name;
2155         uint32_t db_id;
2156         uint8_t db_flags;
2157         struct dump_record_state state;
2158         int ret;
2159
2160         if (argc != 1) {
2161                 usage("catdb");
2162         }
2163
2164         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2165                 return 1;
2166         }
2167
2168         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2169                           db_flags, &db);
2170         if (ret != 0) {
2171                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2172                 return ret;
2173         }
2174
2175         state.count = 0;
2176         ret = ctdb_db_traverse_local(db, true, true, dump_record, &state);
2177
2178         printf("Dumped %u record(s)\n", state.count);
2179
2180         return ret;
2181 }
2182
2183 static int control_getcapabilities(TALLOC_CTX *mem_ctx,
2184                                    struct ctdb_context *ctdb,
2185                                    int argc, const char **argv)
2186 {
2187         uint32_t caps;
2188         int ret;
2189
2190         if (argc != 0) {
2191                 usage("getcapabilities");
2192         }
2193
2194         ret = ctdb_ctrl_get_capabilities(mem_ctx, ctdb->ev, ctdb->client,
2195                                          ctdb->cmd_pnn, TIMEOUT(), &caps);
2196         if (ret != 0) {
2197                 return ret;
2198         }
2199
2200         if (options.machinereadable == 1) {
2201                 printf("%s%s%s%s%s\n",
2202                        options.sep,
2203                        "RECMASTER", options.sep,
2204                        "LMASTER", options.sep);
2205                 printf("%s%d%s%d%s\n", options.sep,
2206                        !! (caps & CTDB_CAP_RECMASTER), options.sep,
2207                        !! (caps & CTDB_CAP_LMASTER), options.sep);
2208         } else {
2209                 printf("RECMASTER: %s\n",
2210                        (caps & CTDB_CAP_RECMASTER) ? "YES" : "NO");
2211                 printf("LMASTER: %s\n",
2212                        (caps & CTDB_CAP_LMASTER) ? "YES" : "NO");
2213         }
2214
2215         return 0;
2216 }
2217
2218 static int control_pnn(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2219                        int argc, const char **argv)
2220 {
2221         printf("%u\n", ctdb_client_pnn(ctdb->client));
2222         return 0;
2223 }
2224
2225 static int control_lvs(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2226                        int argc, const char **argv)
2227 {
2228         char *t, *lvs_helper = NULL;
2229
2230         if (argc != 1) {
2231                 usage("lvs");
2232         }
2233
2234         t = getenv("CTDB_LVS_HELPER");
2235         if (t != NULL) {
2236                 lvs_helper = talloc_strdup(mem_ctx, t);
2237         } else {
2238                 lvs_helper = talloc_asprintf(mem_ctx, "%s/ctdb_lvs",
2239                                              CTDB_HELPER_BINDIR);
2240         }
2241
2242         if (lvs_helper == NULL) {
2243                 fprintf(stderr, "Unable to set LVS helper\n");
2244                 return 1;
2245         }
2246
2247         return run_helper(mem_ctx, "LVS helper", lvs_helper, argc, argv);
2248 }
2249
2250 static int control_setdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2251                             int argc, const char **argv)
2252 {
2253         int log_level;
2254         int ret;
2255         bool found;
2256
2257         if (argc != 1) {
2258                 usage("setdebug");
2259         }
2260
2261         found = debug_level_parse(argv[0], &log_level);
2262         if (! found) {
2263                 fprintf(stderr,
2264                         "Invalid debug level '%s'. Valid levels are:\n",
2265                         argv[0]);
2266                 fprintf(stderr, "\tERROR | WARNING | NOTICE | INFO | DEBUG\n");
2267                 return 1;
2268         }
2269
2270         ret = ctdb_ctrl_setdebug(mem_ctx, ctdb->ev, ctdb->client,
2271                                  ctdb->cmd_pnn, TIMEOUT(), log_level);
2272         if (ret != 0) {
2273                 return ret;
2274         }
2275
2276         return 0;
2277 }
2278
2279 static int control_getdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2280                             int argc, const char **argv)
2281 {
2282         int loglevel;
2283         const char *log_str;
2284         int ret;
2285
2286         if (argc != 0) {
2287                 usage("getdebug");
2288         }
2289
2290         ret = ctdb_ctrl_getdebug(mem_ctx, ctdb->ev, ctdb->client,
2291                                  ctdb->cmd_pnn, TIMEOUT(), &loglevel);
2292         if (ret != 0) {
2293                 return ret;
2294         }
2295
2296         log_str = debug_level_to_string(loglevel);
2297         printf("%s\n", log_str);
2298
2299         return 0;
2300 }
2301
2302 static int control_attach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2303                           int argc, const char **argv)
2304 {
2305         const char *db_name;
2306         uint8_t db_flags = 0;
2307         int ret;
2308
2309         if (argc < 1 || argc > 2) {
2310                 usage("attach");
2311         }
2312
2313         db_name = argv[0];
2314         if (argc == 2) {
2315                 if (strcmp(argv[1], "persistent") == 0) {
2316                         db_flags = CTDB_DB_FLAGS_PERSISTENT;
2317                 } else if (strcmp(argv[1], "readonly") == 0) {
2318                         db_flags = CTDB_DB_FLAGS_READONLY;
2319                 } else if (strcmp(argv[1], "sticky") == 0) {
2320                         db_flags = CTDB_DB_FLAGS_STICKY;
2321                 } else if (strcmp(argv[1], "replicated") == 0) {
2322                         db_flags = CTDB_DB_FLAGS_REPLICATED;
2323                 } else {
2324                         usage("attach");
2325                 }
2326         }
2327
2328         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2329                           db_flags, NULL);
2330         if (ret != 0) {
2331                 return ret;
2332         }
2333
2334         return 0;
2335 }
2336
2337 static int control_detach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2338                           int argc, const char **argv)
2339 {
2340         const char *db_name;
2341         uint32_t db_id;
2342         uint8_t db_flags;
2343         struct ctdb_node_map *nodemap;
2344         int recmode;
2345         int ret, ret2, i;
2346
2347         if (argc < 1) {
2348                 usage("detach");
2349         }
2350
2351         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2352                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
2353         if (ret != 0) {
2354                 return ret;
2355         }
2356
2357         if (recmode == CTDB_RECOVERY_ACTIVE) {
2358                 fprintf(stderr, "Database cannot be detached"
2359                                 " when recovery is active\n");
2360                 return 1;
2361         }
2362
2363         nodemap = get_nodemap(ctdb, false);
2364         if (nodemap == NULL) {
2365                 return 1;
2366         }
2367
2368         for (i=0; i<nodemap->num; i++) {
2369                 uint32_t value;
2370
2371                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2372                         continue;
2373                 }
2374                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2375                         continue;
2376                 }
2377                 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2378                         fprintf(stderr, "Database cannot be detached on"
2379                                 " inactive (stopped or banned) node %u\n",
2380                                 nodemap->node[i].pnn);
2381                         return 1;
2382                 }
2383
2384                 ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
2385                                             nodemap->node[i].pnn, TIMEOUT(),
2386                                             "AllowClientDBAttach", &value);
2387                 if (ret != 0) {
2388                         fprintf(stderr,
2389                                 "Unable to get tunable AllowClientDBAttach"
2390                                 " from node %u\n", nodemap->node[i].pnn);
2391                         return ret;
2392                 }
2393
2394                 if (value == 1) {
2395                         fprintf(stderr,
2396                                 "Database access is still active on node %u."
2397                                 " Set AllowclientDBAttach=0 on all nodes.\n",
2398                                 nodemap->node[i].pnn);
2399                         return 1;
2400                 }
2401         }
2402
2403         ret2 = 0;
2404         for (i=0; i<argc; i++) {
2405                 if (! db_exists(mem_ctx, ctdb, argv[i], &db_id, &db_name,
2406                                 &db_flags)) {
2407                         continue;
2408                 }
2409
2410                 if (db_flags &
2411                     (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
2412                         fprintf(stderr,
2413                                 "Only volatile databases can be detached\n");
2414                         return 1;
2415                 }
2416
2417                 ret = ctdb_detach(ctdb->ev, ctdb->client, TIMEOUT(), db_id);
2418                 if (ret != 0) {
2419                         fprintf(stderr, "Database %s detach failed\n", db_name);
2420                         ret2 = ret;
2421                 }
2422         }
2423
2424         return ret2;
2425 }
2426
2427 static int control_dumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2428                               int argc, const char **argv)
2429 {
2430         const char *mem_str;
2431         ssize_t n;
2432         int ret;
2433
2434         ret = ctdb_ctrl_dump_memory(mem_ctx, ctdb->ev, ctdb->client,
2435                                     ctdb->cmd_pnn, TIMEOUT(), &mem_str);
2436         if (ret != 0) {
2437                 return ret;
2438         }
2439
2440         n = write(1, mem_str, strlen(mem_str)+1);
2441         if (n < 0 || n != strlen(mem_str)+1) {
2442                 fprintf(stderr, "Failed to write talloc summary\n");
2443                 return 1;
2444         }
2445
2446         return 0;
2447 }
2448
2449 static void dump_memory(uint64_t srvid, TDB_DATA data, void *private_data)
2450 {
2451         bool *done = (bool *)private_data;
2452         ssize_t n;
2453
2454         n = write(1, data.dptr, data.dsize);
2455         if (n < 0 || n != data.dsize) {
2456                 fprintf(stderr, "Failed to write talloc summary\n");
2457         }
2458
2459         *done = true;
2460 }
2461
2462 static int control_rddumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2463                                 int argc, const char **argv)
2464 {
2465         struct ctdb_srvid_message msg = { 0 };
2466         int ret;
2467         bool done = false;
2468
2469         msg.pnn = ctdb->pnn;
2470         msg.srvid = next_srvid(ctdb);
2471
2472         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2473                                               msg.srvid, dump_memory, &done);
2474         if (ret != 0) {
2475                 return ret;
2476         }
2477
2478         ret = ctdb_message_mem_dump(mem_ctx, ctdb->ev, ctdb->client,
2479                                     ctdb->cmd_pnn, &msg);
2480         if (ret != 0) {
2481                 return ret;
2482         }
2483
2484         ctdb_client_wait(ctdb->ev, &done);
2485         return 0;
2486 }
2487
2488 static int control_getpid(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2489                           int argc, const char **argv)
2490 {
2491         pid_t pid;
2492         int ret;
2493
2494         ret = ctdb_ctrl_get_pid(mem_ctx, ctdb->ev, ctdb->client,
2495                                 ctdb->cmd_pnn, TIMEOUT(), &pid);
2496         if (ret != 0) {
2497                 return ret;
2498         }
2499
2500         printf("%u\n", pid);
2501         return 0;
2502 }
2503
2504 static int check_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2505                        const char *desc, uint32_t flag, bool set_flag)
2506 {
2507         struct ctdb_node_map *nodemap;
2508         bool flag_is_set;
2509
2510         nodemap = get_nodemap(ctdb, false);
2511         if (nodemap == NULL) {
2512                 return 1;
2513         }
2514
2515         flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2516         if (set_flag == flag_is_set) {
2517                 if (set_flag) {
2518                         fprintf(stderr, "Node %u is already %s\n",
2519                                 ctdb->cmd_pnn, desc);
2520                 } else {
2521                         fprintf(stderr, "Node %u is not %s\n",
2522                                 ctdb->cmd_pnn, desc);
2523                 }
2524                 return 0;
2525         }
2526
2527         return 1;
2528 }
2529
2530 static void wait_for_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2531                            uint32_t flag, bool set_flag)
2532 {
2533         struct ctdb_node_map *nodemap;
2534         bool flag_is_set;
2535
2536         while (1) {
2537                 nodemap = get_nodemap(ctdb, true);
2538                 if (nodemap == NULL) {
2539                         fprintf(stderr,
2540                                 "Failed to get nodemap, trying again\n");
2541                         sleep(1);
2542                         continue;
2543                 }
2544
2545                 flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2546                 if (flag_is_set == set_flag) {
2547                         break;
2548                 }
2549
2550                 sleep(1);
2551         }
2552 }
2553
2554 static int ctdb_ctrl_modflags(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2555                               struct ctdb_client_context *client,
2556                               uint32_t destnode, struct timeval timeout,
2557                               uint32_t set, uint32_t clear)
2558 {
2559         struct ctdb_node_map *nodemap;
2560         struct ctdb_node_flag_change flag_change;
2561         struct ctdb_req_control request;
2562         uint32_t *pnn_list;
2563         int ret, count;
2564
2565         ret = ctdb_ctrl_get_nodemap(mem_ctx, ev, client, destnode,
2566                                     tevent_timeval_zero(), &nodemap);
2567         if (ret != 0) {
2568                 return ret;
2569         }
2570
2571         flag_change.pnn = destnode;
2572         flag_change.old_flags = nodemap->node[destnode].flags;
2573         flag_change.new_flags = flag_change.old_flags | set;
2574         flag_change.new_flags &= ~clear;
2575
2576         count = list_of_connected_nodes(nodemap, -1, mem_ctx, &pnn_list);
2577         if (count == -1) {
2578                 return ENOMEM;
2579         }
2580
2581         ctdb_req_control_modify_flags(&request, &flag_change);
2582         ret = ctdb_client_control_multi(mem_ctx, ev, client, pnn_list, count,
2583                                         tevent_timeval_zero(), &request,
2584                                         NULL, NULL);
2585         return ret;
2586 }
2587
2588 struct ipreallocate_state {
2589         int status;
2590         bool done;
2591 };
2592
2593 static void ipreallocate_handler(uint64_t srvid, TDB_DATA data,
2594                                  void *private_data)
2595 {
2596         struct ipreallocate_state *state =
2597                 (struct ipreallocate_state *)private_data;
2598
2599         if (data.dsize != sizeof(int)) {
2600                 /* Ignore packet */
2601                 return;
2602         }
2603
2604         state->status = *(int *)data.dptr;
2605         state->done = true;
2606 }
2607
2608 static int ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb)
2609 {
2610         struct ctdb_srvid_message msg = { 0 };
2611         struct ipreallocate_state state;
2612         int ret;
2613
2614         msg.pnn = ctdb->pnn;
2615         msg.srvid = next_srvid(ctdb);
2616
2617         state.done = false;
2618         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2619                                               msg.srvid,
2620                                               ipreallocate_handler, &state);
2621         if (ret != 0) {
2622                 return ret;
2623         }
2624
2625         while (true) {
2626                 ret = ctdb_message_takeover_run(mem_ctx, ctdb->ev,
2627                                                 ctdb->client,
2628                                                 CTDB_BROADCAST_CONNECTED,
2629                                                 &msg);
2630                 if (ret != 0) {
2631                         goto fail;
2632                 }
2633
2634                 ret = ctdb_client_wait_timeout(ctdb->ev, &state.done,
2635                                                TIMEOUT());
2636                 if (ret != 0) {
2637                         continue;
2638                 }
2639
2640                 if (state.status >= 0) {
2641                         ret = 0;
2642                 } else {
2643                         ret = state.status;
2644                 }
2645                 break;
2646         }
2647
2648 fail:
2649         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2650                                            msg.srvid, &state);
2651         return ret;
2652 }
2653
2654 static int control_disable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2655                            int argc, const char **argv)
2656 {
2657         int ret;
2658
2659         if (argc != 0) {
2660                 usage("disable");
2661         }
2662
2663         ret = check_flags(mem_ctx, ctdb, "disabled",
2664                           NODE_FLAGS_PERMANENTLY_DISABLED, true);
2665         if (ret == 0) {
2666                 return 0;
2667         }
2668
2669         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2670                                  ctdb->cmd_pnn, TIMEOUT(),
2671                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2672         if (ret != 0) {
2673                 fprintf(stderr,
2674                         "Failed to set DISABLED flag on node %u\n",
2675                         ctdb->cmd_pnn);
2676                 return ret;
2677         }
2678
2679         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, true);
2680         return ipreallocate(mem_ctx, ctdb);
2681 }
2682
2683 static int control_enable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2684                           int argc, const char **argv)
2685 {
2686         int ret;
2687
2688         if (argc != 0) {
2689                 usage("enable");
2690         }
2691
2692         ret = check_flags(mem_ctx, ctdb, "disabled",
2693                           NODE_FLAGS_PERMANENTLY_DISABLED, false);
2694         if (ret == 0) {
2695                 return 0;
2696         }
2697
2698         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2699                                  ctdb->cmd_pnn, TIMEOUT(),
2700                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
2701         if (ret != 0) {
2702                 fprintf(stderr, "Failed to reset DISABLED flag on node %u\n",
2703                         ctdb->cmd_pnn);
2704                 return ret;
2705         }
2706
2707         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, false);
2708         return ipreallocate(mem_ctx, ctdb);
2709 }
2710
2711 static int control_stop(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2712                         int argc, const char **argv)
2713 {
2714         int ret;
2715
2716         if (argc != 0) {
2717                 usage("stop");
2718         }
2719
2720         ret = check_flags(mem_ctx, ctdb, "stopped",
2721                           NODE_FLAGS_STOPPED, true);
2722         if (ret == 0) {
2723                 return 0;
2724         }
2725
2726         ret = ctdb_ctrl_stop_node(mem_ctx, ctdb->ev, ctdb->client,
2727                                   ctdb->cmd_pnn, TIMEOUT());
2728         if (ret != 0) {
2729                 fprintf(stderr, "Failed to stop node %u\n", ctdb->cmd_pnn);
2730                 return ret;
2731         }
2732
2733         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, true);
2734         return ipreallocate(mem_ctx, ctdb);
2735 }
2736
2737 static int control_continue(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2738                             int argc, const char **argv)
2739 {
2740         int ret;
2741
2742         if (argc != 0) {
2743                 usage("continue");
2744         }
2745
2746         ret = check_flags(mem_ctx, ctdb, "stopped",
2747                           NODE_FLAGS_STOPPED, false);
2748         if (ret == 0) {
2749                 return 0;
2750         }
2751
2752         ret = ctdb_ctrl_continue_node(mem_ctx, ctdb->ev, ctdb->client,
2753                                       ctdb->cmd_pnn, TIMEOUT());
2754         if (ret != 0) {
2755                 fprintf(stderr, "Failed to continue stopped node %u\n",
2756                         ctdb->cmd_pnn);
2757                 return ret;
2758         }
2759
2760         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, false);
2761         return ipreallocate(mem_ctx, ctdb);
2762 }
2763
2764 static int control_ban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2765                        int argc, const char **argv)
2766 {
2767         struct ctdb_ban_state ban_state;
2768         int ret;
2769
2770         if (argc != 1) {
2771                 usage("ban");
2772         }
2773
2774         ret = check_flags(mem_ctx, ctdb, "banned",
2775                           NODE_FLAGS_BANNED, true);
2776         if (ret == 0) {
2777                 return 0;
2778         }
2779
2780         ban_state.pnn = ctdb->cmd_pnn;
2781         ban_state.time = strtoul(argv[0], NULL, 0);
2782
2783         if (ban_state.time == 0) {
2784                 fprintf(stderr, "Ban time cannot be zero\n");
2785                 return EINVAL;
2786         }
2787
2788         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2789                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2790         if (ret != 0) {
2791                 fprintf(stderr, "Failed to ban node %u\n", ctdb->cmd_pnn);
2792                 return ret;
2793         }
2794
2795         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, true);
2796         return ipreallocate(mem_ctx, ctdb);
2797
2798 }
2799
2800 static int control_unban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2801                          int argc, const char **argv)
2802 {
2803         struct ctdb_ban_state ban_state;
2804         int ret;
2805
2806         if (argc != 0) {
2807                 usage("unban");
2808         }
2809
2810         ret = check_flags(mem_ctx, ctdb, "banned",
2811                           NODE_FLAGS_BANNED, false);
2812         if (ret == 0) {
2813                 return 0;
2814         }
2815
2816         ban_state.pnn = ctdb->cmd_pnn;
2817         ban_state.time = 0;
2818
2819         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2820                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2821         if (ret != 0) {
2822                 fprintf(stderr, "Failed to unban node %u\n", ctdb->cmd_pnn);
2823                 return ret;
2824         }
2825
2826         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, false);
2827         return ipreallocate(mem_ctx, ctdb);
2828
2829 }
2830
2831 static int control_shutdown(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2832                             int argc, const char **argv)
2833 {
2834         int ret;
2835
2836         if (argc != 0) {
2837                 usage("shutdown");
2838         }
2839
2840         ret = ctdb_ctrl_shutdown(mem_ctx, ctdb->ev, ctdb->client,
2841                                  ctdb->cmd_pnn, TIMEOUT());
2842         if (ret != 0) {
2843                 fprintf(stderr, "Unable to shutdown node %u\n", ctdb->cmd_pnn);
2844                 return ret;
2845         }
2846
2847         return 0;
2848 }
2849
2850 static int get_generation(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2851                           uint32_t *generation)
2852 {
2853         uint32_t recmaster;
2854         int recmode;
2855         struct ctdb_vnn_map *vnnmap;
2856         int ret;
2857
2858 again:
2859         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2860                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
2861         if (ret != 0) {
2862                 fprintf(stderr, "Failed to find recovery master\n");
2863                 return ret;
2864         }
2865
2866         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2867                                     recmaster, TIMEOUT(), &recmode);
2868         if (ret != 0) {
2869                 fprintf(stderr, "Failed to get recovery mode from node %u\n",
2870                         recmaster);
2871                 return ret;
2872         }
2873
2874         if (recmode == CTDB_RECOVERY_ACTIVE) {
2875                 sleep(1);
2876                 goto again;
2877         }
2878
2879         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
2880                                   recmaster, TIMEOUT(), &vnnmap);
2881         if (ret != 0) {
2882                 fprintf(stderr, "Failed to get generation from node %u\n",
2883                         recmaster);
2884                 return ret;
2885         }
2886
2887         if (vnnmap->generation == INVALID_GENERATION) {
2888                 talloc_free(vnnmap);
2889                 sleep(1);
2890                 goto again;
2891         }
2892
2893         *generation = vnnmap->generation;
2894         talloc_free(vnnmap);
2895         return 0;
2896 }
2897
2898
2899 static int control_recover(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2900                            int argc, const char **argv)
2901 {
2902         uint32_t generation, next_generation;
2903         int ret;
2904
2905         if (argc != 0) {
2906                 usage("recover");
2907         }
2908
2909         ret = get_generation(mem_ctx, ctdb, &generation);
2910         if (ret != 0) {
2911                 return ret;
2912         }
2913
2914         ret = ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
2915                                     ctdb->cmd_pnn, TIMEOUT(),
2916                                     CTDB_RECOVERY_ACTIVE);
2917         if (ret != 0) {
2918                 fprintf(stderr, "Failed to set recovery mode active\n");
2919                 return ret;
2920         }
2921
2922         while (1) {
2923                 ret = get_generation(mem_ctx, ctdb, &next_generation);
2924                 if (ret != 0) {
2925                         fprintf(stderr,
2926                                 "Failed to confirm end of recovery\n");
2927                         return ret;
2928                 }
2929
2930                 if (next_generation != generation) {
2931                         break;
2932                 }
2933
2934                 sleep (1);
2935         }
2936
2937         return 0;
2938 }
2939
2940 static int control_ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2941                                 int argc, const char **argv)
2942 {
2943         if (argc != 0) {
2944                 usage("ipreallocate");
2945         }
2946
2947         return ipreallocate(mem_ctx, ctdb);
2948 }
2949
2950 static int control_isnotrecmaster(TALLOC_CTX *mem_ctx,
2951                                   struct ctdb_context *ctdb,
2952                                   int argc, const char **argv)
2953 {
2954         uint32_t recmaster;
2955         int ret;
2956
2957         if (argc != 0) {
2958                 usage("isnotrecmaster");
2959         }
2960
2961         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2962                                       ctdb->pnn, TIMEOUT(), &recmaster);
2963         if (ret != 0) {
2964                 fprintf(stderr, "Failed to get recmaster\n");
2965                 return ret;
2966         }
2967
2968         if (recmaster != ctdb->pnn) {
2969                 printf("this node is not the recmaster\n");
2970                 return 1;
2971         }
2972
2973         printf("this node is the recmaster\n");
2974         return 0;
2975 }
2976
2977 static int control_gratarp(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2978                            int argc, const char **argv)
2979 {
2980         struct ctdb_addr_info addr_info;
2981         int ret;
2982
2983         if (argc != 2) {
2984                 usage("gratarp");
2985         }
2986
2987         if (! parse_ip(argv[0], NULL, 0, &addr_info.addr)) {
2988                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
2989                 return 1;
2990         }
2991         addr_info.iface = argv[1];
2992
2993         ret = ctdb_ctrl_send_gratuitous_arp(mem_ctx, ctdb->ev, ctdb->client,
2994                                             ctdb->cmd_pnn, TIMEOUT(),
2995                                             &addr_info);
2996         if (ret != 0) {
2997                 fprintf(stderr, "Unable to send gratuitous arp from node %u\n",
2998                         ctdb->cmd_pnn);
2999                 return ret;
3000         }
3001
3002         return 0;
3003 }
3004
3005 static int control_tickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3006                            int argc, const char **argv)
3007 {
3008         ctdb_sock_addr src, dst;
3009         int ret;
3010
3011         if (argc != 0 && argc != 2) {
3012                 usage("tickle");
3013         }
3014
3015         if (argc == 0) {
3016                 struct ctdb_connection_list *clist;
3017                 int i, num_failed;
3018
3019                 /* Client first but the src/dst logic is confused */
3020                 ret = ctdb_connection_list_read(mem_ctx, false, &clist);
3021                 if (ret != 0) {
3022                         return ret;
3023                 }
3024
3025                 num_failed = 0;
3026                 for (i = 0; i < clist->num; i++) {
3027                         ret = ctdb_sys_send_tcp(&clist->conn[i].src,
3028                                                 &clist->conn[i].dst,
3029                                                 0, 0, 0);
3030                         if (ret != 0) {
3031                                 num_failed += 1;
3032                         }
3033                 }
3034
3035                 TALLOC_FREE(clist);
3036
3037                 if (num_failed > 0) {
3038                         fprintf(stderr, "Failed to send %d tickles\n",
3039                                 num_failed);
3040                         return 1;
3041                 }
3042
3043                 return 0;
3044         }
3045
3046
3047         if (! parse_ip_port(argv[0], &src)) {
3048                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3049                 return 1;
3050         }
3051
3052         if (! parse_ip_port(argv[1], &dst)) {
3053                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3054                 return 1;
3055         }
3056
3057         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
3058         if (ret != 0) {
3059                 fprintf(stderr, "Failed to send tickle ack\n");
3060                 return ret;
3061         }
3062
3063         return 0;
3064 }
3065
3066 static int control_gettickles(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3067                               int argc, const char **argv)
3068 {
3069         ctdb_sock_addr addr;
3070         struct ctdb_tickle_list *tickles;
3071         unsigned port = 0;
3072         int ret, i;
3073
3074         if (argc < 1 || argc > 2) {
3075                 usage("gettickles");
3076         }
3077
3078         if (argc == 2) {
3079                 port = strtoul(argv[1], NULL, 10);
3080         }
3081
3082         if (! parse_ip(argv[0], NULL, port, &addr)) {
3083                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3084                 return 1;
3085         }
3086
3087         ret = ctdb_ctrl_get_tcp_tickle_list(mem_ctx, ctdb->ev, ctdb->client,
3088                                             ctdb->cmd_pnn, TIMEOUT(), &addr,
3089                                             &tickles);
3090         if (ret != 0) {
3091                 fprintf(stderr, "Failed to get list of connections\n");
3092                 return ret;
3093         }
3094
3095         if (options.machinereadable) {
3096                 printf("%s%s%s%s%s%s%s%s%s\n",
3097                        options.sep,
3098                        "Source IP", options.sep,
3099                        "Port", options.sep,
3100                        "Destiation IP", options.sep,
3101                        "Port", options.sep);
3102                 for (i=0; i<tickles->num; i++) {
3103                         printf("%s%s%s%u%s%s%s%u%s\n", options.sep,
3104                                ctdb_sock_addr_to_string(
3105                                        mem_ctx, &tickles->conn[i].src, false),
3106                                options.sep,
3107                                ntohs(tickles->conn[i].src.ip.sin_port),
3108                                options.sep,
3109                                ctdb_sock_addr_to_string(
3110                                        mem_ctx, &tickles->conn[i].dst, false),
3111                                options.sep,
3112                                ntohs(tickles->conn[i].dst.ip.sin_port),
3113                                options.sep);
3114                 }
3115         } else {
3116                 printf("Connections for IP: %s\n",
3117                        ctdb_sock_addr_to_string(mem_ctx,
3118                                                 &tickles->addr, false));
3119                 printf("Num connections: %u\n", tickles->num);
3120                 for (i=0; i<tickles->num; i++) {
3121                         printf("SRC: %s   DST: %s\n",
3122                                ctdb_sock_addr_to_string(
3123                                        mem_ctx, &tickles->conn[i].src, true),
3124                                ctdb_sock_addr_to_string(
3125                                        mem_ctx, &tickles->conn[i].dst, true));
3126                 }
3127         }
3128
3129         talloc_free(tickles);
3130         return 0;
3131 }
3132
3133 typedef void (*clist_request_func)(struct ctdb_req_control *request,
3134                                    struct ctdb_connection *conn);
3135
3136 typedef int (*clist_reply_func)(struct ctdb_reply_control *reply);
3137
3138 struct process_clist_state {
3139         struct ctdb_connection_list *clist;
3140         int count;
3141         int num_failed, num_total;
3142         clist_reply_func reply_func;
3143 };
3144
3145 static void process_clist_done(struct tevent_req *subreq);
3146
3147 static struct tevent_req *process_clist_send(
3148                                         TALLOC_CTX *mem_ctx,
3149                                         struct ctdb_context *ctdb,
3150                                         struct ctdb_connection_list *clist,
3151                                         clist_request_func request_func,
3152                                         clist_reply_func reply_func)
3153 {
3154         struct tevent_req *req, *subreq;
3155         struct process_clist_state *state;
3156         struct ctdb_req_control request;
3157         int i;
3158
3159         req = tevent_req_create(mem_ctx, &state, struct process_clist_state);
3160         if (req == NULL) {
3161                 return NULL;
3162         }
3163
3164         state->clist = clist;
3165         state->reply_func = reply_func;
3166
3167         for (i = 0; i < clist->num; i++) {
3168                 request_func(&request, &clist->conn[i]);
3169                 subreq = ctdb_client_control_send(state, ctdb->ev,
3170                                                   ctdb->client, ctdb->cmd_pnn,
3171                                                   TIMEOUT(), &request);
3172                 if (tevent_req_nomem(subreq, req)) {
3173                         return tevent_req_post(req, ctdb->ev);
3174                 }
3175                 tevent_req_set_callback(subreq, process_clist_done, req);
3176         }
3177
3178         return req;
3179 }
3180
3181 static void process_clist_done(struct tevent_req *subreq)
3182 {
3183         struct tevent_req *req = tevent_req_callback_data(
3184                 subreq, struct tevent_req);
3185         struct process_clist_state *state = tevent_req_data(
3186                 req, struct process_clist_state);
3187         struct ctdb_reply_control *reply;
3188         int ret;
3189         bool status;
3190
3191         status = ctdb_client_control_recv(subreq, NULL, state, &reply);
3192         TALLOC_FREE(subreq);
3193         if (! status) {
3194                 state->num_failed += 1;
3195                 goto done;
3196         }
3197
3198         ret = state->reply_func(reply);
3199         if (ret != 0) {
3200                 state->num_failed += 1;
3201                 goto done;
3202         }
3203
3204 done:
3205         state->num_total += 1;
3206         if (state->num_total == state->clist->num) {
3207                 tevent_req_done(req);
3208         }
3209 }
3210
3211 static int process_clist_recv(struct tevent_req *req)
3212 {
3213         struct process_clist_state *state = tevent_req_data(
3214                 req, struct process_clist_state);
3215
3216         return state->num_failed;
3217 }
3218
3219 static int control_addtickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3220                              int argc, const char **argv)
3221 {
3222         struct ctdb_connection conn;
3223         int ret;
3224
3225         if (argc != 0 && argc != 2) {
3226                 usage("addtickle");
3227         }
3228
3229         if (argc == 0) {
3230                 struct ctdb_connection_list *clist;
3231                 struct tevent_req *req;
3232
3233                 /* Client first but the src/dst logic is confused */
3234                 ret = ctdb_connection_list_read(mem_ctx, false, &clist);
3235                 if (ret != 0) {
3236                         return ret;
3237                 }
3238                 if (clist->num == 0) {
3239                         return 0;
3240                 }
3241
3242                 req = process_clist_send(mem_ctx, ctdb, clist,
3243                                  ctdb_req_control_tcp_add_delayed_update,
3244                                  ctdb_reply_control_tcp_add_delayed_update);
3245                 if (req == NULL) {
3246                         talloc_free(clist);
3247                         return ENOMEM;
3248                 }
3249
3250                 tevent_req_poll(req, ctdb->ev);
3251                 talloc_free(clist);
3252
3253                 ret = process_clist_recv(req);
3254                 if (ret != 0) {
3255                         fprintf(stderr, "Failed to add %d tickles\n", ret);
3256                         return 1;
3257                 }
3258
3259                 return 0;
3260         }
3261
3262         if (! parse_ip_port(argv[0], &conn.src)) {
3263                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3264                 return 1;
3265         }
3266         if (! parse_ip_port(argv[1], &conn.dst)) {
3267                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3268                 return 1;
3269         }
3270
3271         ret = ctdb_ctrl_tcp_add_delayed_update(mem_ctx, ctdb->ev,
3272                                                ctdb->client, ctdb->cmd_pnn,
3273                                                TIMEOUT(), &conn);
3274         if (ret != 0) {
3275                 fprintf(stderr, "Failed to register connection\n");
3276                 return ret;
3277         }
3278
3279         return 0;
3280 }
3281
3282 static int control_deltickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3283                              int argc, const char **argv)
3284 {
3285         struct ctdb_connection conn;
3286         int ret;
3287
3288         if (argc != 0 && argc != 2) {
3289                 usage("deltickle");
3290         }
3291
3292         if (argc == 0) {
3293                 struct ctdb_connection_list *clist;
3294                 struct tevent_req *req;
3295
3296                 /* Client first but the src/dst logic is confused */
3297                 ret = ctdb_connection_list_read(mem_ctx, false, &clist);
3298                 if (ret != 0) {
3299                         return ret;
3300                 }
3301                 if (clist->num == 0) {
3302                         return 0;
3303                 }
3304
3305                 req = process_clist_send(mem_ctx, ctdb, clist,
3306                                          ctdb_req_control_tcp_remove,
3307                                          ctdb_reply_control_tcp_remove);
3308                 if (req == NULL) {
3309                         talloc_free(clist);
3310                         return ENOMEM;
3311                 }
3312
3313                 tevent_req_poll(req, ctdb->ev);
3314                 talloc_free(clist);
3315
3316                 ret = process_clist_recv(req);
3317                 if (ret != 0) {
3318                         fprintf(stderr, "Failed to remove %d tickles\n", ret);
3319                         return 1;
3320                 }
3321
3322                 return 0;
3323         }
3324
3325         if (! parse_ip_port(argv[0], &conn.src)) {
3326                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3327                 return 1;
3328         }
3329         if (! parse_ip_port(argv[1], &conn.dst)) {
3330                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3331                 return 1;
3332         }
3333
3334         ret = ctdb_ctrl_tcp_remove(mem_ctx, ctdb->ev, ctdb->client,
3335                                    ctdb->cmd_pnn, TIMEOUT(), &conn);
3336         if (ret != 0) {
3337                 fprintf(stderr, "Failed to unregister connection\n");
3338                 return ret;
3339         }
3340
3341         return 0;
3342 }
3343
3344 static int control_listnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3345                              int argc, const char **argv)
3346 {
3347         struct ctdb_node_map *nodemap;
3348         int i;
3349
3350         if (argc != 0) {
3351                 usage("listnodes");
3352         }
3353
3354         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
3355         if (nodemap == NULL) {
3356                 return 1;
3357         }
3358
3359         for (i=0; i<nodemap->num; i++) {
3360                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
3361                         continue;
3362                 }
3363
3364                 if (options.machinereadable) {
3365                         printf("%s%u%s%s%s\n", options.sep,
3366                                nodemap->node[i].pnn, options.sep,
3367                                ctdb_sock_addr_to_string(
3368                                        mem_ctx, &nodemap->node[i].addr, false),
3369                                options.sep);
3370                 } else {
3371                         printf("%s\n",
3372                                ctdb_sock_addr_to_string(
3373                                        mem_ctx, &nodemap->node[i].addr, false));
3374                 }
3375         }
3376
3377         return 0;
3378 }
3379
3380 static bool nodemap_identical(struct ctdb_node_map *nodemap1,
3381                               struct ctdb_node_map *nodemap2)
3382 {
3383         int i;
3384
3385         if (nodemap1->num != nodemap2->num) {
3386                 return false;
3387         }
3388
3389         for (i=0; i<nodemap1->num; i++) {
3390                 struct ctdb_node_and_flags *n1, *n2;
3391
3392                 n1 = &nodemap1->node[i];
3393                 n2 = &nodemap2->node[i];
3394
3395                 if ((n1->pnn != n2->pnn) ||
3396                     (n1->flags != n2->flags) ||
3397                     ! ctdb_sock_addr_same_ip(&n1->addr, &n2->addr)) {
3398                         return false;
3399                 }
3400         }
3401
3402         return true;
3403 }
3404
3405 static int check_node_file_changes(TALLOC_CTX *mem_ctx,
3406                                    struct ctdb_node_map *nm,
3407                                    struct ctdb_node_map *fnm,
3408                                    bool *reload)
3409 {
3410         int i;
3411         bool check_failed = false;
3412
3413         *reload = false;
3414
3415         for (i=0; i<nm->num; i++) {
3416                 if (i >= fnm->num) {
3417                         fprintf(stderr,
3418                                 "Node %u (%s) missing from nodes file\n",
3419                                 nm->node[i].pnn,
3420                                 ctdb_sock_addr_to_string(
3421                                         mem_ctx, &nm->node[i].addr, false));
3422                         check_failed = true;
3423                         continue;
3424                 }
3425                 if (nm->node[i].flags & NODE_FLAGS_DELETED &&
3426                     fnm->node[i].flags & NODE_FLAGS_DELETED) {
3427                         /* Node remains deleted */
3428                         continue;
3429                 }
3430
3431                 if (! (nm->node[i].flags & NODE_FLAGS_DELETED) &&
3432                     ! (fnm->node[i].flags & NODE_FLAGS_DELETED)) {
3433                         /* Node not newly nor previously deleted */
3434                         if (! ctdb_same_ip(&nm->node[i].addr,
3435                                            &fnm->node[i].addr)) {
3436                                 fprintf(stderr,
3437                                         "Node %u has changed IP address"
3438                                         " (was %s, now %s)\n",
3439                                         nm->node[i].pnn,
3440                                         ctdb_sock_addr_to_string(
3441                                                 mem_ctx,
3442                                                 &nm->node[i].addr, false),
3443                                         ctdb_sock_addr_to_string(
3444                                                 mem_ctx,
3445                                                 &fnm->node[i].addr, false));
3446                                 check_failed = true;
3447                         } else {
3448                                 if (nm->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3449                                         fprintf(stderr,
3450                                                 "WARNING: Node %u is disconnected."
3451                                                 " You MUST fix this node manually!\n",
3452                                                 nm->node[i].pnn);
3453                                 }
3454                         }
3455                         continue;
3456                 }
3457
3458                 if (fnm->node[i].flags & NODE_FLAGS_DELETED) {
3459                         /* Node is being deleted */
3460                         printf("Node %u is DELETED\n", nm->node[i].pnn);
3461                         *reload = true;
3462                         if (! (nm->node[i].flags & NODE_FLAGS_DISCONNECTED)) {
3463                                 fprintf(stderr,
3464                                         "ERROR: Node %u is still connected\n",
3465                                         nm->node[i].pnn);
3466                                 check_failed = true;
3467                         }
3468                         continue;
3469                 }
3470
3471                 if (nm->node[i].flags & NODE_FLAGS_DELETED) {
3472                         /* Node was previously deleted */
3473                         printf("Node %u is UNDELETED\n", nm->node[i].pnn);
3474                         *reload = true;
3475                 }
3476         }
3477
3478         if (check_failed) {
3479                 fprintf(stderr,
3480                         "ERROR: Nodes will not be reloaded due to previous error\n");
3481                 return 1;
3482         }
3483
3484         /* Leftover nodes in file are NEW */
3485         for (; i < fnm->num; i++) {
3486                 printf("Node %u is NEW\n", fnm->node[i].pnn);
3487                 *reload = true;
3488         }
3489
3490         return 0;
3491 }
3492
3493 struct disable_recoveries_state {
3494         uint32_t *pnn_list;
3495         int node_count;
3496         bool *reply;
3497         int status;
3498         bool done;
3499 };
3500
3501 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
3502                                        void *private_data)
3503 {
3504         struct disable_recoveries_state *state =
3505                 (struct disable_recoveries_state *)private_data;
3506         int ret, i;
3507
3508         if (data.dsize != sizeof(int)) {
3509                 /* Ignore packet */
3510                 return;
3511         }
3512
3513         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
3514         ret = *(int *)data.dptr;
3515         if (ret < 0) {
3516                 state->status = ret;
3517                 state->done = true;
3518                 return;
3519         }
3520         for (i=0; i<state->node_count; i++) {
3521                 if (state->pnn_list[i] == ret) {
3522                         state->reply[i] = true;
3523                         break;
3524                 }
3525         }
3526
3527         state->done = true;
3528         for (i=0; i<state->node_count; i++) {
3529                 if (! state->reply[i]) {
3530                         state->done = false;
3531                         break;
3532                 }
3533         }
3534 }
3535
3536 static int disable_recoveries(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3537                               uint32_t timeout, uint32_t *pnn_list, int count)
3538 {
3539         struct ctdb_disable_message disable = { 0 };
3540         struct disable_recoveries_state state;
3541         int ret, i;
3542
3543         disable.pnn = ctdb->pnn;
3544         disable.srvid = next_srvid(ctdb);
3545         disable.timeout = timeout;
3546
3547         state.pnn_list = pnn_list;
3548         state.node_count = count;
3549         state.done = false;
3550         state.status = 0;
3551         state.reply = talloc_zero_array(mem_ctx, bool, count);
3552         if (state.reply == NULL) {
3553                 return ENOMEM;
3554         }
3555
3556         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
3557                                               disable.srvid,
3558                                               disable_recoveries_handler,
3559                                               &state);
3560         if (ret != 0) {
3561                 return ret;
3562         }
3563
3564         for (i=0; i<count; i++) {
3565                 ret = ctdb_message_disable_recoveries(mem_ctx, ctdb->ev,
3566                                                       ctdb->client,
3567                                                       pnn_list[i],
3568                                                       &disable);
3569                 if (ret != 0) {
3570                         goto fail;
3571                 }
3572         }
3573
3574         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
3575         if (ret == ETIME) {
3576                 fprintf(stderr, "Timed out waiting to disable recoveries\n");
3577         } else {
3578                 ret = (state.status >= 0 ? 0 : 1);
3579         }
3580
3581 fail:
3582         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
3583                                            disable.srvid, &state);
3584         return ret;
3585 }
3586
3587 static int control_reloadnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3588                                int argc, const char **argv)
3589 {
3590         struct ctdb_node_map *nodemap = NULL;
3591         struct ctdb_node_map *file_nodemap;
3592         struct ctdb_node_map *remote_nodemap;
3593         struct ctdb_req_control request;
3594         struct ctdb_reply_control **reply;
3595         bool reload;
3596         int ret, i;
3597         uint32_t *pnn_list;
3598         int count;
3599
3600         nodemap = get_nodemap(ctdb, false);
3601         if (nodemap == NULL) {
3602                 return 1;
3603         }
3604
3605         file_nodemap = read_nodes_file(mem_ctx, ctdb->pnn);
3606         if (file_nodemap == NULL) {
3607                 return 1;
3608         }
3609
3610         for (i=0; i<nodemap->num; i++) {
3611                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3612                         continue;
3613                 }
3614
3615                 ret = ctdb_ctrl_get_nodes_file(mem_ctx, ctdb->ev, ctdb->client,
3616                                                nodemap->node[i].pnn, TIMEOUT(),
3617                                                &remote_nodemap);
3618                 if (ret != 0) {
3619                         fprintf(stderr,
3620                                 "ERROR: Failed to get nodes file from node %u\n",
3621                                 nodemap->node[i].pnn);
3622                         return ret;
3623                 }
3624
3625                 if (! nodemap_identical(file_nodemap, remote_nodemap)) {
3626                         fprintf(stderr,
3627                                 "ERROR: Nodes file on node %u differs"
3628                                  " from current node (%u)\n",
3629                                  nodemap->node[i].pnn, ctdb->pnn);
3630                         return 1;
3631                 }
3632         }
3633
3634         ret = check_node_file_changes(mem_ctx, nodemap, file_nodemap, &reload);
3635         if (ret != 0) {
3636                 return ret;
3637         }
3638
3639         if (! reload) {
3640                 fprintf(stderr, "No change in nodes file,"
3641                                 " skipping unnecessary reload\n");
3642                 return 0;
3643         }
3644
3645         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
3646                                         mem_ctx, &pnn_list);
3647         if (count <= 0) {
3648                 fprintf(stderr, "Memory allocation error\n");
3649                 return 1;
3650         }
3651
3652         ret = disable_recoveries(mem_ctx, ctdb, 2*options.timelimit,
3653                                  pnn_list, count);
3654         if (ret != 0) {
3655                 fprintf(stderr, "Failed to disable recoveries\n");
3656                 return ret;
3657         }
3658
3659         ctdb_req_control_reload_nodes_file(&request);
3660         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3661                                         pnn_list, count, TIMEOUT(),
3662                                         &request, NULL, &reply);
3663         if (ret != 0) {
3664                 bool failed = false;
3665
3666                 for (i=0; i<count; i++) {
3667                         ret = ctdb_reply_control_reload_nodes_file(reply[i]);
3668                         if (ret != 0) {
3669                                 fprintf(stderr,
3670                                         "Node %u failed to reload nodes\n",
3671                                         pnn_list[i]);
3672                                 failed = true;
3673                         }
3674                 }
3675                 if (failed) {
3676                         fprintf(stderr,
3677                                 "You MUST fix failed nodes manually!\n");
3678                 }
3679         }
3680
3681         ret = disable_recoveries(mem_ctx, ctdb, 0, pnn_list, count);
3682         if (ret != 0) {
3683                 fprintf(stderr, "Failed to enable recoveries\n");
3684                 return ret;
3685         }
3686
3687         return 0;
3688 }
3689
3690 static int moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3691                   ctdb_sock_addr *addr, uint32_t pnn)
3692 {
3693         struct ctdb_public_ip_list *pubip_list;
3694         struct ctdb_public_ip pubip;
3695         struct ctdb_node_map *nodemap;
3696         struct ctdb_req_control request;
3697         uint32_t *pnn_list;
3698         int ret, i, count;
3699
3700         ret = ctdb_message_disable_ip_check(mem_ctx, ctdb->ev, ctdb->client,
3701                                             CTDB_BROADCAST_CONNECTED,
3702                                             2*options.timelimit);
3703         if (ret != 0) {
3704                 fprintf(stderr, "Failed to disable IP check\n");
3705                 return ret;
3706         }
3707
3708         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3709                                        pnn, TIMEOUT(), false, &pubip_list);
3710         if (ret != 0) {
3711                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3712                         pnn);
3713                 return ret;
3714         }
3715
3716         for (i=0; i<pubip_list->num; i++) {
3717                 if (ctdb_same_ip(addr, &pubip_list->ip[i].addr)) {
3718                         break;
3719                 }
3720         }
3721
3722         if (i == pubip_list->num) {
3723                 fprintf(stderr, "Node %u CANNOT host IP address %s\n",
3724                         pnn, ctdb_sock_addr_to_string(mem_ctx, addr, false));
3725                 return 1;
3726         }
3727
3728         nodemap = get_nodemap(ctdb, false);
3729         if (nodemap == NULL) {
3730                 return 1;
3731         }
3732
3733         count = list_of_active_nodes(nodemap, pnn, mem_ctx, &pnn_list);
3734         if (count <= 0) {
3735                 fprintf(stderr, "Memory allocation error\n");
3736                 return 1;
3737         }
3738
3739         pubip.pnn = pnn;
3740         pubip.addr = *addr;
3741         ctdb_req_control_release_ip(&request, &pubip);
3742
3743         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3744                                         pnn_list, count, TIMEOUT(),
3745                                         &request, NULL, NULL);
3746         if (ret != 0) {
3747                 fprintf(stderr, "Failed to release IP on nodes\n");
3748                 return ret;
3749         }
3750
3751         ret = ctdb_ctrl_takeover_ip(mem_ctx, ctdb->ev, ctdb->client,
3752                                     pnn, TIMEOUT(), &pubip);
3753         if (ret != 0) {
3754                 fprintf(stderr, "Failed to takeover IP on node %u\n", pnn);
3755                 return ret;
3756         }
3757
3758         return 0;
3759 }
3760
3761 static int control_moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3762                           int argc, const char **argv)
3763 {
3764         ctdb_sock_addr addr;
3765         uint32_t pnn;
3766         int ret, retries = 0;
3767
3768         if (argc != 2) {
3769                 usage("moveip");
3770         }
3771
3772         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3773                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3774                 return 1;
3775         }
3776
3777         pnn = strtoul(argv[1], NULL, 10);
3778         if (pnn == CTDB_UNKNOWN_PNN) {
3779                 fprintf(stderr, "Invalid PNN %s\n", argv[1]);
3780                 return 1;
3781         }
3782
3783         while (retries < 5) {
3784                 ret = moveip(mem_ctx, ctdb, &addr, pnn);
3785                 if (ret == 0) {
3786                         break;
3787                 }
3788
3789                 sleep(3);
3790                 retries++;
3791         }
3792
3793         if (ret != 0) {
3794                 fprintf(stderr, "Failed to move IP %s to node %u\n",
3795                         argv[0], pnn);
3796                 return ret;
3797         }
3798
3799         return 0;
3800 }
3801
3802 static int rebalancenode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3803                          uint32_t pnn)
3804 {
3805         int ret;
3806
3807         ret = ctdb_message_rebalance_node(mem_ctx, ctdb->ev, ctdb->client,
3808                                           CTDB_BROADCAST_CONNECTED, pnn);
3809         if (ret != 0) {
3810                 fprintf(stderr,
3811                         "Failed to ask recovery master to distribute IPs\n");
3812                 return ret;
3813         }
3814
3815         return 0;
3816 }
3817
3818 static int control_addip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3819                          int argc, const char **argv)
3820 {
3821         ctdb_sock_addr addr;
3822         struct ctdb_public_ip_list *pubip_list;
3823         struct ctdb_addr_info addr_info;
3824         unsigned int mask;
3825         int ret, i, retries = 0;
3826
3827         if (argc != 2) {
3828                 usage("addip");
3829         }
3830
3831         if (! parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
3832                 fprintf(stderr, "Invalid IP/Mask %s\n", argv[0]);
3833                 return 1;
3834         }
3835
3836         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3837                                        ctdb->cmd_pnn, TIMEOUT(),
3838                                        false, &pubip_list);
3839         if (ret != 0) {
3840                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3841                         ctdb->cmd_pnn);
3842                 return 1;
3843         }
3844
3845         for (i=0; i<pubip_list->num; i++) {
3846                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3847                         fprintf(stderr, "Node already knows about IP %s\n",
3848                                 ctdb_sock_addr_to_string(mem_ctx,
3849                                                          &addr, false));
3850                         return 0;
3851                 }
3852         }
3853
3854         addr_info.addr = addr;
3855         addr_info.mask = mask;
3856         addr_info.iface = argv[1];
3857
3858         while (retries < 5) {
3859                 ret = ctdb_ctrl_add_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3860                                               ctdb->cmd_pnn, TIMEOUT(),
3861                                               &addr_info);
3862                 if (ret == 0) {
3863                         break;
3864                 }
3865
3866                 sleep(3);
3867                 retries++;
3868         }
3869
3870         if (ret != 0) {
3871                 fprintf(stderr, "Failed to add public IP to node %u."
3872                                 " Giving up\n", ctdb->cmd_pnn);
3873                 return ret;
3874         }
3875
3876         ret = rebalancenode(mem_ctx, ctdb, ctdb->cmd_pnn);
3877         if (ret != 0) {
3878                 return ret;
3879         }
3880
3881         return 0;
3882 }
3883
3884 static int control_delip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3885                          int argc, const char **argv)
3886 {
3887         ctdb_sock_addr addr;
3888         struct ctdb_public_ip_list *pubip_list;
3889         struct ctdb_addr_info addr_info;
3890         int ret, i;
3891
3892         if (argc != 1) {
3893                 usage("delip");
3894         }
3895
3896         if (! parse_ip(argv[0], NULL, 0, &addr)) {
3897                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3898                 return 1;
3899         }
3900
3901         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3902                                        ctdb->cmd_pnn, TIMEOUT(),
3903                                        false, &pubip_list);
3904         if (ret != 0) {
3905                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3906                         ctdb->cmd_pnn);
3907                 return 1;
3908         }
3909
3910         for (i=0; i<pubip_list->num; i++) {
3911                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3912                         break;
3913                 }
3914         }
3915
3916         if (i == pubip_list->num) {
3917                 fprintf(stderr, "Node does not know about IP address %s\n",
3918                         ctdb_sock_addr_to_string(mem_ctx, &addr, false));
3919                 return 0;
3920         }
3921
3922         addr_info.addr = addr;
3923         addr_info.mask = 0;
3924         addr_info.iface = NULL;
3925
3926         ret = ctdb_ctrl_del_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3927                                       ctdb->cmd_pnn, TIMEOUT(), &addr_info);
3928         if (ret != 0) {
3929                 fprintf(stderr, "Failed to delete public IP from node %u\n",
3930                         ctdb->cmd_pnn);
3931                 return ret;
3932         }
3933
3934         return 0;
3935 }
3936
3937 #define DB_VERSION      3
3938 #define MAX_DB_NAME     64
3939 #define MAX_REC_BUFFER_SIZE     (100*1000)
3940
3941 struct db_header {
3942         unsigned long version;
3943         time_t timestamp;
3944         unsigned long flags;
3945         unsigned long nbuf;
3946         unsigned long nrec;
3947         char name[MAX_DB_NAME];
3948 };
3949
3950 struct backup_state {
3951         TALLOC_CTX *mem_ctx;
3952         struct ctdb_rec_buffer *recbuf;
3953         uint32_t db_id;
3954         int fd;
3955         unsigned int nbuf, nrec;
3956 };
3957
3958 static int backup_handler(uint32_t reqid, struct ctdb_ltdb_header *header,
3959                           TDB_DATA key, TDB_DATA data, void *private_data)
3960 {
3961         struct backup_state *state = (struct backup_state *)private_data;
3962         size_t len;
3963         int ret;
3964
3965         if (state->recbuf == NULL) {
3966                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
3967                                                      state->db_id);
3968                 if (state->recbuf == NULL) {
3969                         return ENOMEM;
3970                 }
3971         }
3972
3973         ret = ctdb_rec_buffer_add(state->recbuf, state->recbuf, reqid,
3974                                   header, key, data);
3975         if (ret != 0) {
3976                 return ret;
3977         }
3978
3979         len = ctdb_rec_buffer_len(state->recbuf);
3980         if (len < MAX_REC_BUFFER_SIZE) {
3981                 return 0;
3982         }
3983
3984         ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
3985         if (ret != 0) {
3986                 fprintf(stderr, "Failed to write records to backup file\n");
3987                 return ret;
3988         }
3989
3990         state->nbuf += 1;
3991         state->nrec += state->recbuf->count;
3992         TALLOC_FREE(state->recbuf);
3993
3994         return 0;
3995 }
3996
3997 static int control_backupdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3998                             int argc, const char **argv)
3999 {
4000         const char *db_name;
4001         struct ctdb_db_context *db;
4002         uint32_t db_id;
4003         uint8_t db_flags;
4004         struct backup_state state;
4005         struct db_header db_hdr;
4006         int fd, ret;
4007
4008         if (argc != 2) {
4009                 usage("backupdb");
4010         }
4011
4012         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4013                 return 1;
4014         }
4015
4016         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4017                           db_flags, &db);
4018         if (ret != 0) {
4019                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4020                 return ret;
4021         }
4022
4023         fd = open(argv[1], O_RDWR|O_CREAT, 0600);
4024         if (fd == -1) {
4025                 ret = errno;
4026                 fprintf(stderr, "Failed to open file %s for writing\n",
4027                         argv[1]);
4028                 return ret;
4029         }
4030
4031         /* Write empty header first */
4032         ZERO_STRUCT(db_hdr);
4033         ret = write(fd, &db_hdr, sizeof(struct db_header));
4034         if (ret == -1) {
4035                 ret = errno;
4036                 close(fd);
4037                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4038                 return ret;
4039         }
4040
4041         state.mem_ctx = mem_ctx;
4042         state.recbuf = NULL;
4043         state.fd = fd;
4044         state.nbuf = 0;
4045         state.nrec = 0;
4046
4047         ret = ctdb_db_traverse_local(db, true, false, backup_handler, &state);
4048         if (ret != 0) {
4049                 fprintf(stderr, "Failed to collect records from DB %s\n",
4050                         db_name);
4051                 close(fd);
4052                 return ret;
4053         }
4054
4055         if (state.recbuf != NULL) {
4056                 ret = ctdb_rec_buffer_write(state.recbuf, state.fd);
4057                 if (ret != 0) {
4058                         fprintf(stderr,
4059                                 "Failed to write records to backup file\n");
4060                         close(fd);
4061                         return ret;
4062                 }
4063
4064                 state.nbuf += 1;
4065                 state.nrec += state.recbuf->count;
4066                 TALLOC_FREE(state.recbuf);
4067         }
4068
4069         db_hdr.version = DB_VERSION;
4070         db_hdr.timestamp = time(NULL);
4071         db_hdr.flags = db_flags;
4072         db_hdr.nbuf = state.nbuf;
4073         db_hdr.nrec = state.nrec;
4074         strncpy(db_hdr.name, db_name, MAX_DB_NAME-1);
4075
4076         lseek(fd, 0, SEEK_SET);
4077         ret = write(fd, &db_hdr, sizeof(struct db_header));
4078         if (ret == -1) {
4079                 ret = errno;
4080                 close(fd);
4081                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4082                 return ret;
4083         }
4084
4085         close(fd);
4086         printf("Database backed up to %s\n", argv[1]);
4087         return 0;
4088 }
4089
4090 static int control_restoredb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4091                              int argc, const char **argv)
4092 {
4093         const char *db_name = NULL;
4094         struct ctdb_db_context *db;
4095         struct db_header db_hdr;
4096         struct ctdb_node_map *nodemap;
4097         struct ctdb_req_control request;
4098         struct ctdb_reply_control **reply;
4099         struct ctdb_transdb wipedb;
4100         struct ctdb_pulldb_ext pulldb;
4101         struct ctdb_rec_buffer *recbuf;
4102         uint32_t generation;
4103         uint32_t *pnn_list;
4104         char timebuf[128];
4105         ssize_t n;
4106         int fd, i;
4107         int count, ret;
4108         uint8_t db_flags;
4109
4110         if (argc < 1 || argc > 2) {
4111                 usage("restoredb");
4112         }
4113
4114         fd = open(argv[0], O_RDONLY, 0600);
4115         if (fd == -1) {
4116                 ret = errno;
4117                 fprintf(stderr, "Failed to open file %s for reading\n",
4118                         argv[0]);
4119                 return ret;
4120         }
4121
4122         if (argc == 2) {
4123                 db_name = argv[1];
4124         }
4125
4126         n = read(fd, &db_hdr, sizeof(struct db_header));
4127         if (n == -1) {
4128                 ret = errno;
4129                 close(fd);
4130                 fprintf(stderr, "Failed to read db header from file %s\n",
4131                         argv[0]);
4132                 return ret;
4133         }
4134         db_hdr.name[sizeof(db_hdr.name)-1] = '\0';
4135
4136         if (db_hdr.version != DB_VERSION) {
4137                 fprintf(stderr,
4138                         "Wrong version of backup file, expected %u, got %lu\n",
4139                         DB_VERSION, db_hdr.version);
4140                 close(fd);
4141                 return EINVAL;
4142         }
4143
4144         if (db_name == NULL) {
4145                 db_name = db_hdr.name;
4146         }
4147
4148         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4149                  localtime(&db_hdr.timestamp));
4150         printf("Restoring database %s from backup @ %s\n", db_name, timebuf);
4151
4152         db_flags = db_hdr.flags & 0xff;
4153         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4154                           db_flags, &db);
4155         if (ret != 0) {
4156                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4157                 close(fd);
4158                 return ret;
4159         }
4160
4161         nodemap = get_nodemap(ctdb, false);
4162         if (nodemap == NULL) {
4163                 fprintf(stderr, "Failed to get nodemap\n");
4164                 close(fd);
4165                 return ENOMEM;
4166         }
4167
4168         ret = get_generation(mem_ctx, ctdb, &generation);
4169         if (ret != 0) {
4170                 fprintf(stderr, "Failed to get current generation\n");
4171                 close(fd);
4172                 return ret;
4173         }
4174
4175         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4176                                      &pnn_list);
4177         if (count <= 0) {
4178                 close(fd);
4179                 return ENOMEM;
4180         }
4181
4182         wipedb.db_id = ctdb_db_id(db);
4183         wipedb.tid = generation;
4184
4185         ctdb_req_control_db_freeze(&request, wipedb.db_id);
4186         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4187                                         ctdb->client, pnn_list, count,
4188                                         TIMEOUT(), &request, NULL, NULL);
4189         if (ret != 0) {
4190                 goto failed;
4191         }
4192
4193
4194         ctdb_req_control_db_transaction_start(&request, &wipedb);
4195         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4196                                         pnn_list, count, TIMEOUT(),
4197                                         &request, NULL, NULL);
4198         if (ret != 0) {
4199                 goto failed;
4200         }
4201
4202         ctdb_req_control_wipe_database(&request, &wipedb);
4203         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4204                                         pnn_list, count, TIMEOUT(),
4205                                         &request, NULL, NULL);
4206         if (ret != 0) {
4207                 goto failed;
4208         }
4209
4210         pulldb.db_id = ctdb_db_id(db);
4211         pulldb.lmaster = 0;
4212         pulldb.srvid = SRVID_CTDB_PUSHDB;
4213
4214         ctdb_req_control_db_push_start(&request, &pulldb);
4215         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4216                                         pnn_list, count, TIMEOUT(),
4217                                         &request, NULL, NULL);
4218         if (ret != 0) {
4219                 goto failed;
4220         }
4221
4222         for (i=0; i<db_hdr.nbuf; i++) {
4223                 struct ctdb_req_message message;
4224                 TDB_DATA data;
4225                 size_t np;
4226
4227                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4228                 if (ret != 0) {
4229                         goto failed;
4230                 }
4231
4232                 data.dsize = ctdb_rec_buffer_len(recbuf);
4233                 data.dptr = talloc_size(mem_ctx, data.dsize);
4234                 if (data.dptr == NULL) {
4235                         goto failed;
4236                 }
4237
4238                 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
4239
4240                 message.srvid = pulldb.srvid;
4241                 message.data.data = data;
4242
4243                 ret = ctdb_client_message_multi(mem_ctx, ctdb->ev,
4244                                                 ctdb->client,
4245                                                 pnn_list, count,
4246                                                 &message, NULL);
4247                 if (ret != 0) {
4248                         goto failed;
4249                 }
4250
4251                 talloc_free(recbuf);
4252                 talloc_free(data.dptr);
4253         }
4254
4255         ctdb_req_control_db_push_confirm(&request, pulldb.db_id);
4256         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4257                                         pnn_list, count, TIMEOUT(),
4258                                         &request, NULL, &reply);
4259         if (ret != 0) {
4260                 goto failed;
4261         }
4262
4263         for (i=0; i<count; i++) {
4264                 uint32_t num_records;
4265
4266                 ret = ctdb_reply_control_db_push_confirm(reply[i],
4267                                                          &num_records);
4268                 if (ret != 0) {
4269                         fprintf(stderr, "Invalid response from node %u\n",
4270                                 pnn_list[i]);
4271                         goto failed;
4272                 }
4273
4274                 if (num_records != db_hdr.nrec) {
4275                         fprintf(stderr, "Node %u received %u of %lu records\n",
4276                                 pnn_list[i], num_records, db_hdr.nrec);
4277                         goto failed;
4278                 }
4279         }
4280
4281         ctdb_req_control_db_set_healthy(&request, wipedb.db_id);
4282         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4283                                         pnn_list, count, TIMEOUT(),
4284                                         &request, NULL, NULL);
4285         if (ret != 0) {
4286                 goto failed;
4287         }
4288
4289         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4290         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4291                                         pnn_list, count, TIMEOUT(),
4292                                         &request, NULL, NULL);
4293         if (ret != 0) {
4294                 goto failed;
4295         }
4296
4297         ctdb_req_control_db_thaw(&request, wipedb.db_id);
4298         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4299                                         ctdb->client, pnn_list, count,
4300                                         TIMEOUT(), &request, NULL, NULL);
4301         if (ret != 0) {
4302                 goto failed;
4303         }
4304
4305         printf("Database %s restored\n", db_name);
4306         close(fd);
4307         return 0;
4308
4309
4310 failed:
4311         close(fd);
4312         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4313                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4314         return ret;
4315 }
4316
4317 struct dumpdbbackup_state {
4318         ctdb_rec_parser_func_t parser;
4319         struct dump_record_state sub_state;
4320 };
4321
4322 static int dumpdbbackup_handler(uint32_t reqid,
4323                                 struct ctdb_ltdb_header *header,
4324                                 TDB_DATA key, TDB_DATA data,
4325                                 void *private_data)
4326 {
4327         struct dumpdbbackup_state *state =
4328                 (struct dumpdbbackup_state *)private_data;
4329         struct ctdb_ltdb_header hdr;
4330         int ret;
4331
4332         ret = ctdb_ltdb_header_extract(&data, &hdr);
4333         if (ret != 0) {
4334                 return ret;
4335         }
4336
4337         return state->parser(reqid, &hdr, key, data, &state->sub_state);
4338 }
4339
4340 static int control_dumpdbbackup(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4341                                 int argc, const char **argv)
4342 {
4343         struct db_header db_hdr;
4344         char timebuf[128];
4345         struct dumpdbbackup_state state;
4346         ssize_t n;
4347         int fd, ret, i;
4348
4349         if (argc != 1) {
4350                 usage("dumpbackup");
4351         }
4352
4353         fd = open(argv[0], O_RDONLY, 0600);
4354         if (fd == -1) {
4355                 ret = errno;
4356                 fprintf(stderr, "Failed to open file %s for reading\n",
4357                         argv[0]);
4358                 return ret;
4359         }
4360
4361         n = read(fd, &db_hdr, sizeof(struct db_header));
4362         if (n == -1) {
4363                 ret = errno;
4364                 close(fd);
4365                 fprintf(stderr, "Failed to read db header from file %s\n",
4366                         argv[0]);
4367                 return ret;
4368         }
4369         db_hdr.name[sizeof(db_hdr.name)-1] = '\0';
4370
4371         if (db_hdr.version != DB_VERSION) {
4372                 fprintf(stderr,
4373                         "Wrong version of backup file, expected %u, got %lu\n",
4374                         DB_VERSION, db_hdr.version);
4375                 close(fd);
4376                 return EINVAL;
4377         }
4378
4379         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4380                  localtime(&db_hdr.timestamp));
4381         printf("Dumping database %s from backup @ %s\n",
4382                db_hdr.name, timebuf);
4383
4384         state.parser = dump_record;
4385         state.sub_state.count = 0;
4386
4387         for (i=0; i<db_hdr.nbuf; i++) {
4388                 struct ctdb_rec_buffer *recbuf;
4389
4390                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4391                 if (ret != 0) {
4392                         fprintf(stderr, "Failed to read records\n");
4393                         close(fd);
4394                         return ret;
4395                 }
4396
4397                 ret = ctdb_rec_buffer_traverse(recbuf, dumpdbbackup_handler,
4398                                                &state);
4399                 if (ret != 0) {
4400                         fprintf(stderr, "Failed to dump records\n");
4401                         close(fd);
4402                         return ret;
4403                 }
4404         }
4405
4406         close(fd);
4407         printf("Dumped %u record(s)\n", state.sub_state.count);
4408         return 0;
4409 }
4410
4411 static int control_wipedb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4412                           int argc, const char **argv)
4413 {
4414         const char *db_name;
4415         struct ctdb_db_context *db;
4416         uint32_t db_id;
4417         uint8_t db_flags;
4418         struct ctdb_node_map *nodemap;
4419         struct ctdb_req_control request;
4420         struct ctdb_transdb wipedb;
4421         uint32_t generation;
4422         uint32_t *pnn_list;
4423         int count, ret;
4424
4425         if (argc != 1) {
4426                 usage("wipedb");
4427         }
4428
4429         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4430                 return 1;
4431         }
4432
4433         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4434                           db_flags, &db);
4435         if (ret != 0) {
4436                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4437                 return ret;
4438         }
4439
4440         nodemap = get_nodemap(ctdb, false);
4441         if (nodemap == NULL) {
4442                 fprintf(stderr, "Failed to get nodemap\n");
4443                 return ENOMEM;
4444         }
4445
4446         ret = get_generation(mem_ctx, ctdb, &generation);
4447         if (ret != 0) {
4448                 fprintf(stderr, "Failed to get current generation\n");
4449                 return ret;
4450         }
4451
4452         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4453                                      &pnn_list);
4454         if (count <= 0) {
4455                 return ENOMEM;
4456         }
4457
4458         ctdb_req_control_db_freeze(&request, db_id);
4459         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4460                                         ctdb->client, pnn_list, count,
4461                                         TIMEOUT(), &request, NULL, NULL);
4462         if (ret != 0) {
4463                 goto failed;
4464         }
4465
4466         wipedb.db_id = db_id;
4467         wipedb.tid = generation;
4468
4469         ctdb_req_control_db_transaction_start(&request, &wipedb);
4470         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4471                                         pnn_list, count, TIMEOUT(),
4472                                         &request, NULL, NULL);
4473         if (ret != 0) {
4474                 goto failed;
4475         }
4476
4477         ctdb_req_control_wipe_database(&request, &wipedb);
4478         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4479                                         pnn_list, count, TIMEOUT(),
4480                                         &request, NULL, NULL);
4481         if (ret != 0) {
4482                 goto failed;
4483         }
4484
4485         ctdb_req_control_db_set_healthy(&request, db_id);
4486         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4487                                         pnn_list, count, TIMEOUT(),
4488                                         &request, NULL, NULL);
4489         if (ret != 0) {
4490                 goto failed;
4491         }
4492
4493         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4494         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4495                                         pnn_list, count, TIMEOUT(),
4496                                         &request, NULL, NULL);
4497         if (ret != 0) {
4498                 goto failed;
4499         }
4500
4501         ctdb_req_control_db_thaw(&request, db_id);
4502         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4503                                         ctdb->client, pnn_list, count,
4504                                         TIMEOUT(), &request, NULL, NULL);
4505         if (ret != 0) {
4506                 goto failed;
4507         }
4508
4509         printf("Database %s wiped\n", db_name);
4510         return 0;
4511
4512
4513 failed:
4514         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4515                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4516         return ret;
4517 }
4518
4519 static int control_recmaster(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4520                              int argc, const char **argv)
4521 {
4522         uint32_t recmaster;
4523         int ret;
4524
4525         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
4526                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
4527         if (ret != 0) {
4528                 return ret;
4529         }
4530
4531         printf("%u\n", recmaster);
4532         return 0;
4533 }
4534
4535 static int control_event(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4536                          int argc, const char **argv)
4537 {
4538         char *t, *event_helper = NULL;
4539         char *eventd_socket = NULL;
4540         const char **new_argv;
4541         int i;
4542
4543         t = getenv("CTDB_EVENT_HELPER");
4544         if (t != NULL) {
4545                 event_helper = talloc_strdup(mem_ctx, t);
4546         } else {
4547                 event_helper = talloc_asprintf(mem_ctx, "%s/ctdb_event",
4548                                                CTDB_HELPER_BINDIR);
4549         }
4550
4551         if (event_helper == NULL) {
4552                 fprintf(stderr, "Unable to set event daemon helper\n");
4553                 return 1;
4554         }
4555
4556         t = getenv("CTDB_SOCKET");
4557         if (t != NULL) {
4558                 eventd_socket = talloc_asprintf(mem_ctx, "%s/eventd.sock",
4559                                                 dirname(t));
4560         } else {
4561                 eventd_socket = talloc_asprintf(mem_ctx, "%s/eventd.sock",
4562                                                 CTDB_RUNDIR);
4563         }
4564
4565         if (eventd_socket == NULL) {
4566                 fprintf(stderr, "Unable to set event daemon socket\n");
4567                 return 1;
4568         }
4569
4570         new_argv = talloc_array(mem_ctx, const char *, argc + 1);
4571         if (new_argv == NULL) {
4572                 fprintf(stderr, "Memory allocation error\n");
4573                 return 1;
4574         }
4575
4576         new_argv[0] = eventd_socket;
4577         for (i=0; i<argc; i++) {
4578                 new_argv[i+1] = argv[i];
4579         }
4580
4581         return run_helper(mem_ctx, "event daemon helper", event_helper,
4582                           argc+1, new_argv);
4583 }
4584
4585 static int control_scriptstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4586                                 int argc, const char **argv)
4587 {
4588         const char *new_argv[3];
4589
4590         if (argc > 1) {
4591                 usage("scriptstatus");
4592         }
4593
4594         new_argv[0] = "status";
4595         new_argv[1] = (argc == 0) ? "monitor" : argv[0];
4596         new_argv[2] = NULL;
4597
4598         (void) control_event(mem_ctx, ctdb, 2, new_argv);
4599         return 0;
4600 }
4601
4602 static int control_natgw(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4603                          int argc, const char **argv)
4604 {
4605         char *t, *natgw_helper = NULL;
4606
4607         if (argc != 1) {
4608                 usage("natgw");
4609         }
4610
4611         t = getenv("CTDB_NATGW_HELPER");
4612         if (t != NULL) {
4613                 natgw_helper = talloc_strdup(mem_ctx, t);
4614         } else {
4615                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4616                                                CTDB_HELPER_BINDIR);
4617         }
4618
4619         if (natgw_helper == NULL) {
4620                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4621                 return 1;
4622         }
4623
4624         return run_helper(mem_ctx, "NAT gateway helper", natgw_helper,
4625                           argc, argv);
4626 }
4627
4628 /*
4629  * Find the PNN of the current node
4630  * discover the pnn by loading the nodes file and try to bind
4631  * to all addresses one at a time until the ip address is found.
4632  */
4633 static bool find_node_xpnn(TALLOC_CTX *mem_ctx, uint32_t *pnn)
4634 {
4635         struct ctdb_node_map *nodemap;
4636         int i;
4637
4638         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
4639         if (nodemap == NULL) {
4640                 return false;
4641         }
4642
4643         for (i=0; i<nodemap->num; i++) {
4644                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
4645                         continue;
4646                 }
4647                 if (ctdb_sys_have_ip(&nodemap->node[i].addr)) {
4648                         if (pnn != NULL) {
4649                                 *pnn = nodemap->node[i].pnn;
4650                         }
4651                         talloc_free(nodemap);
4652                         return true;
4653                 }
4654         }
4655
4656         fprintf(stderr, "Failed to detect PNN of the current node.\n");
4657         talloc_free(nodemap);
4658         return false;
4659 }
4660
4661 static int control_getreclock(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4662                               int argc, const char **argv)
4663 {
4664         const char *reclock;
4665         int ret;
4666
4667         if (argc != 0) {
4668                 usage("getreclock");
4669         }
4670
4671         ret = ctdb_ctrl_get_reclock_file(mem_ctx, ctdb->ev, ctdb->client,
4672                                          ctdb->cmd_pnn, TIMEOUT(), &reclock);
4673         if (ret != 0) {
4674                 return ret;
4675         }
4676
4677         if (reclock != NULL) {
4678                 printf("%s\n", reclock);
4679         }
4680
4681         return 0;
4682 }
4683
4684 static int control_setlmasterrole(TALLOC_CTX *mem_ctx,
4685                                   struct ctdb_context *ctdb,
4686                                   int argc, const char **argv)
4687 {
4688         uint32_t lmasterrole = 0;
4689         int ret;
4690
4691         if (argc != 1) {
4692                 usage("setlmasterrole");
4693         }
4694
4695         if (strcmp(argv[0], "on") == 0) {
4696                 lmasterrole = 1;
4697         } else if (strcmp(argv[0], "off") == 0) {
4698                 lmasterrole = 0;
4699         } else {
4700                 usage("setlmasterrole");
4701         }
4702
4703         ret = ctdb_ctrl_set_lmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4704                                         ctdb->cmd_pnn, TIMEOUT(), lmasterrole);
4705         if (ret != 0) {
4706                 return ret;
4707         }
4708
4709         return 0;
4710 }
4711
4712 static int control_setrecmasterrole(TALLOC_CTX *mem_ctx,
4713                                     struct ctdb_context *ctdb,
4714                                     int argc, const char **argv)
4715 {
4716         uint32_t recmasterrole = 0;
4717         int ret;
4718
4719         if (argc != 1) {
4720                 usage("setrecmasterrole");
4721         }
4722
4723         if (strcmp(argv[0], "on") == 0) {
4724                 recmasterrole = 1;
4725         } else if (strcmp(argv[0], "off") == 0) {
4726                 recmasterrole = 0;
4727         } else {
4728                 usage("setrecmasterrole");
4729         }
4730
4731         ret = ctdb_ctrl_set_recmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4732                                           ctdb->cmd_pnn, TIMEOUT(),
4733                                           recmasterrole);
4734         if (ret != 0) {
4735                 return ret;
4736         }
4737
4738         return 0;
4739 }
4740
4741 static int control_setdbreadonly(TALLOC_CTX *mem_ctx,
4742                                  struct ctdb_context *ctdb,
4743                                  int argc, const char **argv)
4744 {
4745         uint32_t db_id;
4746         uint8_t db_flags;
4747         int ret;
4748
4749         if (argc != 1) {
4750                 usage("setdbreadonly");
4751         }
4752
4753         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
4754                 return 1;
4755         }
4756
4757         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
4758                 fprintf(stderr, "READONLY can be set only on volatile DB\n");
4759                 return 1;
4760         }
4761
4762         ret = ctdb_ctrl_set_db_readonly(mem_ctx, ctdb->ev, ctdb->client,
4763                                         ctdb->cmd_pnn, TIMEOUT(), db_id);
4764         if (ret != 0) {
4765                 return ret;
4766         }
4767
4768         return 0;
4769 }
4770
4771 static int control_setdbsticky(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4772                                int argc, const char **argv)
4773 {
4774         uint32_t db_id;
4775         uint8_t db_flags;
4776         int ret;
4777
4778         if (argc != 1) {
4779                 usage("setdbsticky");
4780         }
4781
4782         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
4783                 return 1;
4784         }
4785
4786         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
4787                 fprintf(stderr, "STICKY can be set only on volatile DB\n");
4788                 return 1;
4789         }
4790
4791         ret = ctdb_ctrl_set_db_sticky(mem_ctx, ctdb->ev, ctdb->client,
4792                                       ctdb->cmd_pnn, TIMEOUT(), db_id);
4793         if (ret != 0) {
4794                 return ret;
4795         }
4796
4797         return 0;
4798 }
4799
4800 static int control_pfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4801                           int argc, const char **argv)
4802 {
4803         const char *db_name;
4804         struct ctdb_db_context *db;
4805         struct ctdb_transaction_handle *h;
4806         uint8_t db_flags;
4807         TDB_DATA key, data;
4808         int ret;
4809
4810         if (argc < 2 || argc > 3) {
4811                 usage("pfetch");
4812         }
4813
4814         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
4815                 return 1;
4816         }
4817
4818         if (! (db_flags &
4819                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
4820                 fprintf(stderr, "Transactions not supported on DB %s\n",
4821                         db_name);
4822                 return 1;
4823         }
4824
4825         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4826                           db_flags, &db);
4827         if (ret != 0) {
4828                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4829                 return ret;
4830         }
4831
4832         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
4833         if (ret != 0) {
4834                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
4835                 return ret;
4836         }
4837
4838         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
4839                                      TIMEOUT(), db, true, &h);
4840         if (ret != 0) {
4841                 fprintf(stderr, "Failed to start transaction on db %s\n",
4842                         db_name);
4843                 return ret;
4844         }
4845
4846         ret = ctdb_transaction_fetch_record(h, key, mem_ctx, &data);
4847         if (ret != 0) {
4848                 fprintf(stderr, "Failed to read record for key %s\n",
4849                         argv[1]);
4850                 ctdb_transaction_cancel(h);
4851                 return ret;
4852         }
4853
4854         printf("%.*s\n", (int)data.dsize, data.dptr);
4855
4856         ctdb_transaction_cancel(h);
4857         return 0;
4858 }
4859
4860 static int control_pstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4861                           int argc, const char **argv)
4862 {
4863         const char *db_name;
4864         struct ctdb_db_context *db;
4865         struct ctdb_transaction_handle *h;
4866         uint8_t db_flags;
4867         TDB_DATA key, data;
4868         int ret;
4869
4870         if (argc != 3) {
4871                 usage("pstore");
4872         }
4873
4874         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
4875                 return 1;
4876         }
4877
4878         if (! (db_flags &
4879                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
4880                 fprintf(stderr, "Transactions not supported on DB %s\n",
4881                         db_name);
4882                 return 1;
4883         }
4884
4885         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4886                           db_flags, &db);
4887         if (ret != 0) {
4888                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4889                 return ret;
4890         }
4891
4892         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
4893         if (ret != 0) {
4894                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
4895                 return ret;
4896         }
4897
4898         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
4899         if (ret != 0) {
4900                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
4901                 return ret;
4902         }
4903
4904         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
4905                                      TIMEOUT(), db, false, &h);
4906         if (ret != 0) {
4907                 fprintf(stderr, "Failed to start transaction on db %s\n",
4908                         db_name);
4909                 return ret;
4910         }
4911
4912         ret = ctdb_transaction_store_record(h, key, data);
4913         if (ret != 0) {
4914                 fprintf(stderr, "Failed to store record for key %s\n",
4915                         argv[1]);
4916                 ctdb_transaction_cancel(h);
4917                 return ret;
4918         }
4919
4920         ret = ctdb_transaction_commit(h);
4921         if (ret != 0) {
4922                 fprintf(stderr, "Failed to commit transaction on db %s\n",
4923                         db_name);
4924                 ctdb_transaction_cancel(h);
4925                 return ret;
4926         }
4927
4928         return 0;
4929 }
4930
4931 static int control_pdelete(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4932                            int argc, const char **argv)
4933 {
4934         const char *db_name;
4935         struct ctdb_db_context *db;
4936         struct ctdb_transaction_handle *h;
4937         uint8_t db_flags;
4938         TDB_DATA key;
4939         int ret;
4940
4941         if (argc != 2) {
4942                 usage("pdelete");
4943         }
4944
4945         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
4946                 return 1;
4947         }
4948
4949         if (! (db_flags &
4950                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
4951                 fprintf(stderr, "Transactions not supported on DB %s\n",
4952                         db_name);
4953                 return 1;
4954         }
4955
4956         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4957                           db_flags, &db);
4958         if (ret != 0) {
4959                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4960                 return ret;
4961         }
4962
4963         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
4964         if (ret != 0) {
4965                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
4966                 return ret;
4967         }
4968
4969         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
4970                                      TIMEOUT(), db, false, &h);
4971         if (ret != 0) {
4972                 fprintf(stderr, "Failed to start transaction on db %s\n",
4973                         db_name);
4974                 return ret;
4975         }
4976
4977         ret = ctdb_transaction_delete_record(h, key);
4978         if (ret != 0) {
4979                 fprintf(stderr, "Failed to delete record for key %s\n",
4980                         argv[1]);
4981                 ctdb_transaction_cancel(h);
4982                 return ret;
4983         }
4984
4985         ret = ctdb_transaction_commit(h);
4986         if (ret != 0) {
4987                 fprintf(stderr, "Failed to commit transaction on db %s\n",
4988                         db_name);
4989                 ctdb_transaction_cancel(h);
4990                 return ret;
4991         }
4992
4993         return 0;
4994 }
4995
4996 static int ptrans_parse_string(TALLOC_CTX *mem_ctx, const char **ptr, TDB_DATA *data)
4997 {
4998         const char *t;
4999         size_t n;
5000         int ret;
5001
5002         *data = tdb_null;
5003
5004         /* Skip whitespace */
5005         n = strspn(*ptr, " \t");
5006         t = *ptr + n;
5007
5008         if (t[0] == '"') {
5009                 /* Quoted ASCII string - no wide characters! */
5010                 t++;
5011                 n = strcspn(t, "\"");
5012                 if (t[n] == '"') {
5013                         if (n > 0) {
5014                                 ret = str_to_data(t, n, mem_ctx, data);
5015                                 if (ret != 0) {
5016                                         return ret;
5017                                 }
5018                         }
5019                         *ptr = t + n + 1;
5020                 } else {
5021                         fprintf(stderr, "Unmatched \" in input %s\n", *ptr);
5022                         return 1;
5023                 }
5024         } else {
5025                 fprintf(stderr, "Unsupported input format in %s\n", *ptr);
5026                 return 1;
5027         }
5028
5029         return 0;
5030 }
5031
5032 #define MAX_LINE_SIZE   1024
5033
5034 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
5035                                  TDB_DATA *key, TDB_DATA *value)
5036 {
5037         char line [MAX_LINE_SIZE]; /* FIXME: make this more flexible? */
5038         const char *ptr;
5039         int ret;
5040
5041         ptr = fgets(line, MAX_LINE_SIZE, file);
5042         if (ptr == NULL) {
5043                 return false;
5044         }
5045
5046         /* Get key */
5047         ret = ptrans_parse_string(mem_ctx, &ptr, key);
5048         if (ret != 0 || ptr == NULL || key->dptr == NULL) {
5049                 /* Line Ignored but not EOF */
5050                 *key = tdb_null;
5051                 return true;
5052         }
5053
5054         /* Get value */
5055         ret = ptrans_parse_string(mem_ctx, &ptr, value);
5056         if (ret != 0) {
5057                 /* Line Ignored but not EOF */
5058                 talloc_free(key->dptr);
5059                 *key = tdb_null;
5060                 return true;
5061         }
5062
5063         return true;
5064 }
5065
5066 static int control_ptrans(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5067                           int argc, const char **argv)
5068 {
5069         const char *db_name;
5070         struct ctdb_db_context *db;
5071         struct ctdb_transaction_handle *h;
5072         uint8_t db_flags;
5073         FILE *file;
5074         TDB_DATA key, value;
5075         int ret;
5076
5077         if (argc < 1 || argc > 2) {
5078                 usage("ptrans");
5079         }
5080
5081         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5082                 return 1;
5083         }
5084
5085         if (! (db_flags &
5086                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
5087                 fprintf(stderr, "Transactions not supported on DB %s\n",
5088                         db_name);
5089                 return 1;
5090         }
5091
5092         if (argc == 2) {
5093                 file = fopen(argv[1], "r");
5094                 if (file == NULL) {
5095                         fprintf(stderr, "Failed to open file %s\n", argv[1]);
5096                         return 1;
5097                 }
5098         } else {
5099                 file = stdin;
5100         }
5101
5102         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5103                           db_flags, &db);
5104         if (ret != 0) {
5105                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5106                 goto done;
5107         }
5108
5109         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5110                                      TIMEOUT(), db, false, &h);
5111         if (ret != 0) {
5112                 fprintf(stderr, "Failed to start transaction on db %s\n",
5113                         db_name);
5114                 goto done;
5115         }
5116
5117         while (ptrans_get_key_value(mem_ctx, file, &key, &value)) {
5118                 if (key.dsize != 0) {
5119                         ret = ctdb_transaction_store_record(h, key, value);
5120                         if (ret != 0) {
5121                                 fprintf(stderr, "Failed to store record\n");
5122                                 ctdb_transaction_cancel(h);
5123                                 goto done;
5124                         }
5125                         talloc_free(key.dptr);
5126                         talloc_free(value.dptr);
5127                 }
5128         }
5129
5130         ret = ctdb_transaction_commit(h);
5131         if (ret != 0) {
5132                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5133                         db_name);
5134                 ctdb_transaction_cancel(h);
5135         }
5136
5137 done:
5138         if (file != stdin) {
5139                 fclose(file);
5140         }
5141         return ret;
5142 }
5143
5144 static int control_tfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5145                           int argc, const char **argv)
5146 {
5147         struct tdb_context *tdb;
5148         TDB_DATA key, data;
5149         struct ctdb_ltdb_header header;
5150         int ret;
5151
5152         if (argc < 2 || argc > 3) {
5153                 usage("tfetch");
5154         }
5155
5156         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5157         if (tdb == NULL) {
5158                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5159                 return 1;
5160         }
5161
5162         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5163         if (ret != 0) {
5164                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5165                 tdb_close(tdb);
5166                 return ret;
5167         }
5168
5169         data = tdb_fetch(tdb, key);
5170         if (data.dptr == NULL) {
5171                 fprintf(stderr, "No record for key %s\n", argv[1]);
5172                 tdb_close(tdb);
5173                 return 1;
5174         }
5175
5176         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
5177                 fprintf(stderr, "Invalid record for key %s\n", argv[1]);
5178                 tdb_close(tdb);
5179                 return 1;
5180         }
5181
5182         tdb_close(tdb);
5183
5184         if (argc == 3) {
5185                 int fd;
5186                 ssize_t nwritten;
5187
5188                 fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
5189                 if (fd == -1) {
5190                         fprintf(stderr, "Failed to open output file %s\n",
5191                                 argv[2]);
5192                         goto fail;
5193                 }
5194
5195                 nwritten = sys_write(fd, data.dptr, data.dsize);
5196                 if (nwritten != data.dsize) {
5197                         fprintf(stderr, "Failed to write record to file\n");
5198                         close(fd);
5199                         goto fail;
5200                 }
5201
5202                 close(fd);
5203         }
5204
5205 fail:
5206         ret = ctdb_ltdb_header_extract(&data, &header);
5207         if (ret != 0) {
5208                 fprintf(stderr, "Failed to parse header from data\n");
5209                 return 1;
5210         }
5211
5212         dump_ltdb_header(&header);
5213         dump_tdb_data("data", data);
5214
5215         return 0;
5216 }
5217
5218 static int control_tstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5219                           int argc, const char **argv)
5220 {
5221         struct tdb_context *tdb;
5222         TDB_DATA key, data[2], value;
5223         struct ctdb_ltdb_header header;
5224         uint8_t header_buf[sizeof(struct ctdb_ltdb_header)];
5225         size_t np;
5226         int ret;
5227
5228         if (argc < 3 || argc > 5) {
5229                 usage("tstore");
5230         }
5231
5232         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5233         if (tdb == NULL) {
5234                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5235                 return 1;
5236         }
5237
5238         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5239         if (ret != 0) {
5240                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5241                 tdb_close(tdb);
5242                 return ret;
5243         }
5244
5245         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &value);
5246         if (ret != 0) {
5247                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5248                 tdb_close(tdb);
5249                 return ret;
5250         }
5251
5252         ZERO_STRUCT(header);
5253
5254         if (argc > 3) {
5255                 header.rsn = (uint64_t)strtoull(argv[3], NULL, 0);
5256         }
5257         if (argc > 4) {
5258                 header.dmaster = (uint32_t)atol(argv[4]);
5259         }
5260         if (argc > 5) {
5261                 header.flags = (uint32_t)atol(argv[5]);
5262         }
5263
5264         ctdb_ltdb_header_push(&header, header_buf, &np);
5265
5266         data[0].dsize = np;
5267         data[0].dptr = header_buf;
5268
5269         data[1].dsize = value.dsize;
5270         data[1].dptr = value.dptr;
5271
5272         ret = tdb_storev(tdb, key, data, 2, TDB_REPLACE);
5273         if (ret != 0) {
5274                 fprintf(stderr, "Failed to write record %s to file %s\n",
5275                         argv[1], argv[0]);
5276         }
5277
5278         tdb_close(tdb);
5279
5280         return ret;
5281 }
5282
5283 static int control_readkey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5284                            int argc, const char **argv)
5285 {
5286         const char *db_name;
5287         struct ctdb_db_context *db;
5288         struct ctdb_record_handle *h;
5289         uint8_t db_flags;
5290         TDB_DATA key, data;
5291         bool readonly = false;
5292         int ret;
5293
5294         if (argc < 2 || argc > 3) {
5295                 usage("readkey");
5296         }
5297
5298         if (argc == 3) {
5299                 if (strcmp(argv[2], "readonly") == 0) {
5300                         readonly = true;
5301                 } else {
5302                         usage("readkey");
5303                 }
5304         }
5305
5306         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5307                 return 1;
5308         }
5309
5310         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
5311                 fprintf(stderr, "DB %s is not a volatile database\n",
5312                         db_name);
5313                 return 1;
5314         }
5315
5316         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5317                           db_flags, &db);
5318         if (ret != 0) {
5319                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5320                 return ret;
5321         }
5322
5323         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5324         if (ret != 0) {
5325                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5326                 return ret;
5327         }
5328
5329         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5330                               db, key, readonly, &h, NULL, &data);
5331         if (ret != 0) {
5332                 fprintf(stderr, "Failed to read record for key %s\n",
5333                         argv[1]);
5334         } else {
5335                 printf("Data: size:%zu ptr:[%.*s]\n", data.dsize,
5336                        (int)data.dsize, data.dptr);
5337         }
5338
5339         talloc_free(h);
5340         return ret;
5341 }
5342
5343 static int control_writekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5344                             int argc, const char **argv)
5345 {
5346         const char *db_name;
5347         struct ctdb_db_context *db;
5348         struct ctdb_record_handle *h;
5349         uint8_t db_flags;
5350         TDB_DATA key, data;
5351         int ret;
5352
5353         if (argc != 3) {
5354                 usage("writekey");
5355         }
5356
5357         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5358                 return 1;
5359         }
5360
5361         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
5362                 fprintf(stderr, "DB %s is not a volatile database\n",
5363                         db_name);
5364                 return 1;
5365         }
5366
5367         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5368                           db_flags, &db);
5369         if (ret != 0) {
5370                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5371                 return ret;
5372         }
5373
5374         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5375         if (ret != 0) {
5376                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5377                 return ret;
5378         }
5379
5380         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5381         if (ret != 0) {
5382                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5383                 return ret;
5384         }
5385
5386         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5387                               db, key, false, &h, NULL, NULL);
5388         if (ret != 0) {
5389                 fprintf(stderr, "Failed to lock record for key %s\n", argv[0]);
5390                 return ret;
5391         }
5392
5393         ret = ctdb_store_record(h, data);
5394         if (ret != 0) {
5395                 fprintf(stderr, "Failed to store record for key %s\n",
5396                         argv[1]);
5397         }
5398
5399         talloc_free(h);
5400         return ret;
5401 }
5402
5403 static int control_deletekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5404                              int argc, const char **argv)
5405 {
5406         const char *db_name;
5407         struct ctdb_db_context *db;
5408         struct ctdb_record_handle *h;
5409         uint8_t db_flags;
5410         TDB_DATA key, data;
5411         int ret;
5412
5413         if (argc != 2) {
5414                 usage("deletekey");
5415         }
5416
5417         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5418                 return 1;
5419         }
5420
5421         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
5422                 fprintf(stderr, "DB %s is not a volatile database\n",
5423                         db_name);
5424                 return 1;
5425         }
5426
5427         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5428                           db_flags, &db);
5429         if (ret != 0) {
5430                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5431                 return ret;
5432         }
5433
5434         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5435         if (ret != 0) {
5436                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5437                 return ret;
5438         }
5439
5440         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5441                               db, key, false, &h, NULL, &data);
5442         if (ret != 0) {
5443                 fprintf(stderr, "Failed to fetch record for key %s\n",
5444                         argv[1]);
5445                 return ret;
5446         }
5447
5448         ret = ctdb_delete_record(h);
5449         if (ret != 0) {
5450                 fprintf(stderr, "Failed to delete record for key %s\n",
5451                         argv[1]);
5452         }
5453
5454         talloc_free(h);
5455         return ret;
5456 }
5457
5458 static int control_checktcpport(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5459                                 int argc, const char **argv)
5460 {
5461         struct sockaddr_in sin;
5462         unsigned int port;
5463         int s, v;
5464         int ret;
5465
5466         if (argc != 1) {
5467                 usage("chktcpport");
5468         }
5469
5470         port = atoi(argv[0]);
5471
5472         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
5473         if (s == -1) {
5474                 fprintf(stderr, "Failed to open local socket\n");
5475                 return errno;
5476         }
5477
5478         v = fcntl(s, F_GETFL, 0);
5479         if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK)) {
5480                 fprintf(stderr, "Unable to set socket non-blocking\n");
5481                 close(s);
5482                 return errno;
5483         }
5484
5485         bzero(&sin, sizeof(sin));
5486         sin.sin_family = AF_INET;
5487         sin.sin_port = htons(port);
5488         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
5489         close(s);
5490         if (ret == -1) {
5491                 fprintf(stderr, "Failed to bind to TCP port %u\n", port);
5492                 return errno;
5493         }
5494
5495         return 0;
5496 }
5497
5498 static int control_getdbseqnum(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5499                                int argc, const char **argv)
5500 {
5501         uint32_t db_id;
5502         const char *db_name;
5503         uint64_t seqnum;
5504         int ret;
5505
5506         if (argc != 1) {
5507                 usage("getdbseqnum");
5508         }
5509
5510         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5511                 return 1;
5512         }
5513
5514         ret = ctdb_ctrl_get_db_seqnum(mem_ctx, ctdb->ev, ctdb->client,
5515                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
5516                                       &seqnum);
5517         if (ret != 0) {
5518                 fprintf(stderr, "Failed to get sequence number for DB %s\n",
5519                         db_name);
5520                 return ret;
5521         }
5522
5523         printf("0x%"PRIx64"\n", seqnum);
5524         return 0;
5525 }
5526
5527 static int control_nodestatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5528                               int argc, const char **argv)
5529 {
5530         const char *nodestring = NULL;
5531         struct ctdb_node_map *nodemap;
5532         int ret, i;
5533         bool print_hdr = false;
5534
5535         if (argc > 1) {
5536                 usage("nodestatus");
5537         }
5538
5539         if (argc == 1) {
5540                 nodestring = argv[0];
5541                 if (strcmp(nodestring, "all") == 0) {
5542                         print_hdr = true;
5543                 }
5544         }
5545
5546         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap)) {
5547                 return 1;
5548         }
5549
5550         if (options.machinereadable) {
5551                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5552         } else {
5553                 print_nodemap(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, print_hdr);
5554         }
5555
5556         ret = 0;
5557         for (i=0; i<nodemap->num; i++) {
5558                 ret |= nodemap->node[i].flags;
5559         }
5560
5561         return ret;
5562 }
5563
5564 const struct {
5565         const char *name;
5566         uint32_t offset;
5567 } db_stats_fields[] = {
5568 #define DBSTATISTICS_FIELD(n) { #n, offsetof(struct ctdb_db_statistics, n) }
5569         DBSTATISTICS_FIELD(db_ro_delegations),
5570         DBSTATISTICS_FIELD(db_ro_revokes),
5571         DBSTATISTICS_FIELD(locks.num_calls),
5572         DBSTATISTICS_FIELD(locks.num_current),
5573         DBSTATISTICS_FIELD(locks.num_pending),
5574         DBSTATISTICS_FIELD(locks.num_failed),
5575         DBSTATISTICS_FIELD(db_ro_delegations),
5576 };
5577
5578 static void print_dbstatistics(const char *db_name,
5579                                struct ctdb_db_statistics *s)
5580 {
5581         int i;
5582         const char *prefix = NULL;
5583         int preflen = 0;
5584
5585         printf("DB Statistics %s\n", db_name);
5586
5587         for (i=0; i<ARRAY_SIZE(db_stats_fields); i++) {
5588                 if (strchr(db_stats_fields[i].name, '.') != NULL) {
5589                         preflen = strcspn(db_stats_fields[i].name, ".") + 1;
5590                         if (! prefix ||
5591                             strncmp(prefix, db_stats_fields[i].name, preflen) != 0) {
5592                                 prefix = db_stats_fields[i].name;
5593                                 printf(" %*.*s\n", preflen-1, preflen-1,
5594                                        db_stats_fields[i].name);
5595                         }
5596                 } else {
5597                         preflen = 0;
5598                 }
5599                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
5600                        db_stats_fields[i].name+preflen, preflen ? 0 : 4, "",
5601                        *(uint32_t *)(db_stats_fields[i].offset+(uint8_t *)s));
5602         }
5603
5604         printf(" hop_count_buckets:");
5605         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5606                 printf(" %d", s->hop_count_bucket[i]);
5607         }
5608         printf("\n");
5609
5610         printf(" lock_buckets:");
5611         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5612                 printf(" %d", s->locks.buckets[i]);
5613         }
5614         printf("\n");
5615
5616         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5617                "locks_latency      MIN/AVG/MAX",
5618                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
5619                s->locks.latency.max, s->locks.latency.num);
5620
5621         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5622                "vacuum_latency     MIN/AVG/MAX",
5623                s->vacuum.latency.min, LATENCY_AVG(s->vacuum.latency),
5624                s->vacuum.latency.max, s->vacuum.latency.num);
5625
5626         printf(" Num Hot Keys:     %d\n", s->num_hot_keys);
5627         for (i=0; i<s->num_hot_keys; i++) {
5628                 int j;
5629                 printf("     Count:%d Key:", s->hot_keys[i].count);
5630                 for (j=0; j<s->hot_keys[i].key.dsize; j++) {
5631                         printf("%02x", s->hot_keys[i].key.dptr[j] & 0xff);
5632                 }
5633                 printf("\n");
5634         }
5635 }
5636
5637 static int control_dbstatistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5638                                 int argc, const char **argv)
5639 {
5640         uint32_t db_id;
5641         const char *db_name;
5642         struct ctdb_db_statistics *dbstats;
5643         int ret;
5644
5645         if (argc != 1) {
5646                 usage("dbstatistics");
5647         }
5648
5649         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5650                 return 1;
5651         }
5652
5653         ret = ctdb_ctrl_get_db_statistics(mem_ctx, ctdb->ev, ctdb->client,
5654                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
5655                                           &dbstats);
5656         if (ret != 0) {
5657                 fprintf(stderr, "Failed to get statistics for DB %s\n",
5658                         db_name);
5659                 return ret;
5660         }
5661
5662         print_dbstatistics(db_name, dbstats);
5663         return 0;
5664 }
5665
5666 struct disable_takeover_runs_state {
5667         uint32_t *pnn_list;
5668         int node_count;
5669         bool *reply;
5670         int status;
5671         bool done;
5672 };
5673
5674 static void disable_takeover_run_handler(uint64_t srvid, TDB_DATA data,
5675                                          void *private_data)
5676 {
5677         struct disable_takeover_runs_state *state =
5678                 (struct disable_takeover_runs_state *)private_data;
5679         int ret, i;
5680
5681         if (data.dsize != sizeof(int)) {
5682                 /* Ignore packet */
5683                 return;
5684         }
5685
5686         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
5687         ret = *(int *)data.dptr;
5688         if (ret < 0) {
5689                 state->status = ret;
5690                 state->done = true;
5691                 return;
5692         }
5693         for (i=0; i<state->node_count; i++) {
5694                 if (state->pnn_list[i] == ret) {
5695                         state->reply[i] = true;
5696                         break;
5697                 }
5698         }
5699
5700         state->done = true;
5701         for (i=0; i<state->node_count; i++) {
5702                 if (! state->reply[i]) {
5703                         state->done = false;
5704                         break;
5705                 }
5706         }
5707 }
5708
5709 static int disable_takeover_runs(TALLOC_CTX *mem_ctx,
5710                                  struct ctdb_context *ctdb, uint32_t timeout,
5711                                  uint32_t *pnn_list, int count)
5712 {
5713         struct ctdb_disable_message disable = { 0 };
5714         struct disable_takeover_runs_state state;
5715         int ret, i;
5716
5717         disable.pnn = ctdb->pnn;
5718         disable.srvid = next_srvid(ctdb);
5719         disable.timeout = timeout;
5720
5721         state.pnn_list = pnn_list;
5722         state.node_count = count;
5723         state.done = false;
5724         state.status = 0;
5725         state.reply = talloc_zero_array(mem_ctx, bool, count);
5726         if (state.reply == NULL) {
5727                 return ENOMEM;
5728         }
5729
5730         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
5731                                               disable.srvid,
5732                                               disable_takeover_run_handler,
5733                                               &state);
5734         if (ret != 0) {
5735                 return ret;
5736         }
5737
5738         for (i=0; i<count; i++) {
5739                 ret = ctdb_message_disable_takeover_runs(mem_ctx, ctdb->ev,
5740                                                          ctdb->client,
5741                                                          pnn_list[i],
5742                                                          &disable);
5743                 if (ret != 0) {
5744                         goto fail;
5745                 }
5746         }
5747
5748         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
5749         if (ret == ETIME) {
5750                 fprintf(stderr, "Timed out waiting to disable takeover runs\n");
5751         } else {
5752                 ret = (state.status >= 0 ? 0 : 1);
5753         }
5754
5755 fail:
5756         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
5757                                            disable.srvid, &state);
5758         return ret;
5759 }
5760
5761 static int control_reloadips(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5762                              int argc, const char **argv)
5763 {
5764         const char *nodestring = NULL;
5765         struct ctdb_node_map *nodemap, *nodemap2;
5766         struct ctdb_req_control request;
5767         uint32_t *pnn_list, *pnn_list2;
5768         int ret, count, count2;
5769
5770         if (argc > 1) {
5771                 usage("reloadips");
5772         }
5773
5774         if (argc == 1) {
5775                 nodestring = argv[0];
5776         }
5777
5778         nodemap = get_nodemap(ctdb, false);
5779         if (nodemap == NULL) {
5780                 return 1;
5781         }
5782
5783         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap2)) {
5784                 return 1;
5785         }
5786
5787         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
5788                                         mem_ctx, &pnn_list);
5789         if (count <= 0) {
5790                 fprintf(stderr, "Memory allocation error\n");
5791                 return 1;
5792         }
5793
5794         count2 = list_of_active_nodes(nodemap2, CTDB_UNKNOWN_PNN,
5795                                       mem_ctx, &pnn_list2);
5796         if (count2 <= 0) {
5797                 fprintf(stderr, "Memory allocation error\n");
5798                 return 1;
5799         }
5800
5801         /* Disable takeover runs on all connected nodes.  A reply
5802          * indicating success is needed from each node so all nodes
5803          * will need to be active.
5804          *
5805          * A check could be added to not allow reloading of IPs when
5806          * there are disconnected nodes.  However, this should
5807          * probably be left up to the administrator.
5808          */
5809         ret = disable_takeover_runs(mem_ctx, ctdb, 2*options.timelimit,
5810                                     pnn_list, count);
5811         if (ret != 0) {
5812                 fprintf(stderr, "Failed to disable takeover runs\n");
5813                 return ret;
5814         }
5815
5816         /* Now tell all the desired nodes to reload their public IPs.
5817          * Keep trying this until it succeeds.  This assumes all
5818          * failures are transient, which might not be true...
5819          */
5820         ctdb_req_control_reload_public_ips(&request);
5821         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
5822                                         pnn_list2, count2, TIMEOUT(),
5823                                         &request, NULL, NULL);
5824         if (ret != 0) {
5825                 fprintf(stderr, "Failed to reload IPs on some nodes.\n");
5826         }
5827
5828         /* It isn't strictly necessary to wait until takeover runs are
5829          * re-enabled but doing so can't hurt.
5830          */
5831         ret = disable_takeover_runs(mem_ctx, ctdb, 0, pnn_list, count);
5832         if (ret != 0) {
5833                 fprintf(stderr, "Failed to enable takeover runs\n");
5834                 return ret;
5835         }
5836
5837         return ipreallocate(mem_ctx, ctdb);
5838 }
5839
5840 static int control_ipiface(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5841                            int argc, const char **argv)
5842 {
5843         ctdb_sock_addr addr;
5844         char *iface;
5845
5846         if (argc != 1) {
5847                 usage("ipiface");
5848         }
5849
5850         if (! parse_ip(argv[0], NULL, 0, &addr)) {
5851                 fprintf(stderr, "Failed to Parse IP %s\n", argv[0]);
5852                 return 1;
5853         }
5854
5855         iface = ctdb_sys_find_ifname(&addr);
5856         if (iface == NULL) {
5857                 fprintf(stderr, "Failed to find interface for IP %s\n",
5858                         argv[0]);
5859                 return 1;
5860         }
5861         free(iface);
5862
5863         return 0;
5864 }
5865
5866
5867 static const struct ctdb_cmd {
5868         const char *name;
5869         int (*fn)(TALLOC_CTX *, struct ctdb_context *, int, const char **);
5870         bool without_daemon; /* can be run without daemon running ? */
5871         bool remote; /* can be run on remote nodes */
5872         const char *msg;
5873         const char *args;
5874 } ctdb_commands[] = {
5875         { "version", control_version, true, false,
5876                 "show version of ctdb", NULL },
5877         { "status", control_status, false, true,
5878                 "show node status", NULL },
5879         { "uptime", control_uptime, false, true,
5880                 "show node uptime", NULL },
5881         { "ping", control_ping, false, true,
5882                 "ping all nodes", NULL },
5883         { "runstate", control_runstate, false, true,
5884                 "get/check runstate of a node",
5885                 "[setup|first_recovery|startup|running]" },
5886         { "getvar", control_getvar, false, true,
5887                 "get a tunable variable", "<name>" },
5888         { "setvar", control_setvar, false, true,
5889                 "set a tunable variable", "<name> <value>" },
5890         { "listvars", control_listvars, false, true,
5891                 "list tunable variables", NULL },
5892         { "statistics", control_statistics, false, true,
5893                 "show ctdb statistics", NULL },
5894         { "statisticsreset", control_statistics_reset, false, true,
5895                 "reset ctdb statistics", NULL },
5896         { "stats", control_stats, false, true,
5897                 "show rolling statistics", "[count]" },
5898         { "ip", control_ip, false, true,
5899                 "show public ips", "[all]" },
5900         { "ipinfo", control_ipinfo, false, true,
5901                 "show public ip details", "<ip>" },
5902         { "ifaces", control_ifaces, false, true,
5903                 "show interfaces", NULL },
5904         { "setifacelink", control_setifacelink, false, true,
5905                 "set interface link status", "<iface> up|down" },
5906         { "process-exists", control_process_exists, false, true,
5907                 "check if a process exists on a node",  "<pid> [<srvid>]" },
5908         { "getdbmap", control_getdbmap, false, true,
5909                 "show attached databases", NULL },
5910         { "getdbstatus", control_getdbstatus, false, true,
5911                 "show database status", "<dbname|dbid>" },
5912         { "catdb", control_catdb, false, false,
5913                 "dump cluster-wide ctdb database", "<dbname|dbid>" },
5914         { "cattdb", control_cattdb, false, false,
5915                 "dump local ctdb database", "<dbname|dbid>" },
5916         { "getcapabilities", control_getcapabilities, false, true,
5917                 "show node capabilities", NULL },
5918         { "pnn", control_pnn, false, false,
5919                 "show the pnn of the currnet node", NULL },
5920         { "lvs", control_lvs, false, false,
5921                 "show lvs configuration", "master|list|status" },
5922         { "setdebug", control_setdebug, false, true,
5923                 "set debug level", "ERROR|WARNING|NOTICE|INFO|DEBUG" },
5924         { "getdebug", control_getdebug, false, true,
5925                 "get debug level", NULL },
5926         { "attach", control_attach, false, false,
5927                 "attach a database", "<dbname> [persistent|replicated]" },
5928         { "detach", control_detach, false, false,
5929                 "detach database(s)", "<dbname|dbid> ..." },
5930         { "dumpmemory", control_dumpmemory, false, true,
5931                 "dump ctdbd memory map", NULL },
5932         { "rddumpmemory", control_rddumpmemory, false, true,
5933                 "dump recoverd memory map", NULL },
5934         { "getpid", control_getpid, false, true,
5935                 "get ctdbd process ID", NULL },
5936         { "disable", control_disable, false, true,
5937                 "disable a node", NULL },
5938         { "enable", control_enable, false, true,
5939                 "enable a node", NULL },
5940         { "stop", control_stop, false, true,
5941                 "stop a node", NULL },
5942         { "continue", control_continue, false, true,
5943                 "continue a stopped node", NULL },
5944         { "ban", control_ban, false, true,
5945                 "ban a node", "<bantime>"},
5946         { "unban", control_unban, false, true,
5947                 "unban a node", NULL },
5948         { "shutdown", control_shutdown, false, true,
5949                 "shutdown ctdb daemon", NULL },
5950         { "recover", control_recover, false, true,
5951                 "force recovery", NULL },
5952         { "sync", control_ipreallocate, false, true,
5953                 "run ip reallocation (deprecated)", NULL },
5954         { "ipreallocate", control_ipreallocate, false, true,
5955                 "run ip reallocation", NULL },
5956         { "isnotrecmaster", control_isnotrecmaster, false, false,
5957                 "check if local node is the recmaster", NULL },
5958         { "gratarp", control_gratarp, false, true,
5959                 "send a gratuitous arp", "<ip> <interface>" },
5960         { "tickle", control_tickle, true, false,
5961                 "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
5962         { "gettickles", control_gettickles, false, true,
5963                 "get the list of tickles", "<ip> [<port>]" },
5964         { "addtickle", control_addtickle, false, true,
5965                 "add a tickle", "<ip>:<port> <ip>:<port>" },
5966         { "deltickle", control_deltickle, false, true,
5967                 "delete a tickle", "<ip>:<port> <ip>:<port>" },
5968         { "listnodes", control_listnodes, true, true,
5969                 "list nodes in the cluster", NULL },
5970         { "reloadnodes", control_reloadnodes, false, false,
5971                 "reload the nodes file all nodes", NULL },
5972         { "moveip", control_moveip, false, false,
5973                 "move an ip address to another node", "<ip> <node>" },
5974         { "addip", control_addip, false, true,
5975                 "add an ip address to a node", "<ip/mask> <iface>" },
5976         { "delip", control_delip, false, true,
5977                 "delete an ip address from a node", "<ip>" },
5978         { "backupdb", control_backupdb, false, false,
5979                 "backup a database into a file", "<dbname|dbid> <file>" },
5980         { "restoredb", control_restoredb, false, false,
5981                 "restore a database from a file", "<file> [dbname]" },
5982         { "dumpdbbackup", control_dumpdbbackup, true, false,
5983                 "dump database from a backup file", "<file>" },
5984         { "wipedb", control_wipedb, false, false,
5985                 "wipe the contents of a database.", "<dbname|dbid>"},
5986         { "recmaster", control_recmaster, false, true,
5987                 "show the pnn for the recovery master", NULL },
5988         { "event", control_event, true, false,
5989                 "event and event script commands", NULL },
5990         { "scriptstatus", control_scriptstatus, true, false,
5991                 "show event script status",
5992                 "[init|setup|startup|monitor|takeip|releaseip|ipreallocated]" },
5993         { "natgw", control_natgw, false, false,
5994                 "show natgw configuration", "master|list|status" },
5995         { "getreclock", control_getreclock, false, true,
5996                 "get recovery lock file", NULL },
5997         { "setlmasterrole", control_setlmasterrole, false, true,
5998                 "set LMASTER role", "on|off" },
5999         { "setrecmasterrole", control_setrecmasterrole, false, true,
6000                 "set RECMASTER role", "on|off"},
6001         { "setdbreadonly", control_setdbreadonly, false, true,
6002                 "enable readonly records", "<dbname|dbid>" },
6003         { "setdbsticky", control_setdbsticky, false, true,
6004                 "enable sticky records", "<dbname|dbid>"},
6005         { "pfetch", control_pfetch, false, false,
6006                 "fetch record from persistent database", "<dbname|dbid> <key> [<file>]" },
6007         { "pstore", control_pstore, false, false,
6008                 "write record to persistent database", "<dbname|dbid> <key> <value>" },
6009         { "pdelete", control_pdelete, false, false,
6010                 "delete record from persistent database", "<dbname|dbid> <key>" },
6011         { "ptrans", control_ptrans, false, false,
6012                 "update a persistent database (from file or stdin)", "<dbname|dbid> [<file>]" },
6013         { "tfetch", control_tfetch, false, true,
6014                 "fetch a record", "<tdb-file> <key> [<file>]" },
6015         { "tstore", control_tstore, false, true,
6016                 "store a record", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6017         { "readkey", control_readkey, false, false,
6018                 "read value of a database key", "<dbname|dbid> <key> [readonly]" },
6019         { "writekey", control_writekey, false, false,
6020                 "write value for a database key", "<dbname|dbid> <key> <value>" },
6021         { "deletekey", control_deletekey, false, false,
6022                 "delete a database key", "<dbname|dbid> <key>" },
6023         { "checktcpport", control_checktcpport, true, false,
6024                 "check if a service is bound to a specific tcp port or not", "<port>" },
6025         { "getdbseqnum", control_getdbseqnum, false, false,
6026                 "get database sequence number", "<dbname|dbid>" },
6027         { "nodestatus", control_nodestatus, false, true,
6028                 "show and return node status", "[all|<pnn-list>]" },
6029         { "dbstatistics", control_dbstatistics, false, true,
6030                 "show database statistics", "<dbname|dbid>" },
6031         { "reloadips", control_reloadips, false, false,
6032                 "reload the public addresses file", "[all|<pnn-list>]" },
6033         { "ipiface", control_ipiface, true, false,
6034                 "Find the interface an ip address is hosted on", "<ip>" },
6035 };
6036
6037 static const struct ctdb_cmd *match_command(const char *command)
6038 {
6039         const struct ctdb_cmd *cmd;
6040         int i;
6041
6042         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6043                 cmd = &ctdb_commands[i];
6044                 if (strlen(command) == strlen(cmd->name) &&
6045                     strncmp(command, cmd->name, strlen(command)) == 0) {
6046                         return cmd;
6047                 }
6048         }
6049
6050         return NULL;
6051 }
6052
6053
6054 /**
6055  * Show usage message
6056  */
6057 static void usage_full(void)
6058 {
6059         int i;
6060
6061         poptPrintHelp(pc, stdout, 0);
6062         printf("\nCommands:\n");
6063         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6064                 printf("  %-15s %-27s  %s\n",
6065                        ctdb_commands[i].name,
6066                        ctdb_commands[i].args ? ctdb_commands[i].args : "",
6067                        ctdb_commands[i].msg);
6068         }
6069 }
6070
6071 static void usage(const char *command)
6072 {
6073         const struct ctdb_cmd *cmd;
6074
6075         if (command == NULL) {
6076                 usage_full();
6077                 exit(1);
6078         }
6079
6080         cmd = match_command(command);
6081         if (cmd == NULL) {
6082                 usage_full();
6083         } else {
6084                 poptPrintUsage(pc, stdout, 0);
6085                 printf("\nCommands:\n");
6086                 printf("  %-15s %-27s  %s\n",
6087                        cmd->name, cmd->args ? cmd->args : "", cmd->msg);
6088         }
6089
6090         exit(1);
6091 }
6092
6093 struct poptOption cmdline_options[] = {
6094         POPT_AUTOHELP
6095         { "socket", 's', POPT_ARG_STRING, &options.socket, 0,
6096                 "CTDB socket path", "filename" },
6097         { "debug", 'd', POPT_ARG_STRING, &options.debuglevelstr, 0,
6098                 "debug level"},
6099         { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0,
6100                 "timelimit (in seconds)" },
6101         { "node", 'n', POPT_ARG_INT, &options.pnn, 0,
6102                 "node specification - integer" },
6103         { NULL, 'Y', POPT_ARG_NONE, &options.machinereadable, 0,
6104                 "enable machine readable output", NULL },
6105         { "separator", 'x', POPT_ARG_STRING, &options.sep, 0,
6106                 "specify separator for machine readable output", "CHAR" },
6107         { NULL, 'X', POPT_ARG_NONE, &options.machineparsable, 0,
6108                 "enable machine parsable output with separator |", NULL },
6109         { "verbose", 'v', POPT_ARG_NONE, &options.verbose, 0,
6110                 "enable verbose output", NULL },
6111         { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0,
6112                 "die if runtime exceeds this limit (in seconds)" },
6113         POPT_TABLEEND
6114 };
6115
6116 static int process_command(const struct ctdb_cmd *cmd, int argc,
6117                            const char **argv)
6118 {
6119         TALLOC_CTX *tmp_ctx;
6120         struct ctdb_context *ctdb;
6121         int ret;
6122         bool status;
6123         uint64_t srvid_offset;
6124
6125         tmp_ctx = talloc_new(NULL);
6126         if (tmp_ctx == NULL) {
6127                 fprintf(stderr, "Memory allocation error\n");
6128                 goto fail;
6129         }
6130
6131         if (cmd->without_daemon) {
6132                 if (options.pnn != -1) {
6133                         fprintf(stderr,
6134                                 "Cannot specify node for command %s\n",
6135                                 cmd->name);
6136                         goto fail;
6137                 }
6138
6139                 ret = cmd->fn(tmp_ctx, NULL, argc-1, argv+1);
6140                 talloc_free(tmp_ctx);
6141                 return ret;
6142         }
6143
6144         ctdb = talloc_zero(tmp_ctx, struct ctdb_context);
6145         if (ctdb == NULL) {
6146                 fprintf(stderr, "Memory allocation error\n");
6147                 goto fail;
6148         }
6149
6150         ctdb->ev = tevent_context_init(ctdb);
6151         if (ctdb->ev == NULL) {
6152                 fprintf(stderr, "Failed to initialize tevent\n");
6153                 goto fail;
6154         }
6155
6156         ret = ctdb_client_init(ctdb, ctdb->ev, options.socket, &ctdb->client);
6157         if (ret != 0) {
6158                 fprintf(stderr, "Failed to connect to CTDB daemon (%s)\n",
6159                         options.socket);
6160
6161                 if (!find_node_xpnn(ctdb, NULL)) {
6162                         fprintf(stderr, "Is this node part of CTDB cluster?\n");
6163                 }
6164                 goto fail;
6165         }
6166
6167         ctdb->pnn = ctdb_client_pnn(ctdb->client);
6168         srvid_offset = getpid() & 0xFFFF;
6169         ctdb->srvid = SRVID_CTDB_TOOL | (srvid_offset << 16);
6170
6171         if (options.pnn != -1) {
6172                 status = verify_pnn(ctdb, options.pnn);
6173                 if (! status) {
6174                         goto fail;
6175                 }
6176
6177                 ctdb->cmd_pnn = options.pnn;
6178         } else {
6179                 ctdb->cmd_pnn = ctdb->pnn;
6180         }
6181
6182         if (! cmd->remote && ctdb->pnn != ctdb->cmd_pnn) {
6183                 fprintf(stderr, "Node cannot be specified for command %s\n",
6184                         cmd->name);
6185                 goto fail;
6186         }
6187
6188         ret = cmd->fn(tmp_ctx, ctdb, argc-1, argv+1);
6189         talloc_free(tmp_ctx);
6190         return ret;
6191
6192 fail:
6193         talloc_free(tmp_ctx);
6194         return 1;
6195 }
6196
6197 static void signal_handler(int sig)
6198 {
6199         fprintf(stderr, "Maximum runtime exceeded - exiting\n");
6200 }
6201
6202 static void alarm_handler(int sig)
6203 {
6204         /* Kill any child processes */
6205         signal(SIGTERM, signal_handler);
6206         kill(0, SIGTERM);
6207
6208         _exit(1);
6209 }
6210
6211 int main(int argc, const char *argv[])
6212 {
6213         int opt;
6214         const char **extra_argv;
6215         int extra_argc;
6216         const struct ctdb_cmd *cmd;
6217         const char *ctdb_socket;
6218         int loglevel;
6219         int ret;
6220
6221         setlinebuf(stdout);
6222
6223         /* Set default options */
6224         options.socket = CTDB_SOCKET;
6225         options.debuglevelstr = NULL;
6226         options.timelimit = 10;
6227         options.sep = "|";
6228         options.maxruntime = 0;
6229         options.pnn = -1;
6230
6231         ctdb_socket = getenv("CTDB_SOCKET");
6232         if (ctdb_socket != NULL) {
6233                 options.socket = ctdb_socket;
6234         }
6235
6236         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
6237                             POPT_CONTEXT_KEEP_FIRST);
6238         while ((opt = poptGetNextOpt(pc)) != -1) {
6239                 fprintf(stderr, "Invalid option %s: %s\n",
6240                         poptBadOption(pc, 0), poptStrerror(opt));
6241                 exit(1);
6242         }
6243
6244         if (options.maxruntime == 0) {
6245                 const char *ctdb_timeout;
6246
6247                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6248                 if (ctdb_timeout != NULL) {
6249                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6250                 } else {
6251                         options.maxruntime = 120;
6252                 }
6253         }
6254         if (options.maxruntime <= 120) {
6255                 /* default timeout is 120 seconds */
6256                 options.maxruntime = 120;
6257         }
6258
6259         if (options.machineparsable) {
6260                 options.machinereadable = 1;
6261         }
6262
6263         /* setup the remaining options for the commands */
6264         extra_argc = 0;
6265         extra_argv = poptGetArgs(pc);
6266         if (extra_argv) {
6267                 extra_argv++;
6268                 while (extra_argv[extra_argc]) extra_argc++;
6269         }
6270
6271         if (extra_argc < 1) {
6272                 usage(NULL);
6273         }
6274
6275         cmd = match_command(extra_argv[0]);
6276         if (cmd == NULL) {
6277                 fprintf(stderr, "Unknown command '%s'\n", extra_argv[0]);
6278                 exit(1);
6279         }
6280
6281         /* Enable logging */
6282         setup_logging("ctdb", DEBUG_STDERR);
6283         if (debug_level_parse(options.debuglevelstr, &loglevel)) {
6284                 DEBUGLEVEL = loglevel;
6285         } else {
6286                 DEBUGLEVEL = DEBUG_ERR;
6287         }
6288
6289         signal(SIGALRM, alarm_handler);
6290         alarm(options.maxruntime);
6291
6292         ret = process_command(cmd, extra_argc, extra_argv);
6293         if (ret == -1) {
6294                 ret = 1;
6295         }
6296
6297         (void)poptFreeContext(pc);
6298
6299         return ret;
6300 }