ctdb-tools: Change onnode to use ONNODE_SSH and ONNODE_SSH_OPTS
[samba.git] / ctdb / tools / ctdb.c
1 /*
2    CTDB control tool
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/wait.h"
25 #include "system/dir.h"
26
27 #include <ctype.h>
28 #include <popt.h>
29 #include <talloc.h>
30 #include <tevent.h>
31 #include <tdb.h>
32
33 #include "common/version.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/samba_util.h"
36 #include "lib/util/sys_rw.h"
37
38 #include "common/db_hash.h"
39 #include "common/logging.h"
40 #include "protocol/protocol.h"
41 #include "protocol/protocol_api.h"
42 #include "protocol/protocol_util.h"
43 #include "common/system.h"
44 #include "client/client.h"
45 #include "client/client_sync.h"
46
/*
 * Timeout value handed to the ctdb_ctrl_*() calls in this file.  It is
 * built by timeval_current_ofs() from options.timelimit.
 * NOTE(review): presumably options.timelimit is a number of seconds and
 * the result is an absolute time - confirm against timeval_current_ofs().
 */
#define TIMEOUT()       timeval_current_ofs(options.timelimit, 0)

/*
 * Service ids formed by OR-ing a per-purpose constant into
 * CTDB_SRVID_TOOL_RANGE.
 * NOTE(review): the users of these two ids are not visible in this part
 * of the file.
 */
#define SRVID_CTDB_TOOL    (CTDB_SRVID_TOOL_RANGE | 0x0001000000000000LL)
#define SRVID_CTDB_PUSHDB  (CTDB_SRVID_TOOL_RANGE | 0x0002000000000000LL)
51
/*
 * File-scope option values.
 *
 * Only the fields whose use is visible in this part of the file are
 * commented.  NOTE(review): the remaining fields are presumably filled in
 * from the command line by a popt option table elsewhere - confirm.
 */
static struct {
        const char *socket;
        const char *debuglevelstr;
        int timelimit;          /* fed to timeval_current_ofs() by TIMEOUT() */
        int pnn;
        int machinereadable;    /* control_status prints the machine table when 1 */
        const char *sep;        /* printed before/after every machine table field */
        int machineparsable;
        int verbose;
        int maxruntime;
        int printemptyrecords;
        int printdatasize;
        int printlmaster;
        int printhash;
        int printrecordflags;
} options;

/*
 * popt context.
 * NOTE(review): nothing in this part of the file uses it; presumably it
 * is set up by the option parsing code - confirm.
 */
static poptContext pc;
70
/*
 * State handed to every helper and command function in this file.
 */
struct ctdb_context {
        struct tevent_context *ev;              /* passed to the ctdb_ctrl_*() calls */
        struct ctdb_client_context *client;     /* passed to the ctdb_ctrl_*() calls */
        struct ctdb_node_map *nodemap;          /* cache maintained by get_nodemap() */
        /*
         * pnn is the node queried by get_nodemap(), db_find() and
         * db_exists(); cmd_pnn is the node the command functions
         * (status, uptime, ...) send their controls to.
         */
        uint32_t pnn, cmd_pnn;
        uint64_t srvid;                         /* counter advanced by next_srvid() */
};

/*
 * Forward declaration; the definition is not in this part of the file.
 * control_status() calls it when it is given unexpected arguments.
 */
static void usage(const char *command);
80
81 /*
82  * Utility Functions
83  */
84
/*
 * Return the time from *tv to *tv2 in seconds.
 *
 * The result is negative when *tv2 is earlier than *tv.
 */
static double timeval_delta(struct timeval *tv2, struct timeval *tv)
{
        double usec_part = (tv2->tv_usec - tv->tv_usec) * 1.0e-6;

        return (tv2->tv_sec - tv->tv_sec) + usec_part;
}
90
91 static struct ctdb_node_and_flags *get_node_by_pnn(
92                                         struct ctdb_node_map *nodemap,
93                                         uint32_t pnn)
94 {
95         int i;
96
97         for (i=0; i<nodemap->num; i++) {
98                 if (nodemap->node[i].pnn == pnn) {
99                         return &nodemap->node[i];
100                 }
101         }
102         return NULL;
103 }
104
105 static const char *pretty_print_flags(TALLOC_CTX *mem_ctx, uint32_t flags)
106 {
107         static const struct {
108                 uint32_t flag;
109                 const char *name;
110         } flag_names[] = {
111                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
112                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
113                 { NODE_FLAGS_BANNED,                "BANNED" },
114                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
115                 { NODE_FLAGS_DELETED,               "DELETED" },
116                 { NODE_FLAGS_STOPPED,               "STOPPED" },
117                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
118         };
119         char *flags_str = NULL;
120         int i;
121
122         for (i=0; i<ARRAY_SIZE(flag_names); i++) {
123                 if (flags & flag_names[i].flag) {
124                         if (flags_str == NULL) {
125                                 flags_str = talloc_asprintf(mem_ctx,
126                                                 "%s", flag_names[i].name);
127                         } else {
128                                 flags_str = talloc_asprintf_append(flags_str,
129                                                 "|%s", flag_names[i].name);
130                         }
131                         if (flags_str == NULL) {
132                                 return "OUT-OF-MEMORY";
133                         }
134                 }
135         }
136         if (flags_str == NULL) {
137                 return "OK";
138         }
139
140         return flags_str;
141 }
142
143 static uint64_t next_srvid(struct ctdb_context *ctdb)
144 {
145         ctdb->srvid += 1;
146         return ctdb->srvid;
147 }
148
149 /*
150  * Get consistent nodemap information.
151  *
152  * If nodemap is already cached, use that. If not get it.
153  * If the current node is BANNED, then get nodemap from "better" node.
154  */
155 static struct ctdb_node_map *get_nodemap(struct ctdb_context *ctdb, bool force)
156 {
157         TALLOC_CTX *tmp_ctx;
158         struct ctdb_node_map *nodemap;
159         struct ctdb_node_and_flags *node;
160         uint32_t current_node;
161         int ret;
162
163         if (force) {
164                 TALLOC_FREE(ctdb->nodemap);
165         }
166
167         if (ctdb->nodemap != NULL) {
168                 return ctdb->nodemap;
169         }
170
171         tmp_ctx = talloc_new(ctdb);
172         if (tmp_ctx == NULL) {
173                 return false;
174         }
175
176         current_node = ctdb->pnn;
177 again:
178         ret = ctdb_ctrl_get_nodemap(tmp_ctx, ctdb->ev, ctdb->client,
179                                     current_node, TIMEOUT(), &nodemap);
180         if (ret != 0) {
181                 fprintf(stderr, "Failed to get nodemap from node %u\n",
182                         current_node);
183                 goto failed;
184         }
185
186         node = get_node_by_pnn(nodemap, current_node);
187         if (node->flags & NODE_FLAGS_BANNED) {
188                 /* Pick next node */
189                 do {
190                         current_node = (current_node + 1) % nodemap->num;
191                         node = get_node_by_pnn(nodemap, current_node);
192                         if (! (node->flags &
193                               (NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED))) {
194                                 break;
195                         }
196                 } while (current_node != ctdb->pnn);
197
198                 if (current_node == ctdb->pnn) {
199                         /* Tried all nodes in the cluster */
200                         fprintf(stderr, "Warning: All nodes are banned.\n");
201                         goto failed;
202                 }
203
204                 goto again;
205         }
206
207         ctdb->nodemap = talloc_steal(ctdb, nodemap);
208         return nodemap;
209
210 failed:
211         talloc_free(tmp_ctx);
212         return NULL;
213 }
214
215 static bool verify_pnn(struct ctdb_context *ctdb, int pnn)
216 {
217         struct ctdb_node_map *nodemap;
218         bool found;
219         int i;
220
221         if (pnn == -1) {
222                 return false;
223         }
224
225         nodemap = get_nodemap(ctdb, false);
226         if (nodemap == NULL) {
227                 return false;
228         }
229
230         found = false;
231         for (i=0; i<nodemap->num; i++) {
232                 if (nodemap->node[i].pnn == pnn) {
233                         found = true;
234                         break;
235                 }
236         }
237         if (! found) {
238                 fprintf(stderr, "Node %u does not exist\n", pnn);
239                 return false;
240         }
241
242         if (nodemap->node[i].flags &
243             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
244                 fprintf(stderr, "Node %u has status %s\n", pnn,
245                         pretty_print_flags(ctdb, nodemap->node[i].flags));
246                 return false;
247         }
248
249         return true;
250 }
251
252 static struct ctdb_node_map *talloc_nodemap(TALLOC_CTX *mem_ctx,
253                                             struct ctdb_node_map *nodemap)
254 {
255         struct ctdb_node_map *nodemap2;
256
257         nodemap2 = talloc_zero(mem_ctx, struct ctdb_node_map);
258         if (nodemap2 == NULL) {
259                 return NULL;
260         }
261
262         nodemap2->node = talloc_array(nodemap2, struct ctdb_node_and_flags,
263                                       nodemap->num);
264         if (nodemap2->node == NULL) {
265                 talloc_free(nodemap2);
266                 return NULL;
267         }
268
269         return nodemap2;
270 }
271
272 /*
273  * Get the number and the list of matching nodes
274  *
275  *   nodestring :=  NULL | all | pnn,[pnn,...]
276  *
277  * If nodestring is NULL, use the current node.
278  */
279 static bool parse_nodestring(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
280                              const char *nodestring,
281                              struct ctdb_node_map **out)
282 {
283         struct ctdb_node_map *nodemap, *nodemap2;
284         struct ctdb_node_and_flags *node;
285         int i;
286
287         nodemap = get_nodemap(ctdb, false);
288         if (nodemap == NULL) {
289                 return false;
290         }
291
292         nodemap2 = talloc_nodemap(mem_ctx, nodemap);
293         if (nodemap2 == NULL) {
294                 return false;
295         }
296
297         if (nodestring == NULL) {
298                 for (i=0; i<nodemap->num; i++) {
299                         if (nodemap->node[i].pnn == ctdb->cmd_pnn) {
300                                 nodemap2->node[0] = nodemap->node[i];
301                                 break;
302                         }
303                 }
304                 nodemap2->num = 1;
305
306                 goto done;
307         }
308
309         if (strcmp(nodestring, "all") == 0) {
310                 for (i=0; i<nodemap->num; i++) {
311                         nodemap2->node[i] = nodemap->node[i];
312                 }
313                 nodemap2->num = nodemap->num;
314
315                 goto done;
316         } else {
317                 char *ns, *tok;
318
319                 ns = talloc_strdup(mem_ctx, nodestring);
320                 if (ns == NULL) {
321                         return false;
322                 }
323
324                 tok = strtok(ns, ",");
325                 while (tok != NULL) {
326                         uint32_t pnn;
327                         char *endptr;
328
329                         pnn = (uint32_t)strtoul(tok, &endptr, 0);
330                         if (pnn == 0 && tok == endptr) {
331                                 fprintf(stderr, "Invalid node %s\n", tok);
332                                         return false;
333                         }
334
335                         node = get_node_by_pnn(nodemap, pnn);
336                         if (node == NULL) {
337                                 fprintf(stderr, "Node %u does not exist\n",
338                                         pnn);
339                                 return false;
340                         }
341
342                         nodemap2->node[nodemap2->num] = *node;
343                         nodemap2->num += 1;
344
345                         tok = strtok(NULL, ",");
346                 }
347         }
348
349 done:
350         *out = nodemap2;
351         return true;
352 }
353
354 /* Compare IP address */
355 static bool ctdb_same_ip(ctdb_sock_addr *ip1, ctdb_sock_addr *ip2)
356 {
357         bool ret = false;
358
359         if (ip1->sa.sa_family != ip2->sa.sa_family) {
360                 return false;
361         }
362
363         switch (ip1->sa.sa_family) {
364         case AF_INET:
365                 ret = (memcmp(&ip1->ip.sin_addr, &ip2->ip.sin_addr,
366                               sizeof(struct in_addr)) == 0);
367                 break;
368
369         case AF_INET6:
370                 ret = (memcmp(&ip1->ip6.sin6_addr, &ip2->ip6.sin6_addr,
371                               sizeof(struct in6_addr)) == 0);
372                 break;
373         }
374
375         return ret;
376 }
377
378 /* Append a node to a node map with given address and flags */
379 static bool node_map_add(struct ctdb_node_map *nodemap,
380                          const char *nstr, uint32_t flags)
381 {
382         ctdb_sock_addr addr;
383         uint32_t num;
384         struct ctdb_node_and_flags *n;
385         int ret;
386
387         ret = ctdb_sock_addr_from_string(nstr, &addr, false);
388         if (ret != 0) {
389                 fprintf(stderr, "Invalid IP address %s\n", nstr);
390                 return false;
391         }
392
393         num = nodemap->num;
394         nodemap->node = talloc_realloc(nodemap, nodemap->node,
395                                        struct ctdb_node_and_flags, num+1);
396         if (nodemap->node == NULL) {
397                 return false;
398         }
399
400         n = &nodemap->node[num];
401         n->addr = addr;
402         n->pnn = num;
403         n->flags = flags;
404
405         nodemap->num = num+1;
406         return true;
407 }
408
409 /* Read a nodes file into a node map */
410 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
411                                                   const char *nlist)
412 {
413         char **lines;
414         int nlines;
415         int i;
416         struct ctdb_node_map *nodemap;
417
418         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
419         if (nodemap == NULL) {
420                 return NULL;
421         }
422
423         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
424         if (lines == NULL) {
425                 return NULL;
426         }
427
428         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
429                 nlines--;
430         }
431
432         for (i=0; i<nlines; i++) {
433                 char *node;
434                 uint32_t flags;
435                 size_t len;
436
437                 node = lines[i];
438                 /* strip leading spaces */
439                 while((*node == ' ') || (*node == '\t')) {
440                         node++;
441                 }
442
443                 len = strlen(node);
444
445                 /* strip trailing spaces */
446                 while ((len > 1) &&
447                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
448                 {
449                         node[len-1] = '\0';
450                         len--;
451                 }
452
453                 if (len == 0) {
454                         continue;
455                 }
456                 if (*node == '#') {
457                         /* A "deleted" node is a node that is
458                            commented out in the nodes file.  This is
459                            used instead of removing a line, which
460                            would cause subsequent nodes to change
461                            their PNN. */
462                         flags = NODE_FLAGS_DELETED;
463                         node = discard_const("0.0.0.0");
464                 } else {
465                         flags = 0;
466                 }
467                 if (! node_map_add(nodemap, node, flags)) {
468                         talloc_free(lines);
469                         TALLOC_FREE(nodemap);
470                         return NULL;
471                 }
472         }
473
474         talloc_free(lines);
475         return nodemap;
476 }
477
478 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx, uint32_t pnn)
479 {
480         struct ctdb_node_map *nodemap;
481         char *nodepath;
482         const char *nodes_list = NULL;
483
484         if (pnn != CTDB_UNKNOWN_PNN) {
485                 nodepath = talloc_asprintf(mem_ctx, "CTDB_NODES_%u", pnn);
486                 if (nodepath != NULL) {
487                         nodes_list = getenv(nodepath);
488                 }
489         }
490         if (nodes_list == NULL) {
491                 nodes_list = getenv("CTDB_NODES");
492         }
493         if (nodes_list == NULL) {
494                 const char *basedir = getenv("CTDB_BASE");
495                 if (basedir == NULL) {
496                         basedir = CTDB_ETCDIR;
497                 }
498                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes", basedir);
499                 if (nodes_list == NULL) {
500                         fprintf(stderr, "Memory allocation error\n");
501                         return NULL;
502                 }
503         }
504
505         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
506         if (nodemap == NULL) {
507                 fprintf(stderr, "Failed to read nodes file \"%s\"\n",
508                         nodes_list);
509                 return NULL;
510         }
511
512         return nodemap;
513 }
514
515 static struct ctdb_dbid *db_find(TALLOC_CTX *mem_ctx,
516                                  struct ctdb_context *ctdb,
517                                  struct ctdb_dbid_map *dbmap,
518                                  const char *db_name)
519 {
520         struct ctdb_dbid *db = NULL;
521         const char *name;
522         int ret, i;
523
524         for (i=0; i<dbmap->num; i++) {
525                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
526                                            ctdb->pnn, TIMEOUT(),
527                                            dbmap->dbs[i].db_id, &name);
528                 if (ret != 0) {
529                         return false;
530                 }
531
532                 if (strcmp(db_name, name) == 0) {
533                         talloc_free(discard_const(name));
534                         db = &dbmap->dbs[i];
535                         break;
536                 }
537         }
538
539         return db;
540 }
541
542 static bool db_exists(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
543                       const char *db_arg, uint32_t *db_id,
544                       const char **db_name, uint8_t *db_flags)
545 {
546         struct ctdb_dbid_map *dbmap;
547         struct ctdb_dbid *db = NULL;
548         uint32_t id = 0;
549         const char *name = NULL;
550         int ret, i;
551
552         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
553                                   ctdb->pnn, TIMEOUT(), &dbmap);
554         if (ret != 0) {
555                 return false;
556         }
557
558         if (strncmp(db_arg, "0x", 2) == 0) {
559                 id = strtoul(db_arg, NULL, 0);
560                 for (i=0; i<dbmap->num; i++) {
561                         if (id == dbmap->dbs[i].db_id) {
562                                 db = &dbmap->dbs[i];
563                                 break;
564                         }
565                 }
566         } else {
567                 name = db_arg;
568                 db = db_find(mem_ctx, ctdb, dbmap, name);
569         }
570
571         if (db == NULL) {
572                 fprintf(stderr, "No database matching '%s' found\n", db_arg);
573                 return false;
574         }
575
576         if (name == NULL) {
577                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
578                                            ctdb->pnn, TIMEOUT(), id, &name);
579                 if (ret != 0) {
580                         return false;
581                 }
582         }
583
584         if (db_id != NULL) {
585                 *db_id = db->db_id;
586         }
587         if (db_name != NULL) {
588                 *db_name = talloc_strdup(mem_ctx, name);
589         }
590         if (db_flags != NULL) {
591                 *db_flags = db->flags;
592         }
593         return true;
594 }
595
/*
 * Convert one hexadecimal digit to its value (0-15).
 *
 * Both 'a'-'f' and 'A'-'F' are accepted.  The input is not validated:
 * any other character is treated as a decimal digit and yields h - '0'.
 */
static int h2i(char h)
{
        if (h >= 'a' && h <= 'f') {
                return h - 'a' + 10;
        }
        if (h >= 'A' && h <= 'F') {
                return h - 'A' + 10;
        }
        return h - '0';
}
606
607 static int hex_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
608                        TDB_DATA *out)
609 {
610         int i;
611         TDB_DATA data;
612
613         if (len & 0x01) {
614                 fprintf(stderr, "Key (%s) contains odd number of hex digits\n",
615                         str);
616                 return EINVAL;
617         }
618
619         data.dsize = len / 2;
620         data.dptr = talloc_size(mem_ctx, data.dsize);
621         if (data.dptr == NULL) {
622                 return ENOMEM;
623         }
624
625         for (i=0; i<data.dsize; i++) {
626                 data.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
627         }
628
629         *out = data;
630         return 0;
631 }
632
633 static int str_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
634                        TDB_DATA *out)
635 {
636         TDB_DATA data;
637         int ret = 0;
638
639         if (strncmp(str, "0x", 2) == 0) {
640                 ret = hex_to_data(str+2, len-2, mem_ctx, &data);
641         } else {
642                 data.dptr = talloc_memdup(mem_ctx, str, len);
643                 if (data.dptr == NULL) {
644                         return ENOMEM;
645                 }
646                 data.dsize = len;
647         }
648
649         *out = data;
650         return ret;
651 }
652
653 static int run_helper(TALLOC_CTX *mem_ctx, const char *command,
654                       const char *path, int argc, const char **argv)
655 {
656         pid_t pid;
657         int save_errno, status, ret;
658         const char **new_argv;
659         int i;
660
661         new_argv = talloc_array(mem_ctx, const char *, argc + 2);
662         if (new_argv == NULL) {
663                 return ENOMEM;
664         }
665
666         new_argv[0] = path;
667         for (i=0; i<argc; i++) {
668                 new_argv[i+1] = argv[i];
669         }
670         new_argv[argc+1] = NULL;
671
672         pid = fork();
673         if (pid < 0) {
674                 save_errno = errno;
675                 talloc_free(new_argv);
676                 fprintf(stderr, "Failed to fork %s (%s) - %s\n",
677                         command, path, strerror(save_errno));
678                 return save_errno;
679         }
680
681         if (pid == 0) {
682                 ret = execv(path, discard_const(new_argv));
683                 if (ret == -1) {
684                         _exit(64+errno);
685                 }
686                 /* Should not happen */
687                 _exit(64+ENOEXEC);
688         }
689
690         talloc_free(new_argv);
691
692         ret = waitpid(pid, &status, 0);
693         if (ret == -1) {
694                 save_errno = errno;
695                 fprintf(stderr, "waitpid() failed for %s - %s\n",
696                         command, strerror(save_errno));
697                 return save_errno;
698         }
699
700         if (WIFEXITED(status)) {
701                 int pstatus = WEXITSTATUS(status);
702                 if (WIFSIGNALED(status)) {
703                         fprintf(stderr, "%s terminated with signal %d\n",
704                                 command, WTERMSIG(status));
705                         ret = EINTR;
706                 } else if (pstatus >= 64 && pstatus < 255) {
707                         fprintf(stderr, "%s failed with error %d\n",
708                                 command, pstatus-64);
709                         ret = pstatus - 64;
710                 } else {
711                         ret = pstatus;
712                 }
713                 return ret;
714         } else if (WIFSIGNALED(status)) {
715                 fprintf(stderr, "%s terminated with signal %d\n",
716                         command, WTERMSIG(status));
717                 return EINTR;
718         }
719
720         return 0;
721 }
722
723 /*
724  * Command Functions
725  */
726
727 static int control_version(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
728                            int argc, const char **argv)
729 {
730         printf("%s\n", ctdb_version_string);
731         return 0;
732 }
733
734 static bool partially_online(TALLOC_CTX *mem_ctx,
735                              struct ctdb_context *ctdb,
736                              struct ctdb_node_and_flags *node)
737 {
738         struct ctdb_iface_list *iface_list;
739         int ret, i;
740         bool status = false;
741
742         if (node->flags != 0) {
743                 return false;
744         }
745
746         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
747                                    node->pnn, TIMEOUT(), &iface_list);
748         if (ret != 0) {
749                 return false;
750         }
751
752         status = false;
753         for (i=0; i < iface_list->num; i++) {
754                 if (iface_list->iface[i].link_state == 0) {
755                         status = true;
756                         break;
757                 }
758         }
759
760         return status;
761 }
762
763 static void print_nodemap_machine(TALLOC_CTX *mem_ctx,
764                                   struct ctdb_context *ctdb,
765                                   struct ctdb_node_map *nodemap,
766                                   uint32_t mypnn)
767 {
768         struct ctdb_node_and_flags *node;
769         int i;
770
771         printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
772                options.sep,
773                "Node", options.sep,
774                "IP", options.sep,
775                "Disconnected", options.sep,
776                "Banned", options.sep,
777                "Disabled", options.sep,
778                "Unhealthy", options.sep,
779                "Stopped", options.sep,
780                "Inactive", options.sep,
781                "PartiallyOnline", options.sep,
782                "ThisNode", options.sep);
783
784         for (i=0; i<nodemap->num; i++) {
785                 node = &nodemap->node[i];
786                 if (node->flags & NODE_FLAGS_DELETED) {
787                         continue;
788                 }
789
790                 printf("%s%u%s%s%s%d%s%d%s%d%s%d%s%d%s%d%s%d%s%c%s\n",
791                        options.sep,
792                        node->pnn, options.sep,
793                        ctdb_sock_addr_to_string(mem_ctx, &node->addr, false),
794                        options.sep,
795                        !! (node->flags & NODE_FLAGS_DISCONNECTED), options.sep,
796                        !! (node->flags & NODE_FLAGS_BANNED), options.sep,
797                        !! (node->flags & NODE_FLAGS_PERMANENTLY_DISABLED),
798                        options.sep,
799                        !! (node->flags & NODE_FLAGS_UNHEALTHY), options.sep,
800                        !! (node->flags & NODE_FLAGS_STOPPED), options.sep,
801                        !! (node->flags & NODE_FLAGS_INACTIVE), options.sep,
802                        partially_online(mem_ctx, ctdb, node), options.sep,
803                        (node->pnn == mypnn)?'Y':'N', options.sep);
804         }
805
806 }
807
808 static void print_nodemap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
809                           struct ctdb_node_map *nodemap, uint32_t mypnn,
810                           bool print_header)
811 {
812         struct ctdb_node_and_flags *node;
813         int num_deleted_nodes = 0;
814         int i;
815
816         for (i=0; i<nodemap->num; i++) {
817                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
818                         num_deleted_nodes++;
819                 }
820         }
821
822         if (print_header) {
823                 if (num_deleted_nodes == 0) {
824                         printf("Number of nodes:%d\n", nodemap->num);
825                 } else {
826                         printf("Number of nodes:%d "
827                                "(including %d deleted nodes)\n",
828                                nodemap->num, num_deleted_nodes);
829                 }
830         }
831
832         for (i=0; i<nodemap->num; i++) {
833                 node = &nodemap->node[i];
834                 if (node->flags & NODE_FLAGS_DELETED) {
835                         continue;
836                 }
837
838                 printf("pnn:%u %-16s %s%s\n",
839                        node->pnn,
840                        ctdb_sock_addr_to_string(mem_ctx, &node->addr, false),
841                        partially_online(mem_ctx, ctdb, node) ?
842                                 "PARTIALLYONLINE" :
843                                 pretty_print_flags(mem_ctx, node->flags),
844                        node->pnn == mypnn ? " (THIS NODE)" : "");
845         }
846 }
847
848 static void print_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
849                          struct ctdb_node_map *nodemap, uint32_t mypnn,
850                          struct ctdb_vnn_map *vnnmap, int recmode,
851                          uint32_t recmaster)
852 {
853         int i;
854
855         print_nodemap(mem_ctx, ctdb, nodemap, mypnn, true);
856
857         if (vnnmap->generation == INVALID_GENERATION) {
858                 printf("Generation:INVALID\n");
859         } else {
860                 printf("Generation:%u\n", vnnmap->generation);
861         }
862         printf("Size:%d\n", vnnmap->size);
863         for (i=0; i<vnnmap->size; i++) {
864                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
865         }
866
867         printf("Recovery mode:%s (%d)\n",
868                recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "RECOVERY",
869                recmode);
870         printf("Recovery master:%d\n", recmaster);
871 }
872
873 static int control_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
874                           int argc, const char **argv)
875 {
876         struct ctdb_node_map *nodemap;
877         struct ctdb_vnn_map *vnnmap;
878         int recmode;
879         uint32_t recmaster;
880         int ret;
881
882         if (argc != 0) {
883                 usage("status");
884         }
885
886         nodemap = get_nodemap(ctdb, false);
887         if (nodemap == NULL) {
888                 return 1;
889         }
890
891         if (options.machinereadable == 1) {
892                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
893                 return 0;
894         }
895
896         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
897                                   ctdb->cmd_pnn, TIMEOUT(), &vnnmap);
898         if (ret != 0) {
899                 return ret;
900         }
901
902         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
903                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
904         if (ret != 0) {
905                 return ret;
906         }
907
908         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
909                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
910         if (ret != 0) {
911                 return ret;
912         }
913
914         print_status(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, vnnmap,
915                      recmode, recmaster);
916         return 0;
917 }
918
919 static int control_uptime(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
920                           int argc, const char **argv)
921 {
922         struct ctdb_uptime *uptime;
923         int ret, tmp, days, hours, minutes, seconds;
924
925         ret = ctdb_ctrl_uptime(mem_ctx, ctdb->ev, ctdb->client,
926                                ctdb->cmd_pnn, TIMEOUT(), &uptime);
927         if (ret != 0) {
928                 return ret;
929         }
930
931         printf("Current time of node %-4u     :                %s",
932                ctdb->cmd_pnn, ctime(&uptime->current_time.tv_sec));
933
934         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
935         seconds = tmp % 60; tmp /= 60;
936         minutes = tmp % 60; tmp /= 60;
937         hours = tmp % 24; tmp /= 24;
938         days = tmp;
939
940         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s",
941                days, hours, minutes, seconds,
942                ctime(&uptime->ctdbd_start_time.tv_sec));
943
944         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
945         seconds = tmp % 60; tmp /= 60;
946         minutes = tmp % 60; tmp /= 60;
947         hours = tmp % 24; tmp /= 24;
948         days = tmp;
949
950         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s",
951                days, hours, minutes, seconds,
952                ctime(&uptime->last_recovery_finished.tv_sec));
953
954         printf("Duration of last recovery/failover: %lf seconds\n",
955                timeval_delta(&uptime->last_recovery_finished,
956                              &uptime->last_recovery_started));
957
958         return 0;
959 }
960
961 static int control_ping(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
962                         int argc, const char **argv)
963 {
964         struct timeval tv;
965         int ret, num_clients;
966
967         tv = timeval_current();
968         ret = ctdb_ctrl_ping(mem_ctx, ctdb->ev, ctdb->client,
969                              ctdb->cmd_pnn, TIMEOUT(), &num_clients);
970         if (ret != 0) {
971                 return ret;
972         }
973
974         printf("response from %u time=%.6f sec  (%d clients)\n",
975                ctdb->cmd_pnn, timeval_elapsed(&tv), num_clients);
976         return 0;
977 }
978
979 const char *runstate_to_string(enum ctdb_runstate runstate);
980 enum ctdb_runstate runstate_from_string(const char *runstate_str);
981
982 static int control_runstate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
983                             int argc, const char **argv)
984 {
985         enum ctdb_runstate runstate;
986         bool found;
987         int ret, i;
988
989         ret = ctdb_ctrl_get_runstate(mem_ctx, ctdb->ev, ctdb->client,
990                                      ctdb->cmd_pnn, TIMEOUT(), &runstate);
991         if (ret != 0) {
992                 return ret;
993         }
994
995         found = true;
996         for (i=0; i<argc; i++) {
997                 enum ctdb_runstate t;
998
999                 found = false;
1000                 t = ctdb_runstate_from_string(argv[i]);
1001                 if (t == CTDB_RUNSTATE_UNKNOWN) {
1002                         printf("Invalid run state (%s)\n", argv[i]);
1003                         return 1;
1004                 }
1005
1006                 if (t == runstate) {
1007                         found = true;
1008                         break;
1009                 }
1010         }
1011
1012         if (! found) {
1013                 printf("CTDB not in required run state (got %s)\n",
1014                        ctdb_runstate_to_string(runstate));
1015                 return 1;
1016         }
1017
1018         printf("%s\n", ctdb_runstate_to_string(runstate));
1019         return 0;
1020 }
1021
1022 static int control_getvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1023                           int argc, const char **argv)
1024 {
1025         struct ctdb_var_list *tun_var_list;
1026         uint32_t value;
1027         int ret, i;
1028         bool found;
1029
1030         if (argc != 1) {
1031                 usage("getvar");
1032         }
1033
1034         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1035                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1036         if (ret != 0) {
1037                 fprintf(stderr,
1038                         "Failed to get list of variables from node %u\n",
1039                         ctdb->cmd_pnn);
1040                 return ret;
1041         }
1042
1043         found = false;
1044         for (i=0; i<tun_var_list->count; i++) {
1045                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1046                         found = true;
1047                         break;
1048                 }
1049         }
1050
1051         if (! found) {
1052                 printf("No such tunable %s\n", argv[0]);
1053                 return 1;
1054         }
1055
1056         ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
1057                                     ctdb->cmd_pnn, TIMEOUT(), argv[0], &value);
1058         if (ret != 0) {
1059                 return ret;
1060         }
1061
1062         printf("%-26s = %u\n", argv[0], value);
1063         return 0;
1064 }
1065
1066 static int control_setvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1067                           int argc, const char **argv)
1068 {
1069         struct ctdb_var_list *tun_var_list;
1070         struct ctdb_tunable tunable;
1071         int ret, i;
1072         bool found;
1073
1074         if (argc != 2) {
1075                 usage("setvar");
1076         }
1077
1078         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1079                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1080         if (ret != 0) {
1081                 fprintf(stderr,
1082                         "Failed to get list of variables from node %u\n",
1083                         ctdb->cmd_pnn);
1084                 return ret;
1085         }
1086
1087         found = false;
1088         for (i=0; i<tun_var_list->count; i++) {
1089                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1090                         found = true;
1091                         break;
1092                 }
1093         }
1094
1095         if (! found) {
1096                 printf("No such tunable %s\n", argv[0]);
1097                 return 1;
1098         }
1099
1100         tunable.name = argv[0];
1101         tunable.value = strtoul(argv[1], NULL, 0);
1102
1103         ret = ctdb_ctrl_set_tunable(mem_ctx, ctdb->ev, ctdb->client,
1104                                     ctdb->cmd_pnn, TIMEOUT(), &tunable);
1105         if (ret != 0) {
1106                 if (ret == 1) {
1107                         fprintf(stderr,
1108                                 "Setting obsolete tunable variable '%s'\n",
1109                                tunable.name);
1110                         return 0;
1111                 }
1112         }
1113
1114         return ret;
1115 }
1116
1117 static int control_listvars(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1118                             int argc, const char **argv)
1119 {
1120         struct ctdb_var_list *tun_var_list;
1121         int ret, i;
1122
1123         if (argc != 0) {
1124                 usage("listvars");
1125         }
1126
1127         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1128                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1129         if (ret != 0) {
1130                 return ret;
1131         }
1132
1133         for (i=0; i<tun_var_list->count; i++) {
1134                 control_getvar(mem_ctx, ctdb, 1, &tun_var_list->var[i]);
1135         }
1136
1137         return 0;
1138 }
1139
1140 const struct {
1141         const char *name;
1142         uint32_t offset;
1143 } stats_fields[] = {
1144 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
1145         STATISTICS_FIELD(num_clients),
1146         STATISTICS_FIELD(frozen),
1147         STATISTICS_FIELD(recovering),
1148         STATISTICS_FIELD(num_recoveries),
1149         STATISTICS_FIELD(client_packets_sent),
1150         STATISTICS_FIELD(client_packets_recv),
1151         STATISTICS_FIELD(node_packets_sent),
1152         STATISTICS_FIELD(node_packets_recv),
1153         STATISTICS_FIELD(keepalive_packets_sent),
1154         STATISTICS_FIELD(keepalive_packets_recv),
1155         STATISTICS_FIELD(node.req_call),
1156         STATISTICS_FIELD(node.reply_call),
1157         STATISTICS_FIELD(node.req_dmaster),
1158         STATISTICS_FIELD(node.reply_dmaster),
1159         STATISTICS_FIELD(node.reply_error),
1160         STATISTICS_FIELD(node.req_message),
1161         STATISTICS_FIELD(node.req_control),
1162         STATISTICS_FIELD(node.reply_control),
1163         STATISTICS_FIELD(node.req_tunnel),
1164         STATISTICS_FIELD(client.req_call),
1165         STATISTICS_FIELD(client.req_message),
1166         STATISTICS_FIELD(client.req_control),
1167         STATISTICS_FIELD(client.req_tunnel),
1168         STATISTICS_FIELD(timeouts.call),
1169         STATISTICS_FIELD(timeouts.control),
1170         STATISTICS_FIELD(timeouts.traverse),
1171         STATISTICS_FIELD(locks.num_calls),
1172         STATISTICS_FIELD(locks.num_current),
1173         STATISTICS_FIELD(locks.num_pending),
1174         STATISTICS_FIELD(locks.num_failed),
1175         STATISTICS_FIELD(total_calls),
1176         STATISTICS_FIELD(pending_calls),
1177         STATISTICS_FIELD(childwrite_calls),
1178         STATISTICS_FIELD(pending_childwrite_calls),
1179         STATISTICS_FIELD(memory_used),
1180         STATISTICS_FIELD(max_hop_count),
1181         STATISTICS_FIELD(total_ro_delegations),
1182         STATISTICS_FIELD(total_ro_revokes),
1183 };
1184
/*
 * Mean of a latency counter: total divided by the number of samples,
 * or 0.0 when no samples have been recorded (avoids dividing by zero).
 * The argument is evaluated more than once.
 */
#define LATENCY_AVG(v)  ((v).num != 0 ? (v).total / (v).num : 0.0)
1186
1187 static void print_statistics_machine(struct ctdb_statistics *s,
1188                                      bool show_header)
1189 {
1190         int i;
1191
1192         if (show_header) {
1193                 printf("CTDB version%s", options.sep);
1194                 printf("Current time of statistics%s", options.sep);
1195                 printf("Statistics collected since%s", options.sep);
1196                 for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1197                         printf("%s%s", stats_fields[i].name, options.sep);
1198                 }
1199                 printf("num_reclock_ctdbd_latency%s", options.sep);
1200                 printf("min_reclock_ctdbd_latency%s", options.sep);
1201                 printf("avg_reclock_ctdbd_latency%s", options.sep);
1202                 printf("max_reclock_ctdbd_latency%s", options.sep);
1203
1204                 printf("num_reclock_recd_latency%s", options.sep);
1205                 printf("min_reclock_recd_latency%s", options.sep);
1206                 printf("avg_reclock_recd_latency%s", options.sep);
1207                 printf("max_reclock_recd_latency%s", options.sep);
1208
1209                 printf("num_call_latency%s", options.sep);
1210                 printf("min_call_latency%s", options.sep);
1211                 printf("avg_call_latency%s", options.sep);
1212                 printf("max_call_latency%s", options.sep);
1213
1214                 printf("num_lockwait_latency%s", options.sep);
1215                 printf("min_lockwait_latency%s", options.sep);
1216                 printf("avg_lockwait_latency%s", options.sep);
1217                 printf("max_lockwait_latency%s", options.sep);
1218
1219                 printf("num_childwrite_latency%s", options.sep);
1220                 printf("min_childwrite_latency%s", options.sep);
1221                 printf("avg_childwrite_latency%s", options.sep);
1222                 printf("max_childwrite_latency%s", options.sep);
1223                 printf("\n");
1224         }
1225
1226         printf("%u%s", CTDB_PROTOCOL, options.sep);
1227         printf("%u%s", (uint32_t)s->statistics_current_time.tv_sec, options.sep);
1228         printf("%u%s", (uint32_t)s->statistics_start_time.tv_sec, options.sep);
1229         for (i=0;i<ARRAY_SIZE(stats_fields);i++) {
1230                 printf("%u%s",
1231                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s),
1232                        options.sep);
1233         }
1234         printf("%u%s", s->reclock.ctdbd.num, options.sep);
1235         printf("%.6f%s", s->reclock.ctdbd.min, options.sep);
1236         printf("%.6f%s", LATENCY_AVG(s->reclock.ctdbd), options.sep);
1237         printf("%.6f%s", s->reclock.ctdbd.max, options.sep);
1238
1239         printf("%u%s", s->reclock.recd.num, options.sep);
1240         printf("%.6f%s", s->reclock.recd.min, options.sep);
1241         printf("%.6f%s", LATENCY_AVG(s->reclock.recd), options.sep);
1242         printf("%.6f%s", s->reclock.recd.max, options.sep);
1243
1244         printf("%d%s", s->call_latency.num, options.sep);
1245         printf("%.6f%s", s->call_latency.min, options.sep);
1246         printf("%.6f%s", LATENCY_AVG(s->call_latency), options.sep);
1247         printf("%.6f%s", s->call_latency.max, options.sep);
1248
1249         printf("%d%s", s->childwrite_latency.num, options.sep);
1250         printf("%.6f%s", s->childwrite_latency.min, options.sep);
1251         printf("%.6f%s", LATENCY_AVG(s->childwrite_latency), options.sep);
1252         printf("%.6f%s", s->childwrite_latency.max, options.sep);
1253         printf("\n");
1254 }
1255
1256 static void print_statistics(struct ctdb_statistics *s)
1257 {
1258         int tmp, days, hours, minutes, seconds;
1259         int i;
1260         const char *prefix = NULL;
1261         int preflen = 0;
1262
1263         tmp = s->statistics_current_time.tv_sec -
1264               s->statistics_start_time.tv_sec;
1265         seconds = tmp % 60; tmp /= 60;
1266         minutes = tmp % 60; tmp /= 60;
1267         hours   = tmp % 24; tmp /= 24;
1268         days    = tmp;
1269
1270         printf("CTDB version %u\n", CTDB_PROTOCOL);
1271         printf("Current time of statistics  :                %s",
1272                ctime(&s->statistics_current_time.tv_sec));
1273         printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s",
1274                days, hours, minutes, seconds,
1275                ctime(&s->statistics_start_time.tv_sec));
1276
1277         for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1278                 if (strchr(stats_fields[i].name, '.') != NULL) {
1279                         preflen = strcspn(stats_fields[i].name, ".") + 1;
1280                         if (! prefix ||
1281                             strncmp(prefix, stats_fields[i].name, preflen) != 0) {
1282                                 prefix = stats_fields[i].name;
1283                                 printf(" %*.*s\n", preflen-1, preflen-1,
1284                                        stats_fields[i].name);
1285                         }
1286                 } else {
1287                         preflen = 0;
1288                 }
1289                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
1290                        stats_fields[i].name+preflen, preflen ? 0 : 4, "",
1291                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s));
1292         }
1293
1294         printf(" hop_count_buckets:");
1295         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1296                 printf(" %d", s->hop_count_bucket[i]);
1297         }
1298         printf("\n");
1299         printf(" lock_buckets:");
1300         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1301                 printf(" %d", s->locks.buckets[i]);
1302         }
1303         printf("\n");
1304         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1305                "locks_latency      MIN/AVG/MAX",
1306                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
1307                s->locks.latency.max, s->locks.latency.num);
1308
1309         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1310                "reclock_ctdbd      MIN/AVG/MAX",
1311                s->reclock.ctdbd.min, LATENCY_AVG(s->reclock.ctdbd),
1312                s->reclock.ctdbd.max, s->reclock.ctdbd.num);
1313
1314         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1315                "reclock_recd       MIN/AVG/MAX",
1316                s->reclock.recd.min, LATENCY_AVG(s->reclock.recd),
1317                s->reclock.recd.max, s->reclock.recd.num);
1318
1319         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1320                "call_latency       MIN/AVG/MAX",
1321                s->call_latency.min, LATENCY_AVG(s->call_latency),
1322                s->call_latency.max, s->call_latency.num);
1323
1324         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1325                "childwrite_latency MIN/AVG/MAX",
1326                s->childwrite_latency.min,
1327                LATENCY_AVG(s->childwrite_latency),
1328                s->childwrite_latency.max, s->childwrite_latency.num);
1329 }
1330
1331 static int control_statistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1332                               int argc, const char **argv)
1333 {
1334         struct ctdb_statistics *stats;
1335         int ret;
1336
1337         if (argc != 0) {
1338                 usage("statistics");
1339         }
1340
1341         ret = ctdb_ctrl_statistics(mem_ctx, ctdb->ev, ctdb->client,
1342                                    ctdb->cmd_pnn, TIMEOUT(), &stats);
1343         if (ret != 0) {
1344                 return ret;
1345         }
1346
1347         if (options.machinereadable) {
1348                 print_statistics_machine(stats, true);
1349         } else {
1350                 print_statistics(stats);
1351         }
1352
1353         return 0;
1354 }
1355
1356 static int control_statistics_reset(TALLOC_CTX *mem_ctx,
1357                                     struct ctdb_context *ctdb,
1358                                     int argc, const char **argv)
1359 {
1360         int ret;
1361
1362         if (argc != 0) {
1363                 usage("statisticsreset");
1364         }
1365
1366         ret = ctdb_ctrl_statistics_reset(mem_ctx, ctdb->ev, ctdb->client,
1367                                          ctdb->cmd_pnn, TIMEOUT());
1368         if (ret != 0) {
1369                 return ret;
1370         }
1371
1372         return 0;
1373 }
1374
1375 static int control_stats(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1376                          int argc, const char **argv)
1377 {
1378         struct ctdb_statistics_list *slist;
1379         int ret, count = 0, i;
1380         bool show_header = true;
1381
1382         if (argc > 1) {
1383                 usage("stats");
1384         }
1385
1386         if (argc == 1) {
1387                 count = atoi(argv[0]);
1388         }
1389
1390         ret = ctdb_ctrl_get_stat_history(mem_ctx, ctdb->ev, ctdb->client,
1391                                          ctdb->cmd_pnn, TIMEOUT(), &slist);
1392         if (ret != 0) {
1393                 return ret;
1394         }
1395
1396         for (i=0; i<slist->num; i++) {
1397                 if (slist->stats[i].statistics_start_time.tv_sec == 0) {
1398                         continue;
1399                 }
1400                 if (options.machinereadable == 1) {
1401                         print_statistics_machine(&slist->stats[i],
1402                                                  show_header);
1403                         show_header = false;
1404                 } else {
1405                         print_statistics(&slist->stats[i]);
1406                 }
1407                 if (count > 0 && i == count) {
1408                         break;
1409                 }
1410         }
1411
1412         return 0;
1413 }
1414
1415 static int ctdb_public_ip_cmp(const void *a, const void *b)
1416 {
1417         const struct ctdb_public_ip *ip_a = a;
1418         const struct ctdb_public_ip *ip_b = b;
1419
1420         return ctdb_sock_addr_cmp(&ip_a->addr, &ip_b->addr);
1421 }
1422
1423 static void print_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1424                      struct ctdb_public_ip_list *ips,
1425                      struct ctdb_public_ip_info **ipinfo,
1426                      bool all_nodes)
1427 {
1428         int i, j;
1429         char *conf, *avail, *active;
1430
1431         if (options.machinereadable == 1) {
1432                 printf("%s%s%s%s%s", options.sep,
1433                        "Public IP", options.sep,
1434                        "Node", options.sep);
1435                 if (options.verbose == 1) {
1436                         printf("%s%s%s%s%s%s\n",
1437                                "ActiveInterfaces", options.sep,
1438                                "AvailableInterfaces", options.sep,
1439                                "ConfiguredInterfaces", options.sep);
1440                 } else {
1441                         printf("\n");
1442                 }
1443         } else {
1444                 if (all_nodes) {
1445                         printf("Public IPs on ALL nodes\n");
1446                 } else {
1447                         printf("Public IPs on node %u\n", ctdb->cmd_pnn);
1448                 }
1449         }
1450
1451         for (i = 0; i < ips->num; i++) {
1452
1453                 if (options.machinereadable == 1) {
1454                         printf("%s%s%s%d%s", options.sep,
1455                                ctdb_sock_addr_to_string(
1456                                        mem_ctx, &ips->ip[i].addr, false),
1457                                options.sep,
1458                                (int)ips->ip[i].pnn, options.sep);
1459                 } else {
1460                         printf("%s", ctdb_sock_addr_to_string(
1461                                        mem_ctx, &ips->ip[i].addr, false));
1462                 }
1463
1464                 if (options.verbose == 0) {
1465                         if (options.machinereadable == 1) {
1466                                 printf("\n");
1467                         } else {
1468                                 printf(" %d\n", (int)ips->ip[i].pnn);
1469                         }
1470                         continue;
1471                 }
1472
1473                 conf = NULL;
1474                 avail = NULL;
1475                 active = NULL;
1476
1477                 if (ipinfo[i] == NULL) {
1478                         goto skip_ipinfo;
1479                 }
1480
1481                 for (j=0; j<ipinfo[i]->ifaces->num; j++) {
1482                         struct ctdb_iface *iface;
1483
1484                         iface = &ipinfo[i]->ifaces->iface[j];
1485                         if (conf == NULL) {
1486                                 conf = talloc_strdup(mem_ctx, iface->name);
1487                         } else {
1488                                 conf = talloc_asprintf_append(
1489                                                 conf, ",%s", iface->name);
1490                         }
1491
1492                         if (ipinfo[i]->active_idx == j) {
1493                                 active = iface->name;
1494                         }
1495
1496                         if (iface->link_state == 0) {
1497                                 continue;
1498                         }
1499
1500                         if (avail == NULL) {
1501                                 avail = talloc_strdup(mem_ctx, iface->name);
1502                         } else {
1503                                 avail = talloc_asprintf_append(
1504                                                 avail, ",%s", iface->name);
1505                         }
1506                 }
1507
1508         skip_ipinfo:
1509
1510                 if (options.machinereadable == 1) {
1511                         printf("%s%s%s%s%s%s\n",
1512                                active ? active : "", options.sep,
1513                                avail ? avail : "", options.sep,
1514                                conf ? conf : "", options.sep);
1515                 } else {
1516                         printf(" node[%d] active[%s] available[%s]"
1517                                " configured[%s]\n",
1518                                (int)ips->ip[i].pnn, active ? active : "",
1519                                avail ? avail : "", conf ? conf : "");
1520                 }
1521         }
1522 }
1523
1524 static int collect_ips(uint8_t *keybuf, size_t keylen, uint8_t *databuf,
1525                        size_t datalen, void *private_data)
1526 {
1527         struct ctdb_public_ip_list *ips = talloc_get_type_abort(
1528                 private_data, struct ctdb_public_ip_list);
1529         struct ctdb_public_ip *ip;
1530
1531         ip = (struct ctdb_public_ip *)databuf;
1532         ips->ip[ips->num] = *ip;
1533         ips->num += 1;
1534
1535         return 0;
1536 }
1537
1538 static int get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1539                               struct ctdb_public_ip_list **out)
1540 {
1541         struct ctdb_node_map *nodemap;
1542         struct ctdb_public_ip_list *ips;
1543         struct db_hash_context *ipdb;
1544         uint32_t *pnn_list;
1545         int ret, count, i, j;
1546
1547         nodemap = get_nodemap(ctdb, false);
1548         if (nodemap == NULL) {
1549                 return 1;
1550         }
1551
1552         ret = db_hash_init(mem_ctx, "ips", 101, DB_HASH_COMPLEX, &ipdb);
1553         if (ret != 0) {
1554                 goto failed;
1555         }
1556
1557         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
1558                                      &pnn_list);
1559         if (count <= 0) {
1560                 goto failed;
1561         }
1562
1563         for (i=0; i<count; i++) {
1564                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1565                                                pnn_list[i], TIMEOUT(),
1566                                                false, &ips);
1567                 if (ret != 0) {
1568                         goto failed;
1569                 }
1570
1571                 for (j=0; j<ips->num; j++) {
1572                         struct ctdb_public_ip ip;
1573
1574                         ip.pnn = ips->ip[j].pnn;
1575                         ip.addr = ips->ip[j].addr;
1576
1577                         if (pnn_list[i] == ip.pnn) {
1578                                 /* Node claims IP is hosted on it, so
1579                                  * save that information
1580                                  */
1581                                 ret = db_hash_add(ipdb, (uint8_t *)&ip.addr,
1582                                                   sizeof(ip.addr),
1583                                                   (uint8_t *)&ip, sizeof(ip));
1584                                 if (ret != 0) {
1585                                         goto failed;
1586                                 }
1587                         } else {
1588                                 /* Node thinks IP is hosted elsewhere,
1589                                  * so overwrite with CTDB_UNKNOWN_PNN
1590                                  * if there's no existing entry
1591                                  */
1592                                 ret = db_hash_exists(ipdb, (uint8_t *)&ip.addr,
1593                                                      sizeof(ip.addr));
1594                                 if (ret == ENOENT) {
1595                                         ip.pnn = CTDB_UNKNOWN_PNN;
1596                                         ret = db_hash_add(ipdb,
1597                                                           (uint8_t *)&ip.addr,
1598                                                           sizeof(ip.addr),
1599                                                           (uint8_t *)&ip,
1600                                                           sizeof(ip));
1601                                         if (ret != 0) {
1602                                                 goto failed;
1603                                         }
1604                                 }
1605                         }
1606                 }
1607
1608                 TALLOC_FREE(ips);
1609         }
1610
1611         talloc_free(pnn_list);
1612
1613         ret = db_hash_traverse(ipdb, NULL, NULL, &count);
1614         if (ret != 0) {
1615                 goto failed;
1616         }
1617
1618         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);
1619         if (ips == NULL) {
1620                 goto failed;
1621         }
1622
1623         ips->ip = talloc_array(ips, struct ctdb_public_ip, count);
1624         if (ips->ip == NULL) {
1625                 goto failed;
1626         }
1627
1628         ret = db_hash_traverse(ipdb, collect_ips, ips, &count);
1629         if (ret != 0) {
1630                 goto failed;
1631         }
1632
1633         if (count != ips->num) {
1634                 goto failed;
1635         }
1636
1637         talloc_free(ipdb);
1638
1639         *out = ips;
1640         return 0;
1641
1642 failed:
1643         talloc_free(ipdb);
1644         return 1;
1645 }
1646
1647 static int control_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1648                       int argc, const char **argv)
1649 {
1650         struct ctdb_public_ip_list *ips;
1651         struct ctdb_public_ip_info **ipinfo;
1652         int ret, i;
1653         bool do_all = false;
1654
1655         if (argc > 1) {
1656                 usage("ip");
1657         }
1658
1659         if (argc == 1) {
1660                 if (strcmp(argv[0], "all") == 0) {
1661                         do_all = true;
1662                 } else {
1663                         usage("ip");
1664                 }
1665         }
1666
1667         if (do_all) {
1668                 ret = get_all_public_ips(ctdb, mem_ctx, &ips);
1669         } else {
1670                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1671                                                ctdb->cmd_pnn, TIMEOUT(),
1672                                                false, &ips);
1673         }
1674         if (ret != 0) {
1675                 return ret;
1676         }
1677
1678         qsort(ips->ip, ips->num, sizeof(struct ctdb_public_ip),
1679               ctdb_public_ip_cmp);
1680
1681         ipinfo = talloc_array(mem_ctx, struct ctdb_public_ip_info *, ips->num);
1682         if (ipinfo == NULL) {
1683                 return 1;
1684         }
1685
1686         for (i=0; i<ips->num; i++) {
1687                 uint32_t pnn;
1688                 if (do_all) {
1689                         pnn = ips->ip[i].pnn;
1690                 } else {
1691                         pnn = ctdb->cmd_pnn;
1692                 }
1693                 if (pnn == CTDB_UNKNOWN_PNN) {
1694                         ipinfo[i] = NULL;
1695                         continue;
1696                 }
1697                 ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev,
1698                                                    ctdb->client, pnn,
1699                                                    TIMEOUT(), &ips->ip[i].addr,
1700                                                    &ipinfo[i]);
1701                 if (ret != 0) {
1702                         return ret;
1703                 }
1704         }
1705
1706         print_ip(mem_ctx, ctdb, ips, ipinfo, do_all);
1707         return 0;
1708 }
1709
1710 static int control_ipinfo(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1711                           int argc, const char **argv)
1712 {
1713         struct ctdb_public_ip_info *ipinfo;
1714         ctdb_sock_addr addr;
1715         int ret, i;
1716
1717         if (argc != 1) {
1718                 usage("ipinfo");
1719         }
1720
1721         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
1722         if (ret != 0) {
1723                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
1724                 return 1;
1725         }
1726
1727         ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev, ctdb->client,
1728                                            ctdb->cmd_pnn, TIMEOUT(), &addr,
1729                                            &ipinfo);
1730         if (ret != 0) {
1731                 if (ret == -1) {
1732                         printf("Node %u does not know about IP %s\n",
1733                                ctdb->cmd_pnn, argv[0]);
1734                 }
1735                 return ret;
1736         }
1737
1738         printf("Public IP[%s] info on node %u\n",
1739                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr, false),
1740                                         ctdb->cmd_pnn);
1741
1742         printf("IP:%s\nCurrentNode:%u\nNumInterfaces:%u\n",
1743                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr, false),
1744                ipinfo->ip.pnn, ipinfo->ifaces->num);
1745
1746         for (i=0; i<ipinfo->ifaces->num; i++) {
1747                 struct ctdb_iface *iface;
1748
1749                 iface = &ipinfo->ifaces->iface[i];
1750                 iface->name[CTDB_IFACE_SIZE] = '\0';
1751                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1752                        i+1, iface->name,
1753                        iface->link_state == 0 ? "down" : "up",
1754                        iface->references,
1755                        (i == ipinfo->active_idx) ? " (active)" : "");
1756         }
1757
1758         return 0;
1759 }
1760
1761 static int control_ifaces(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1762                           int argc, const char **argv)
1763 {
1764         struct ctdb_iface_list *ifaces;
1765         int ret, i;
1766
1767         if (argc != 0) {
1768                 usage("ifaces");
1769         }
1770
1771         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1772                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1773         if (ret != 0) {
1774                 return ret;
1775         }
1776
1777         if (ifaces->num == 0) {
1778                 printf("No interfaces configured on node %u\n",
1779                        ctdb->cmd_pnn);
1780                 return 0;
1781         }
1782
1783         if (options.machinereadable) {
1784                 printf("%s%s%s%s%s%s%s\n", options.sep,
1785                        "Name", options.sep,
1786                        "LinkStatus", options.sep,
1787                        "References", options.sep);
1788         } else {
1789                 printf("Interfaces on node %u\n", ctdb->cmd_pnn);
1790         }
1791
1792         for (i=0; i<ifaces->num; i++) {
1793                 if (options.machinereadable) {
1794                         printf("%s%s%s%u%s%u%s\n", options.sep,
1795                                ifaces->iface[i].name, options.sep,
1796                                ifaces->iface[i].link_state, options.sep,
1797                                ifaces->iface[i].references, options.sep);
1798                 } else {
1799                         printf("name:%s link:%s references:%u\n",
1800                                ifaces->iface[i].name,
1801                                ifaces->iface[i].link_state ? "up" : "down",
1802                                ifaces->iface[i].references);
1803                 }
1804         }
1805
1806         return 0;
1807 }
1808
1809 static int control_setifacelink(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1810                                 int argc, const char **argv)
1811 {
1812         struct ctdb_iface_list *ifaces;
1813         struct ctdb_iface *iface;
1814         int ret, i;
1815
1816         if (argc != 2) {
1817                 usage("setifacelink");
1818         }
1819
1820         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1821                 fprintf(stderr, "Interface name '%s' too long\n", argv[0]);
1822                 return 1;
1823         }
1824
1825         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1826                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1827         if (ret != 0) {
1828                 fprintf(stderr,
1829                         "Failed to get interface information from node %u\n",
1830                         ctdb->cmd_pnn);
1831                 return ret;
1832         }
1833
1834         iface = NULL;
1835         for (i=0; i<ifaces->num; i++) {
1836                 if (strcmp(ifaces->iface[i].name, argv[0]) == 0) {
1837                         iface = &ifaces->iface[i];
1838                         break;
1839                 }
1840         }
1841
1842         if (iface == NULL) {
1843                 printf("Interface %s not configured on node %u\n",
1844                        argv[0], ctdb->cmd_pnn);
1845                 return 1;
1846         }
1847
1848         if (strcmp(argv[1], "up") == 0) {
1849                 iface->link_state = 1;
1850         } else if (strcmp(argv[1], "down") == 0) {
1851                 iface->link_state = 0;
1852         } else {
1853                 usage("setifacelink");
1854                 return 1;
1855         }
1856
1857         iface->references = 0;
1858
1859         ret = ctdb_ctrl_set_iface_link_state(mem_ctx, ctdb->ev, ctdb->client,
1860                                              ctdb->cmd_pnn, TIMEOUT(), iface);
1861         if (ret != 0) {
1862                 return ret;
1863         }
1864
1865         return 0;
1866 }
1867
1868 static int control_process_exists(TALLOC_CTX *mem_ctx,
1869                                   struct ctdb_context *ctdb,
1870                                   int argc, const char **argv)
1871 {
1872         pid_t pid;
1873         uint64_t srvid = 0;
1874         int ret, status;
1875
1876         if (argc != 1 && argc != 2) {
1877                 usage("process-exists");
1878         }
1879
1880         pid = atoi(argv[0]);
1881         if (argc == 2) {
1882                 srvid = strtoull(argv[1], NULL, 0);
1883         }
1884
1885         if (srvid == 0) {
1886                 ret = ctdb_ctrl_process_exists(mem_ctx, ctdb->ev, ctdb->client,
1887                                        ctdb->cmd_pnn, TIMEOUT(), pid, &status);
1888         } else {
1889                 struct ctdb_pid_srvid pid_srvid;
1890
1891                 pid_srvid.pid = pid;
1892                 pid_srvid.srvid = srvid;
1893
1894                 ret = ctdb_ctrl_check_pid_srvid(mem_ctx, ctdb->ev,
1895                                                 ctdb->client, ctdb->cmd_pnn,
1896                                                 TIMEOUT(), &pid_srvid,
1897                                                 &status);
1898         }
1899
1900         if (ret != 0) {
1901                 return ret;
1902         }
1903
1904         if (srvid == 0) {
1905                 printf("PID %d %s\n", pid,
1906                        (status == 0 ? "exists" : "does not exist"));
1907         } else {
1908                 printf("PID %d with SRVID 0x%"PRIx64" %s\n", pid, srvid,
1909                        (status == 0 ? "exists" : "does not exist"));
1910         }
1911         return status;
1912 }
1913
1914 static int control_getdbmap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1915                             int argc, const char **argv)
1916 {
1917         struct ctdb_dbid_map *dbmap;
1918         int ret, i;
1919
1920         if (argc != 0) {
1921                 usage("getdbmap");
1922         }
1923
1924         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
1925                                   ctdb->cmd_pnn, TIMEOUT(), &dbmap);
1926         if (ret != 0) {
1927                 return ret;
1928         }
1929
1930         if (options.machinereadable == 1) {
1931                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
1932                        options.sep,
1933                        "ID", options.sep,
1934                        "Name", options.sep,
1935                        "Path", options.sep,
1936                        "Persistent", options.sep,
1937                        "Sticky", options.sep,
1938                        "Unhealthy", options.sep,
1939                        "Readonly", options.sep,
1940                        "Replicated", options.sep);
1941         } else {
1942                 printf("Number of databases:%d\n", dbmap->num);
1943         }
1944
1945         for (i=0; i<dbmap->num; i++) {
1946                 const char *name;
1947                 const char *path;
1948                 const char *health;
1949                 bool persistent;
1950                 bool readonly;
1951                 bool sticky;
1952                 bool replicated;
1953                 uint32_t db_id;
1954
1955                 db_id = dbmap->dbs[i].db_id;
1956
1957                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
1958                                            ctdb->cmd_pnn, TIMEOUT(), db_id,
1959                                            &name);
1960                 if (ret != 0) {
1961                         return ret;
1962                 }
1963
1964                 ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1965                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
1966                                           &path);
1967                 if (ret != 0) {
1968                         return ret;
1969                 }
1970
1971                 ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1972                                               ctdb->cmd_pnn, TIMEOUT(), db_id,
1973                                               &health);
1974                 if (ret != 0) {
1975                         return ret;
1976                 }
1977
1978                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1979                 readonly = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
1980                 sticky = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
1981                 replicated = dbmap->dbs[i].flags & CTDB_DB_FLAGS_REPLICATED;
1982
1983                 if (options.machinereadable == 1) {
1984                         printf("%s0x%08X%s%s%s%s%s%d%s%d%s%d%s%d%s%d%s\n",
1985                                options.sep,
1986                                db_id, options.sep,
1987                                name, options.sep,
1988                                path, options.sep,
1989                                !! (persistent), options.sep,
1990                                !! (sticky), options.sep,
1991                                !! (health), options.sep,
1992                                !! (readonly), options.sep,
1993                                !! (replicated), options.sep);
1994                 } else {
1995                         printf("dbid:0x%08x name:%s path:%s%s%s%s%s%s\n",
1996                                db_id, name, path,
1997                                persistent ? " PERSISTENT" : "",
1998                                sticky ? " STICKY" : "",
1999                                readonly ? " READONLY" : "",
2000                                replicated ? " REPLICATED" : "",
2001                                health ? " UNHEALTHY" : "");
2002                 }
2003
2004                 talloc_free(discard_const(name));
2005                 talloc_free(discard_const(path));
2006                 talloc_free(discard_const(health));
2007         }
2008
2009         return 0;
2010 }
2011
2012 static int control_getdbstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2013                                int argc, const char **argv)
2014 {
2015         uint32_t db_id;
2016         const char *db_name, *db_path, *db_health;
2017         uint8_t db_flags;
2018         int ret;
2019
2020         if (argc != 1) {
2021                 usage("getdbstatus");
2022         }
2023
2024         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2025                 return 1;
2026         }
2027
2028         ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
2029                                   ctdb->cmd_pnn, TIMEOUT(), db_id,
2030                                   &db_path);
2031         if (ret != 0) {
2032                 return ret;
2033         }
2034
2035         ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
2036                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
2037                                       &db_health);
2038         if (ret != 0) {
2039                 return ret;
2040         }
2041
2042         printf("dbid: 0x%08x\nname: %s\npath: %s\n", db_id, db_name, db_path);
2043         printf("PERSISTENT: %s\nREPLICATED: %s\nSTICKY: %s\nREADONLY: %s\n",
2044                (db_flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
2045                (db_flags & CTDB_DB_FLAGS_REPLICATED ? "yes" : "no"),
2046                (db_flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
2047                (db_flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"));
2048         printf("HEALTH: %s\n", (db_health ? db_health : "OK"));
2049         return 0;
2050 }
2051
/*
 * State handed to dump_record() through its private_data pointer while
 * a database is being traversed.
 */
struct dump_record_state {
        /* Number of records dump_record() has printed so far */
        uint32_t count;
};
2055
/*
 * True when byte x can be emitted verbatim inside the double-quoted dump
 * output: it must be printable and be neither '"' nor '\'.  The value
 * is converted to unsigned char before it reaches isprint(), as
 * <ctype.h> requires.  Note that x may be evaluated twice.
 */
#define ISASCII(x) (isprint((unsigned char)(x)) && ! strchr("\"\\", (x)))
2057
2058 static void dump_tdb_data(const char *name, TDB_DATA val)
2059 {
2060         int i;
2061
2062         fprintf(stdout, "%s(%zu) = \"", name, val.dsize);
2063         for (i=0; i<val.dsize; i++) {
2064                 if (ISASCII(val.dptr[i])) {
2065                         fprintf(stdout, "%c", val.dptr[i]);
2066                 } else {
2067                         fprintf(stdout, "\\%02X", val.dptr[i]);
2068                 }
2069         }
2070         fprintf(stdout, "\"\n");
2071 }
2072
2073 static void dump_ltdb_header(struct ctdb_ltdb_header *header)
2074 {
2075         fprintf(stdout, "dmaster: %u\n", header->dmaster);
2076         fprintf(stdout, "rsn: %" PRIu64 "\n", header->rsn);
2077         fprintf(stdout, "flags: 0x%08x", header->flags);
2078         if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
2079                 fprintf(stdout, " MIGRATED_WITH_DATA");
2080         }
2081         if (header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) {
2082                 fprintf(stdout, " VACUUM_MIGRATED");
2083         }
2084         if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
2085                 fprintf(stdout, " AUTOMATIC");
2086         }
2087         if (header->flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
2088                 fprintf(stdout, " RO_HAVE_DELEGATIONS");
2089         }
2090         if (header->flags & CTDB_REC_RO_HAVE_READONLY) {
2091                 fprintf(stdout, " RO_HAVE_READONLY");
2092         }
2093         if (header->flags & CTDB_REC_RO_REVOKING_READONLY) {
2094                 fprintf(stdout, " RO_REVOKING_READONLY");
2095         }
2096         if (header->flags & CTDB_REC_RO_REVOKE_COMPLETE) {
2097                 fprintf(stdout, " RO_REVOKE_COMPLETE");
2098         }
2099         fprintf(stdout, "\n");
2100
2101 }
2102
2103 static int dump_record(uint32_t reqid, struct ctdb_ltdb_header *header,
2104                        TDB_DATA key, TDB_DATA data, void *private_data)
2105 {
2106         struct dump_record_state *state =
2107                 (struct dump_record_state *)private_data;
2108
2109         state->count += 1;
2110
2111         dump_tdb_data("key", key);
2112         dump_ltdb_header(header);
2113         dump_tdb_data("data", data);
2114         fprintf(stdout, "\n");
2115
2116         return 0;
2117 }
2118
2119 static int control_catdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2120                          int argc, const char **argv)
2121 {
2122         struct ctdb_db_context *db;
2123         const char *db_name;
2124         uint32_t db_id;
2125         uint8_t db_flags;
2126         struct dump_record_state state;
2127         int ret;
2128
2129         if (argc != 1) {
2130                 usage("catdb");
2131         }
2132
2133         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2134                 return 1;
2135         }
2136
2137         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2138                           db_flags, &db);
2139         if (ret != 0) {
2140                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2141                 return ret;
2142         }
2143
2144         state.count = 0;
2145
2146         ret = ctdb_db_traverse(mem_ctx, ctdb->ev, ctdb->client, db,
2147                                ctdb->cmd_pnn, TIMEOUT(),
2148                                dump_record, &state);
2149
2150         printf("Dumped %u records\n", state.count);
2151
2152         return ret;
2153 }
2154
2155 static int control_cattdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2156                           int argc, const char **argv)
2157 {
2158         struct ctdb_db_context *db;
2159         const char *db_name;
2160         uint32_t db_id;
2161         uint8_t db_flags;
2162         struct dump_record_state state;
2163         int ret;
2164
2165         if (argc != 1) {
2166                 usage("catdb");
2167         }
2168
2169         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2170                 return 1;
2171         }
2172
2173         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2174                           db_flags, &db);
2175         if (ret != 0) {
2176                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2177                 return ret;
2178         }
2179
2180         state.count = 0;
2181         ret = ctdb_db_traverse_local(db, true, true, dump_record, &state);
2182
2183         printf("Dumped %u record(s)\n", state.count);
2184
2185         return ret;
2186 }
2187
2188 static int control_getcapabilities(TALLOC_CTX *mem_ctx,
2189                                    struct ctdb_context *ctdb,
2190                                    int argc, const char **argv)
2191 {
2192         uint32_t caps;
2193         int ret;
2194
2195         if (argc != 0) {
2196                 usage("getcapabilities");
2197         }
2198
2199         ret = ctdb_ctrl_get_capabilities(mem_ctx, ctdb->ev, ctdb->client,
2200                                          ctdb->cmd_pnn, TIMEOUT(), &caps);
2201         if (ret != 0) {
2202                 return ret;
2203         }
2204
2205         if (options.machinereadable == 1) {
2206                 printf("%s%s%s%s%s\n",
2207                        options.sep,
2208                        "RECMASTER", options.sep,
2209                        "LMASTER", options.sep);
2210                 printf("%s%d%s%d%s\n", options.sep,
2211                        !! (caps & CTDB_CAP_RECMASTER), options.sep,
2212                        !! (caps & CTDB_CAP_LMASTER), options.sep);
2213         } else {
2214                 printf("RECMASTER: %s\n",
2215                        (caps & CTDB_CAP_RECMASTER) ? "YES" : "NO");
2216                 printf("LMASTER: %s\n",
2217                        (caps & CTDB_CAP_LMASTER) ? "YES" : "NO");
2218         }
2219
2220         return 0;
2221 }
2222
2223 static int control_pnn(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2224                        int argc, const char **argv)
2225 {
2226         printf("%u\n", ctdb_client_pnn(ctdb->client));
2227         return 0;
2228 }
2229
2230 static int control_lvs(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2231                        int argc, const char **argv)
2232 {
2233         char *t, *lvs_helper = NULL;
2234
2235         if (argc != 1) {
2236                 usage("lvs");
2237         }
2238
2239         t = getenv("CTDB_LVS_HELPER");
2240         if (t != NULL) {
2241                 lvs_helper = talloc_strdup(mem_ctx, t);
2242         } else {
2243                 lvs_helper = talloc_asprintf(mem_ctx, "%s/ctdb_lvs",
2244                                              CTDB_HELPER_BINDIR);
2245         }
2246
2247         if (lvs_helper == NULL) {
2248                 fprintf(stderr, "Unable to set LVS helper\n");
2249                 return 1;
2250         }
2251
2252         return run_helper(mem_ctx, "LVS helper", lvs_helper, argc, argv);
2253 }
2254
2255 static int control_setdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2256                             int argc, const char **argv)
2257 {
2258         int log_level;
2259         int ret;
2260         bool found;
2261
2262         if (argc != 1) {
2263                 usage("setdebug");
2264         }
2265
2266         found = debug_level_parse(argv[0], &log_level);
2267         if (! found) {
2268                 fprintf(stderr,
2269                         "Invalid debug level '%s'. Valid levels are:\n",
2270                         argv[0]);
2271                 fprintf(stderr, "\tERROR | WARNING | NOTICE | INFO | DEBUG\n");
2272                 return 1;
2273         }
2274
2275         ret = ctdb_ctrl_setdebug(mem_ctx, ctdb->ev, ctdb->client,
2276                                  ctdb->cmd_pnn, TIMEOUT(), log_level);
2277         if (ret != 0) {
2278                 return ret;
2279         }
2280
2281         return 0;
2282 }
2283
2284 static int control_getdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2285                             int argc, const char **argv)
2286 {
2287         int loglevel;
2288         const char *log_str;
2289         int ret;
2290
2291         if (argc != 0) {
2292                 usage("getdebug");
2293         }
2294
2295         ret = ctdb_ctrl_getdebug(mem_ctx, ctdb->ev, ctdb->client,
2296                                  ctdb->cmd_pnn, TIMEOUT(), &loglevel);
2297         if (ret != 0) {
2298                 return ret;
2299         }
2300
2301         log_str = debug_level_to_string(loglevel);
2302         printf("%s\n", log_str);
2303
2304         return 0;
2305 }
2306
2307 static int control_attach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2308                           int argc, const char **argv)
2309 {
2310         const char *db_name;
2311         uint8_t db_flags = 0;
2312         int ret;
2313
2314         if (argc < 1 || argc > 2) {
2315                 usage("attach");
2316         }
2317
2318         db_name = argv[0];
2319         if (argc == 2) {
2320                 if (strcmp(argv[1], "persistent") == 0) {
2321                         db_flags = CTDB_DB_FLAGS_PERSISTENT;
2322                 } else if (strcmp(argv[1], "readonly") == 0) {
2323                         db_flags = CTDB_DB_FLAGS_READONLY;
2324                 } else if (strcmp(argv[1], "sticky") == 0) {
2325                         db_flags = CTDB_DB_FLAGS_STICKY;
2326                 } else if (strcmp(argv[1], "replicated") == 0) {
2327                         db_flags = CTDB_DB_FLAGS_REPLICATED;
2328                 } else {
2329                         usage("attach");
2330                 }
2331         }
2332
2333         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2334                           db_flags, NULL);
2335         if (ret != 0) {
2336                 return ret;
2337         }
2338
2339         return 0;
2340 }
2341
2342 static int control_detach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2343                           int argc, const char **argv)
2344 {
2345         const char *db_name;
2346         uint32_t db_id;
2347         uint8_t db_flags;
2348         struct ctdb_node_map *nodemap;
2349         int recmode;
2350         int ret, ret2, i;
2351
2352         if (argc < 1) {
2353                 usage("detach");
2354         }
2355
2356         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2357                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
2358         if (ret != 0) {
2359                 return ret;
2360         }
2361
2362         if (recmode == CTDB_RECOVERY_ACTIVE) {
2363                 fprintf(stderr, "Database cannot be detached"
2364                                 " when recovery is active\n");
2365                 return 1;
2366         }
2367
2368         nodemap = get_nodemap(ctdb, false);
2369         if (nodemap == NULL) {
2370                 return 1;
2371         }
2372
2373         for (i=0; i<nodemap->num; i++) {
2374                 uint32_t value;
2375
2376                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2377                         continue;
2378                 }
2379                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2380                         continue;
2381                 }
2382                 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2383                         fprintf(stderr, "Database cannot be detached on"
2384                                 " inactive (stopped or banned) node %u\n",
2385                                 nodemap->node[i].pnn);
2386                         return 1;
2387                 }
2388
2389                 ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
2390                                             nodemap->node[i].pnn, TIMEOUT(),
2391                                             "AllowClientDBAttach", &value);
2392                 if (ret != 0) {
2393                         fprintf(stderr,
2394                                 "Unable to get tunable AllowClientDBAttach"
2395                                 " from node %u\n", nodemap->node[i].pnn);
2396                         return ret;
2397                 }
2398
2399                 if (value == 1) {
2400                         fprintf(stderr,
2401                                 "Database access is still active on node %u."
2402                                 " Set AllowclientDBAttach=0 on all nodes.\n",
2403                                 nodemap->node[i].pnn);
2404                         return 1;
2405                 }
2406         }
2407
2408         ret2 = 0;
2409         for (i=0; i<argc; i++) {
2410                 if (! db_exists(mem_ctx, ctdb, argv[i], &db_id, &db_name,
2411                                 &db_flags)) {
2412                         continue;
2413                 }
2414
2415                 if (db_flags &
2416                     (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
2417                         fprintf(stderr,
2418                                 "Only volatile databases can be detached\n");
2419                         return 1;
2420                 }
2421
2422                 ret = ctdb_detach(ctdb->ev, ctdb->client, TIMEOUT(), db_id);
2423                 if (ret != 0) {
2424                         fprintf(stderr, "Database %s detach failed\n", db_name);
2425                         ret2 = ret;
2426                 }
2427         }
2428
2429         return ret2;
2430 }
2431
2432 static int control_dumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2433                               int argc, const char **argv)
2434 {
2435         const char *mem_str;
2436         ssize_t n;
2437         int ret;
2438
2439         ret = ctdb_ctrl_dump_memory(mem_ctx, ctdb->ev, ctdb->client,
2440                                     ctdb->cmd_pnn, TIMEOUT(), &mem_str);
2441         if (ret != 0) {
2442                 return ret;
2443         }
2444
2445         n = write(1, mem_str, strlen(mem_str)+1);
2446         if (n < 0 || n != strlen(mem_str)+1) {
2447                 fprintf(stderr, "Failed to write talloc summary\n");
2448                 return 1;
2449         }
2450
2451         return 0;
2452 }
2453
2454 static void dump_memory(uint64_t srvid, TDB_DATA data, void *private_data)
2455 {
2456         bool *done = (bool *)private_data;
2457         ssize_t n;
2458
2459         n = write(1, data.dptr, data.dsize);
2460         if (n < 0 || n != data.dsize) {
2461                 fprintf(stderr, "Failed to write talloc summary\n");
2462         }
2463
2464         *done = true;
2465 }
2466
2467 static int control_rddumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2468                                 int argc, const char **argv)
2469 {
2470         struct ctdb_srvid_message msg = { 0 };
2471         int ret;
2472         bool done = false;
2473
2474         msg.pnn = ctdb->pnn;
2475         msg.srvid = next_srvid(ctdb);
2476
2477         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2478                                               msg.srvid, dump_memory, &done);
2479         if (ret != 0) {
2480                 return ret;
2481         }
2482
2483         ret = ctdb_message_mem_dump(mem_ctx, ctdb->ev, ctdb->client,
2484                                     ctdb->cmd_pnn, &msg);
2485         if (ret != 0) {
2486                 return ret;
2487         }
2488
2489         ctdb_client_wait(ctdb->ev, &done);
2490         return 0;
2491 }
2492
2493 static int control_getpid(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2494                           int argc, const char **argv)
2495 {
2496         pid_t pid;
2497         int ret;
2498
2499         ret = ctdb_ctrl_get_pid(mem_ctx, ctdb->ev, ctdb->client,
2500                                 ctdb->cmd_pnn, TIMEOUT(), &pid);
2501         if (ret != 0) {
2502                 return ret;
2503         }
2504
2505         printf("%u\n", pid);
2506         return 0;
2507 }
2508
2509 static int check_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2510                        const char *desc, uint32_t flag, bool set_flag)
2511 {
2512         struct ctdb_node_map *nodemap;
2513         bool flag_is_set;
2514
2515         nodemap = get_nodemap(ctdb, false);
2516         if (nodemap == NULL) {
2517                 return 1;
2518         }
2519
2520         flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2521         if (set_flag == flag_is_set) {
2522                 if (set_flag) {
2523                         fprintf(stderr, "Node %u is already %s\n",
2524                                 ctdb->cmd_pnn, desc);
2525                 } else {
2526                         fprintf(stderr, "Node %u is not %s\n",
2527                                 ctdb->cmd_pnn, desc);
2528                 }
2529                 return 0;
2530         }
2531
2532         return 1;
2533 }
2534
2535 static void wait_for_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2536                            uint32_t flag, bool set_flag)
2537 {
2538         struct ctdb_node_map *nodemap;
2539         bool flag_is_set;
2540
2541         while (1) {
2542                 nodemap = get_nodemap(ctdb, true);
2543                 if (nodemap == NULL) {
2544                         fprintf(stderr,
2545                                 "Failed to get nodemap, trying again\n");
2546                         sleep(1);
2547                         continue;
2548                 }
2549
2550                 flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2551                 if (flag_is_set == set_flag) {
2552                         break;
2553                 }
2554
2555                 sleep(1);
2556         }
2557 }
2558
2559 static int ctdb_ctrl_modflags(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2560                               struct ctdb_client_context *client,
2561                               uint32_t destnode, struct timeval timeout,
2562                               uint32_t set, uint32_t clear)
2563 {
2564         struct ctdb_node_map *nodemap;
2565         struct ctdb_node_flag_change flag_change;
2566         struct ctdb_req_control request;
2567         uint32_t *pnn_list;
2568         int ret, count;
2569
2570         ret = ctdb_ctrl_get_nodemap(mem_ctx, ev, client, destnode,
2571                                     tevent_timeval_zero(), &nodemap);
2572         if (ret != 0) {
2573                 return ret;
2574         }
2575
2576         flag_change.pnn = destnode;
2577         flag_change.old_flags = nodemap->node[destnode].flags;
2578         flag_change.new_flags = flag_change.old_flags | set;
2579         flag_change.new_flags &= ~clear;
2580
2581         count = list_of_connected_nodes(nodemap, -1, mem_ctx, &pnn_list);
2582         if (count == -1) {
2583                 return ENOMEM;
2584         }
2585
2586         ctdb_req_control_modify_flags(&request, &flag_change);
2587         ret = ctdb_client_control_multi(mem_ctx, ev, client, pnn_list, count,
2588                                         tevent_timeval_zero(), &request,
2589                                         NULL, NULL);
2590         return ret;
2591 }
2592
/*
 * Completion state shared between ipreallocate() and
 * ipreallocate_handler().
 */
struct ipreallocate_state {
        /* Integer payload of the reply message; ipreallocate() treats a
         * negative value as an error code */
        int status;
        /* Set by the handler once a well-formed reply has arrived */
        bool done;
};
2597
2598 static void ipreallocate_handler(uint64_t srvid, TDB_DATA data,
2599                                  void *private_data)
2600 {
2601         struct ipreallocate_state *state =
2602                 (struct ipreallocate_state *)private_data;
2603
2604         if (data.dsize != sizeof(int)) {
2605                 /* Ignore packet */
2606                 return;
2607         }
2608
2609         state->status = *(int *)data.dptr;
2610         state->done = true;
2611 }
2612
2613 static int ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb)
2614 {
2615         struct ctdb_srvid_message msg = { 0 };
2616         struct ipreallocate_state state;
2617         int ret;
2618
2619         msg.pnn = ctdb->pnn;
2620         msg.srvid = next_srvid(ctdb);
2621
2622         state.done = false;
2623         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2624                                               msg.srvid,
2625                                               ipreallocate_handler, &state);
2626         if (ret != 0) {
2627                 return ret;
2628         }
2629
2630         while (true) {
2631                 ret = ctdb_message_takeover_run(mem_ctx, ctdb->ev,
2632                                                 ctdb->client,
2633                                                 CTDB_BROADCAST_CONNECTED,
2634                                                 &msg);
2635                 if (ret != 0) {
2636                         goto fail;
2637                 }
2638
2639                 ret = ctdb_client_wait_timeout(ctdb->ev, &state.done,
2640                                                TIMEOUT());
2641                 if (ret != 0) {
2642                         continue;
2643                 }
2644
2645                 if (state.status >= 0) {
2646                         ret = 0;
2647                 } else {
2648                         ret = state.status;
2649                 }
2650                 break;
2651         }
2652
2653 fail:
2654         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2655                                            msg.srvid, &state);
2656         return ret;
2657 }
2658
2659 static int control_disable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2660                            int argc, const char **argv)
2661 {
2662         int ret;
2663
2664         if (argc != 0) {
2665                 usage("disable");
2666         }
2667
2668         ret = check_flags(mem_ctx, ctdb, "disabled",
2669                           NODE_FLAGS_PERMANENTLY_DISABLED, true);
2670         if (ret == 0) {
2671                 return 0;
2672         }
2673
2674         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2675                                  ctdb->cmd_pnn, TIMEOUT(),
2676                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2677         if (ret != 0) {
2678                 fprintf(stderr,
2679                         "Failed to set DISABLED flag on node %u\n",
2680                         ctdb->cmd_pnn);
2681                 return ret;
2682         }
2683
2684         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, true);
2685         return ipreallocate(mem_ctx, ctdb);
2686 }
2687
2688 static int control_enable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2689                           int argc, const char **argv)
2690 {
2691         int ret;
2692
2693         if (argc != 0) {
2694                 usage("enable");
2695         }
2696
2697         ret = check_flags(mem_ctx, ctdb, "disabled",
2698                           NODE_FLAGS_PERMANENTLY_DISABLED, false);
2699         if (ret == 0) {
2700                 return 0;
2701         }
2702
2703         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2704                                  ctdb->cmd_pnn, TIMEOUT(),
2705                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
2706         if (ret != 0) {
2707                 fprintf(stderr, "Failed to reset DISABLED flag on node %u\n",
2708                         ctdb->cmd_pnn);
2709                 return ret;
2710         }
2711
2712         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, false);
2713         return ipreallocate(mem_ctx, ctdb);
2714 }
2715
2716 static int control_stop(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2717                         int argc, const char **argv)
2718 {
2719         int ret;
2720
2721         if (argc != 0) {
2722                 usage("stop");
2723         }
2724
2725         ret = check_flags(mem_ctx, ctdb, "stopped",
2726                           NODE_FLAGS_STOPPED, true);
2727         if (ret == 0) {
2728                 return 0;
2729         }
2730
2731         ret = ctdb_ctrl_stop_node(mem_ctx, ctdb->ev, ctdb->client,
2732                                   ctdb->cmd_pnn, TIMEOUT());
2733         if (ret != 0) {
2734                 fprintf(stderr, "Failed to stop node %u\n", ctdb->cmd_pnn);
2735                 return ret;
2736         }
2737
2738         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, true);
2739         return ipreallocate(mem_ctx, ctdb);
2740 }
2741
2742 static int control_continue(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2743                             int argc, const char **argv)
2744 {
2745         int ret;
2746
2747         if (argc != 0) {
2748                 usage("continue");
2749         }
2750
2751         ret = check_flags(mem_ctx, ctdb, "stopped",
2752                           NODE_FLAGS_STOPPED, false);
2753         if (ret == 0) {
2754                 return 0;
2755         }
2756
2757         ret = ctdb_ctrl_continue_node(mem_ctx, ctdb->ev, ctdb->client,
2758                                       ctdb->cmd_pnn, TIMEOUT());
2759         if (ret != 0) {
2760                 fprintf(stderr, "Failed to continue stopped node %u\n",
2761                         ctdb->cmd_pnn);
2762                 return ret;
2763         }
2764
2765         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, false);
2766         return ipreallocate(mem_ctx, ctdb);
2767 }
2768
2769 static int control_ban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2770                        int argc, const char **argv)
2771 {
2772         struct ctdb_ban_state ban_state;
2773         int ret;
2774
2775         if (argc != 1) {
2776                 usage("ban");
2777         }
2778
2779         ret = check_flags(mem_ctx, ctdb, "banned",
2780                           NODE_FLAGS_BANNED, true);
2781         if (ret == 0) {
2782                 return 0;
2783         }
2784
2785         ban_state.pnn = ctdb->cmd_pnn;
2786         ban_state.time = strtoul(argv[0], NULL, 0);
2787
2788         if (ban_state.time == 0) {
2789                 fprintf(stderr, "Ban time cannot be zero\n");
2790                 return EINVAL;
2791         }
2792
2793         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2794                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2795         if (ret != 0) {
2796                 fprintf(stderr, "Failed to ban node %u\n", ctdb->cmd_pnn);
2797                 return ret;
2798         }
2799
2800         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, true);
2801         return ipreallocate(mem_ctx, ctdb);
2802
2803 }
2804
2805 static int control_unban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2806                          int argc, const char **argv)
2807 {
2808         struct ctdb_ban_state ban_state;
2809         int ret;
2810
2811         if (argc != 0) {
2812                 usage("unban");
2813         }
2814
2815         ret = check_flags(mem_ctx, ctdb, "banned",
2816                           NODE_FLAGS_BANNED, false);
2817         if (ret == 0) {
2818                 return 0;
2819         }
2820
2821         ban_state.pnn = ctdb->cmd_pnn;
2822         ban_state.time = 0;
2823
2824         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2825                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2826         if (ret != 0) {
2827                 fprintf(stderr, "Failed to unban node %u\n", ctdb->cmd_pnn);
2828                 return ret;
2829         }
2830
2831         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, false);
2832         return ipreallocate(mem_ctx, ctdb);
2833
2834 }
2835
2836 static int control_shutdown(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2837                             int argc, const char **argv)
2838 {
2839         int ret;
2840
2841         if (argc != 0) {
2842                 usage("shutdown");
2843         }
2844
2845         ret = ctdb_ctrl_shutdown(mem_ctx, ctdb->ev, ctdb->client,
2846                                  ctdb->cmd_pnn, TIMEOUT());
2847         if (ret != 0) {
2848                 fprintf(stderr, "Unable to shutdown node %u\n", ctdb->cmd_pnn);
2849                 return ret;
2850         }
2851
2852         return 0;
2853 }
2854
2855 static int get_generation(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2856                           uint32_t *generation)
2857 {
2858         uint32_t recmaster;
2859         int recmode;
2860         struct ctdb_vnn_map *vnnmap;
2861         int ret;
2862
2863 again:
2864         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2865                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
2866         if (ret != 0) {
2867                 fprintf(stderr, "Failed to find recovery master\n");
2868                 return ret;
2869         }
2870
2871         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2872                                     recmaster, TIMEOUT(), &recmode);
2873         if (ret != 0) {
2874                 fprintf(stderr, "Failed to get recovery mode from node %u\n",
2875                         recmaster);
2876                 return ret;
2877         }
2878
2879         if (recmode == CTDB_RECOVERY_ACTIVE) {
2880                 sleep(1);
2881                 goto again;
2882         }
2883
2884         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
2885                                   recmaster, TIMEOUT(), &vnnmap);
2886         if (ret != 0) {
2887                 fprintf(stderr, "Failed to get generation from node %u\n",
2888                         recmaster);
2889                 return ret;
2890         }
2891
2892         if (vnnmap->generation == INVALID_GENERATION) {
2893                 talloc_free(vnnmap);
2894                 sleep(1);
2895                 goto again;
2896         }
2897
2898         *generation = vnnmap->generation;
2899         talloc_free(vnnmap);
2900         return 0;
2901 }
2902
2903
2904 static int control_recover(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2905                            int argc, const char **argv)
2906 {
2907         uint32_t generation, next_generation;
2908         int ret;
2909
2910         if (argc != 0) {
2911                 usage("recover");
2912         }
2913
2914         ret = get_generation(mem_ctx, ctdb, &generation);
2915         if (ret != 0) {
2916                 return ret;
2917         }
2918
2919         ret = ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
2920                                     ctdb->cmd_pnn, TIMEOUT(),
2921                                     CTDB_RECOVERY_ACTIVE);
2922         if (ret != 0) {
2923                 fprintf(stderr, "Failed to set recovery mode active\n");
2924                 return ret;
2925         }
2926
2927         while (1) {
2928                 ret = get_generation(mem_ctx, ctdb, &next_generation);
2929                 if (ret != 0) {
2930                         fprintf(stderr,
2931                                 "Failed to confirm end of recovery\n");
2932                         return ret;
2933                 }
2934
2935                 if (next_generation != generation) {
2936                         break;
2937                 }
2938
2939                 sleep (1);
2940         }
2941
2942         return 0;
2943 }
2944
2945 static int control_ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2946                                 int argc, const char **argv)
2947 {
2948         if (argc != 0) {
2949                 usage("ipreallocate");
2950         }
2951
2952         return ipreallocate(mem_ctx, ctdb);
2953 }
2954
2955 static int control_isnotrecmaster(TALLOC_CTX *mem_ctx,
2956                                   struct ctdb_context *ctdb,
2957                                   int argc, const char **argv)
2958 {
2959         uint32_t recmaster;
2960         int ret;
2961
2962         if (argc != 0) {
2963                 usage("isnotrecmaster");
2964         }
2965
2966         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2967                                       ctdb->pnn, TIMEOUT(), &recmaster);
2968         if (ret != 0) {
2969                 fprintf(stderr, "Failed to get recmaster\n");
2970                 return ret;
2971         }
2972
2973         if (recmaster != ctdb->pnn) {
2974                 printf("this node is not the recmaster\n");
2975                 return 1;
2976         }
2977
2978         printf("this node is the recmaster\n");
2979         return 0;
2980 }
2981
2982 static int control_gratarp(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2983                            int argc, const char **argv)
2984 {
2985         struct ctdb_addr_info addr_info;
2986         int ret;
2987
2988         if (argc != 2) {
2989                 usage("gratarp");
2990         }
2991
2992         ret = ctdb_sock_addr_from_string(argv[0], &addr_info.addr, false);
2993         if (ret != 0) {
2994                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
2995                 return 1;
2996         }
2997         addr_info.iface = argv[1];
2998
2999         ret = ctdb_ctrl_send_gratuitous_arp(mem_ctx, ctdb->ev, ctdb->client,
3000                                             ctdb->cmd_pnn, TIMEOUT(),
3001                                             &addr_info);
3002         if (ret != 0) {
3003                 fprintf(stderr, "Unable to send gratuitous arp from node %u\n",
3004                         ctdb->cmd_pnn);
3005                 return ret;
3006         }
3007
3008         return 0;
3009 }
3010
3011 static int control_tickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3012                            int argc, const char **argv)
3013 {
3014         ctdb_sock_addr src, dst;
3015         int ret;
3016
3017         if (argc != 0 && argc != 2) {
3018                 usage("tickle");
3019         }
3020
3021         if (argc == 0) {
3022                 struct ctdb_connection_list *clist;
3023                 int i, num_failed;
3024
3025                 /* Client first but the src/dst logic is confused */
3026                 ret = ctdb_connection_list_read(mem_ctx, false, &clist);
3027                 if (ret != 0) {
3028                         return ret;
3029                 }
3030
3031                 num_failed = 0;
3032                 for (i = 0; i < clist->num; i++) {
3033                         ret = ctdb_sys_send_tcp(&clist->conn[i].src,
3034                                                 &clist->conn[i].dst,
3035                                                 0, 0, 0);
3036                         if (ret != 0) {
3037                                 num_failed += 1;
3038                         }
3039                 }
3040
3041                 TALLOC_FREE(clist);
3042
3043                 if (num_failed > 0) {
3044                         fprintf(stderr, "Failed to send %d tickles\n",
3045                                 num_failed);
3046                         return 1;
3047                 }
3048
3049                 return 0;
3050         }
3051
3052
3053         ret = ctdb_sock_addr_from_string(argv[0], &src, true);
3054         if (ret != 0) {
3055                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3056                 return 1;
3057         }
3058
3059         ret = ctdb_sock_addr_from_string(argv[1], &dst, true);
3060         if (ret != 0) {
3061                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3062                 return 1;
3063         }
3064
3065         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
3066         if (ret != 0) {
3067                 fprintf(stderr, "Failed to send tickle ack\n");
3068                 return ret;
3069         }
3070
3071         return 0;
3072 }
3073
3074 static int control_gettickles(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3075                               int argc, const char **argv)
3076 {
3077         ctdb_sock_addr addr;
3078         struct ctdb_tickle_list *tickles;
3079         unsigned port = 0;
3080         int ret, i;
3081
3082         if (argc < 1 || argc > 2) {
3083                 usage("gettickles");
3084         }
3085
3086         if (argc == 2) {
3087                 port = strtoul(argv[1], NULL, 10);
3088         }
3089
3090         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
3091         if (ret != 0) {
3092                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3093                 return 1;
3094         }
3095         ctdb_sock_addr_set_port(&addr, port);
3096
3097         ret = ctdb_ctrl_get_tcp_tickle_list(mem_ctx, ctdb->ev, ctdb->client,
3098                                             ctdb->cmd_pnn, TIMEOUT(), &addr,
3099                                             &tickles);
3100         if (ret != 0) {
3101                 fprintf(stderr, "Failed to get list of connections\n");
3102                 return ret;
3103         }
3104
3105         if (options.machinereadable) {
3106                 printf("%s%s%s%s%s%s%s%s%s\n",
3107                        options.sep,
3108                        "Source IP", options.sep,
3109                        "Port", options.sep,
3110                        "Destiation IP", options.sep,
3111                        "Port", options.sep);
3112                 for (i=0; i<tickles->num; i++) {
3113                         printf("%s%s%s%u%s%s%s%u%s\n", options.sep,
3114                                ctdb_sock_addr_to_string(
3115                                        mem_ctx, &tickles->conn[i].src, false),
3116                                options.sep,
3117                                ntohs(tickles->conn[i].src.ip.sin_port),
3118                                options.sep,
3119                                ctdb_sock_addr_to_string(
3120                                        mem_ctx, &tickles->conn[i].dst, false),
3121                                options.sep,
3122                                ntohs(tickles->conn[i].dst.ip.sin_port),
3123                                options.sep);
3124                 }
3125         } else {
3126                 printf("Connections for IP: %s\n",
3127                        ctdb_sock_addr_to_string(mem_ctx,
3128                                                 &tickles->addr, false));
3129                 printf("Num connections: %u\n", tickles->num);
3130                 for (i=0; i<tickles->num; i++) {
3131                         printf("SRC: %s   DST: %s\n",
3132                                ctdb_sock_addr_to_string(
3133                                        mem_ctx, &tickles->conn[i].src, true),
3134                                ctdb_sock_addr_to_string(
3135                                        mem_ctx, &tickles->conn[i].dst, true));
3136                 }
3137         }
3138
3139         talloc_free(tickles);
3140         return 0;
3141 }
3142
/*
 * Callback that fills in *request with the control to send for one
 * connection.  process_clist_send() calls it once per list entry.
 */
typedef void (*clist_request_func)(struct ctdb_req_control *request,
                                   struct ctdb_connection *conn);

/*
 * Callback that interprets the reply to one such control.
 * process_clist_done() counts a non-zero return as a failure.
 */
typedef int (*clist_reply_func)(struct ctdb_reply_control *reply);
3147
/* Request state for process_clist_send() / process_clist_recv() */
struct process_clist_state {
        /* Connection list being processed (owned by the caller) */
        struct ctdb_connection_list *clist;
        /* Not referenced by the process_clist_* functions */
        int count;
        /* Controls that failed / controls completed so far */
        int num_failed, num_total;
        /* Checks the reply of each control; non-zero means failure */
        clist_reply_func reply_func;
};
3154
/* Completion callback for each control issued by process_clist_send() */
static void process_clist_done(struct tevent_req *subreq);
3156
3157 static struct tevent_req *process_clist_send(
3158                                         TALLOC_CTX *mem_ctx,
3159                                         struct ctdb_context *ctdb,
3160                                         struct ctdb_connection_list *clist,
3161                                         clist_request_func request_func,
3162                                         clist_reply_func reply_func)
3163 {
3164         struct tevent_req *req, *subreq;
3165         struct process_clist_state *state;
3166         struct ctdb_req_control request;
3167         int i;
3168
3169         req = tevent_req_create(mem_ctx, &state, struct process_clist_state);
3170         if (req == NULL) {
3171                 return NULL;
3172         }
3173
3174         state->clist = clist;
3175         state->reply_func = reply_func;
3176
3177         for (i = 0; i < clist->num; i++) {
3178                 request_func(&request, &clist->conn[i]);
3179                 subreq = ctdb_client_control_send(state, ctdb->ev,
3180                                                   ctdb->client, ctdb->cmd_pnn,
3181                                                   TIMEOUT(), &request);
3182                 if (tevent_req_nomem(subreq, req)) {
3183                         return tevent_req_post(req, ctdb->ev);
3184                 }
3185                 tevent_req_set_callback(subreq, process_clist_done, req);
3186         }
3187
3188         return req;
3189 }
3190
3191 static void process_clist_done(struct tevent_req *subreq)
3192 {
3193         struct tevent_req *req = tevent_req_callback_data(
3194                 subreq, struct tevent_req);
3195         struct process_clist_state *state = tevent_req_data(
3196                 req, struct process_clist_state);
3197         struct ctdb_reply_control *reply;
3198         int ret;
3199         bool status;
3200
3201         status = ctdb_client_control_recv(subreq, NULL, state, &reply);
3202         TALLOC_FREE(subreq);
3203         if (! status) {
3204                 state->num_failed += 1;
3205                 goto done;
3206         }
3207
3208         ret = state->reply_func(reply);
3209         if (ret != 0) {
3210                 state->num_failed += 1;
3211                 goto done;
3212         }
3213
3214 done:
3215         state->num_total += 1;
3216         if (state->num_total == state->clist->num) {
3217                 tevent_req_done(req);
3218         }
3219 }
3220
3221 static int process_clist_recv(struct tevent_req *req)
3222 {
3223         struct process_clist_state *state = tevent_req_data(
3224                 req, struct process_clist_state);
3225
3226         return state->num_failed;
3227 }
3228
3229 static int control_addtickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3230                              int argc, const char **argv)
3231 {
3232         struct ctdb_connection conn;
3233         int ret;
3234
3235         if (argc != 0 && argc != 2) {
3236                 usage("addtickle");
3237         }
3238
3239         if (argc == 0) {
3240                 struct ctdb_connection_list *clist;
3241                 struct tevent_req *req;
3242
3243                 /* Client first but the src/dst logic is confused */
3244                 ret = ctdb_connection_list_read(mem_ctx, false, &clist);
3245                 if (ret != 0) {
3246                         return ret;
3247                 }
3248                 if (clist->num == 0) {
3249                         return 0;
3250                 }
3251
3252                 req = process_clist_send(mem_ctx, ctdb, clist,
3253                                  ctdb_req_control_tcp_add_delayed_update,
3254                                  ctdb_reply_control_tcp_add_delayed_update);
3255                 if (req == NULL) {
3256                         talloc_free(clist);
3257                         return ENOMEM;
3258                 }
3259
3260                 tevent_req_poll(req, ctdb->ev);
3261                 talloc_free(clist);
3262
3263                 ret = process_clist_recv(req);
3264                 if (ret != 0) {
3265                         fprintf(stderr, "Failed to add %d tickles\n", ret);
3266                         return 1;
3267                 }
3268
3269                 return 0;
3270         }
3271
3272         ret = ctdb_sock_addr_from_string(argv[0], &conn.src, true);
3273         if (ret != 0) {
3274                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3275                 return 1;
3276         }
3277         ret = ctdb_sock_addr_from_string(argv[1], &conn.dst, true);
3278         if (ret != 0) {
3279                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3280                 return 1;
3281         }
3282
3283         ret = ctdb_ctrl_tcp_add_delayed_update(mem_ctx, ctdb->ev,
3284                                                ctdb->client, ctdb->cmd_pnn,
3285                                                TIMEOUT(), &conn);
3286         if (ret != 0) {
3287                 fprintf(stderr, "Failed to register connection\n");
3288                 return ret;
3289         }
3290
3291         return 0;
3292 }
3293
3294 static int control_deltickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3295                              int argc, const char **argv)
3296 {
3297         struct ctdb_connection conn;
3298         int ret;
3299
3300         if (argc != 0 && argc != 2) {
3301                 usage("deltickle");
3302         }
3303
3304         if (argc == 0) {
3305                 struct ctdb_connection_list *clist;
3306                 struct tevent_req *req;
3307
3308                 /* Client first but the src/dst logic is confused */
3309                 ret = ctdb_connection_list_read(mem_ctx, false, &clist);
3310                 if (ret != 0) {
3311                         return ret;
3312                 }
3313                 if (clist->num == 0) {
3314                         return 0;
3315                 }
3316
3317                 req = process_clist_send(mem_ctx, ctdb, clist,
3318                                          ctdb_req_control_tcp_remove,
3319                                          ctdb_reply_control_tcp_remove);
3320                 if (req == NULL) {
3321                         talloc_free(clist);
3322                         return ENOMEM;
3323                 }
3324
3325                 tevent_req_poll(req, ctdb->ev);
3326                 talloc_free(clist);
3327
3328                 ret = process_clist_recv(req);
3329                 if (ret != 0) {
3330                         fprintf(stderr, "Failed to remove %d tickles\n", ret);
3331                         return 1;
3332                 }
3333
3334                 return 0;
3335         }
3336
3337         ret = ctdb_sock_addr_from_string(argv[0], &conn.src, true);
3338         if (ret != 0) {
3339                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3340                 return 1;
3341         }
3342         ret = ctdb_sock_addr_from_string(argv[1], &conn.dst, true);
3343         if (ret != 0) {
3344                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3345                 return 1;
3346         }
3347
3348         ret = ctdb_ctrl_tcp_remove(mem_ctx, ctdb->ev, ctdb->client,
3349                                    ctdb->cmd_pnn, TIMEOUT(), &conn);
3350         if (ret != 0) {
3351                 fprintf(stderr, "Failed to unregister connection\n");
3352                 return ret;
3353         }
3354
3355         return 0;
3356 }
3357
3358 static int control_listnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3359                              int argc, const char **argv)
3360 {
3361         struct ctdb_node_map *nodemap;
3362         int i;
3363
3364         if (argc != 0) {
3365                 usage("listnodes");
3366         }
3367
3368         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
3369         if (nodemap == NULL) {
3370                 return 1;
3371         }
3372
3373         for (i=0; i<nodemap->num; i++) {
3374                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
3375                         continue;
3376                 }
3377
3378                 if (options.machinereadable) {
3379                         printf("%s%u%s%s%s\n", options.sep,
3380                                nodemap->node[i].pnn, options.sep,
3381                                ctdb_sock_addr_to_string(
3382                                        mem_ctx, &nodemap->node[i].addr, false),
3383                                options.sep);
3384                 } else {
3385                         printf("%s\n",
3386                                ctdb_sock_addr_to_string(
3387                                        mem_ctx, &nodemap->node[i].addr, false));
3388                 }
3389         }
3390
3391         return 0;
3392 }
3393
3394 static bool nodemap_identical(struct ctdb_node_map *nodemap1,
3395                               struct ctdb_node_map *nodemap2)
3396 {
3397         int i;
3398
3399         if (nodemap1->num != nodemap2->num) {
3400                 return false;
3401         }
3402
3403         for (i=0; i<nodemap1->num; i++) {
3404                 struct ctdb_node_and_flags *n1, *n2;
3405
3406                 n1 = &nodemap1->node[i];
3407                 n2 = &nodemap2->node[i];
3408
3409                 if ((n1->pnn != n2->pnn) ||
3410                     (n1->flags != n2->flags) ||
3411                     ! ctdb_sock_addr_same_ip(&n1->addr, &n2->addr)) {
3412                         return false;
3413                 }
3414         }
3415
3416         return true;
3417 }
3418
3419 static int check_node_file_changes(TALLOC_CTX *mem_ctx,
3420                                    struct ctdb_node_map *nm,
3421                                    struct ctdb_node_map *fnm,
3422                                    bool *reload)
3423 {
3424         int i;
3425         bool check_failed = false;
3426
3427         *reload = false;
3428
3429         for (i=0; i<nm->num; i++) {
3430                 if (i >= fnm->num) {
3431                         fprintf(stderr,
3432                                 "Node %u (%s) missing from nodes file\n",
3433                                 nm->node[i].pnn,
3434                                 ctdb_sock_addr_to_string(
3435                                         mem_ctx, &nm->node[i].addr, false));
3436                         check_failed = true;
3437                         continue;
3438                 }
3439                 if (nm->node[i].flags & NODE_FLAGS_DELETED &&
3440                     fnm->node[i].flags & NODE_FLAGS_DELETED) {
3441                         /* Node remains deleted */
3442                         continue;
3443                 }
3444
3445                 if (! (nm->node[i].flags & NODE_FLAGS_DELETED) &&
3446                     ! (fnm->node[i].flags & NODE_FLAGS_DELETED)) {
3447                         /* Node not newly nor previously deleted */
3448                         if (! ctdb_same_ip(&nm->node[i].addr,
3449                                            &fnm->node[i].addr)) {
3450                                 fprintf(stderr,
3451                                         "Node %u has changed IP address"
3452                                         " (was %s, now %s)\n",
3453                                         nm->node[i].pnn,
3454                                         ctdb_sock_addr_to_string(
3455                                                 mem_ctx,
3456                                                 &nm->node[i].addr, false),
3457                                         ctdb_sock_addr_to_string(
3458                                                 mem_ctx,
3459                                                 &fnm->node[i].addr, false));
3460                                 check_failed = true;
3461                         } else {
3462                                 if (nm->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3463                                         fprintf(stderr,
3464                                                 "WARNING: Node %u is disconnected."
3465                                                 " You MUST fix this node manually!\n",
3466                                                 nm->node[i].pnn);
3467                                 }
3468                         }
3469                         continue;
3470                 }
3471
3472                 if (fnm->node[i].flags & NODE_FLAGS_DELETED) {
3473                         /* Node is being deleted */
3474                         printf("Node %u is DELETED\n", nm->node[i].pnn);
3475                         *reload = true;
3476                         if (! (nm->node[i].flags & NODE_FLAGS_DISCONNECTED)) {
3477                                 fprintf(stderr,
3478                                         "ERROR: Node %u is still connected\n",
3479                                         nm->node[i].pnn);
3480                                 check_failed = true;
3481                         }
3482                         continue;
3483                 }
3484
3485                 if (nm->node[i].flags & NODE_FLAGS_DELETED) {
3486                         /* Node was previously deleted */
3487                         printf("Node %u is UNDELETED\n", nm->node[i].pnn);
3488                         *reload = true;
3489                 }
3490         }
3491
3492         if (check_failed) {
3493                 fprintf(stderr,
3494                         "ERROR: Nodes will not be reloaded due to previous error\n");
3495                 return 1;
3496         }
3497
3498         /* Leftover nodes in file are NEW */
3499         for (; i < fnm->num; i++) {
3500                 printf("Node %u is NEW\n", fnm->node[i].pnn);
3501                 *reload = true;
3502         }
3503
3504         return 0;
3505 }
3506
/*
 * Bookkeeping shared between disable_recoveries() and
 * disable_recoveries_handler() while replies are being collected.
 */
struct disable_recoveries_state {
        /* PNNs of the nodes the request was sent to */
        uint32_t *pnn_list;
        /* Number of entries in pnn_list and reply */
        int node_count;
        /* reply[i] becomes true once pnn_list[i] has answered */
        bool *reply;
        /* Negative value taken from a reply that reported an error, else 0 */
        int status;
        /* True once every node has answered or an error was reported */
        bool done;
};
3514
3515 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
3516                                        void *private_data)
3517 {
3518         struct disable_recoveries_state *state =
3519                 (struct disable_recoveries_state *)private_data;
3520         int ret, i;
3521
3522         if (data.dsize != sizeof(int)) {
3523                 /* Ignore packet */
3524                 return;
3525         }
3526
3527         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
3528         ret = *(int *)data.dptr;
3529         if (ret < 0) {
3530                 state->status = ret;
3531                 state->done = true;
3532                 return;
3533         }
3534         for (i=0; i<state->node_count; i++) {
3535                 if (state->pnn_list[i] == ret) {
3536                         state->reply[i] = true;
3537                         break;
3538                 }
3539         }
3540
3541         state->done = true;
3542         for (i=0; i<state->node_count; i++) {
3543                 if (! state->reply[i]) {
3544                         state->done = false;
3545                         break;
3546                 }
3547         }
3548 }
3549
3550 static int disable_recoveries(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3551                               uint32_t timeout, uint32_t *pnn_list, int count)
3552 {
3553         struct ctdb_disable_message disable = { 0 };
3554         struct disable_recoveries_state state;
3555         int ret, i;
3556
3557         disable.pnn = ctdb->pnn;
3558         disable.srvid = next_srvid(ctdb);
3559         disable.timeout = timeout;
3560
3561         state.pnn_list = pnn_list;
3562         state.node_count = count;
3563         state.done = false;
3564         state.status = 0;
3565         state.reply = talloc_zero_array(mem_ctx, bool, count);
3566         if (state.reply == NULL) {
3567                 return ENOMEM;
3568         }
3569
3570         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
3571                                               disable.srvid,
3572                                               disable_recoveries_handler,
3573                                               &state);
3574         if (ret != 0) {
3575                 return ret;
3576         }
3577
3578         for (i=0; i<count; i++) {
3579                 ret = ctdb_message_disable_recoveries(mem_ctx, ctdb->ev,
3580                                                       ctdb->client,
3581                                                       pnn_list[i],
3582                                                       &disable);
3583                 if (ret != 0) {
3584                         goto fail;
3585                 }
3586         }
3587
3588         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
3589         if (ret == ETIME) {
3590                 fprintf(stderr, "Timed out waiting to disable recoveries\n");
3591         } else {
3592                 ret = (state.status >= 0 ? 0 : 1);
3593         }
3594
3595 fail:
3596         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
3597                                            disable.srvid, &state);
3598         return ret;
3599 }
3600
3601 static int control_reloadnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3602                                int argc, const char **argv)
3603 {
3604         struct ctdb_node_map *nodemap = NULL;
3605         struct ctdb_node_map *file_nodemap;
3606         struct ctdb_node_map *remote_nodemap;
3607         struct ctdb_req_control request;
3608         struct ctdb_reply_control **reply;
3609         bool reload;
3610         int ret, i;
3611         uint32_t *pnn_list;
3612         int count;
3613
3614         nodemap = get_nodemap(ctdb, false);
3615         if (nodemap == NULL) {
3616                 return 1;
3617         }
3618
3619         file_nodemap = read_nodes_file(mem_ctx, ctdb->pnn);
3620         if (file_nodemap == NULL) {
3621                 return 1;
3622         }
3623
3624         for (i=0; i<nodemap->num; i++) {
3625                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3626                         continue;
3627                 }
3628
3629                 ret = ctdb_ctrl_get_nodes_file(mem_ctx, ctdb->ev, ctdb->client,
3630                                                nodemap->node[i].pnn, TIMEOUT(),
3631                                                &remote_nodemap);
3632                 if (ret != 0) {
3633                         fprintf(stderr,
3634                                 "ERROR: Failed to get nodes file from node %u\n",
3635                                 nodemap->node[i].pnn);
3636                         return ret;
3637                 }
3638
3639                 if (! nodemap_identical(file_nodemap, remote_nodemap)) {
3640                         fprintf(stderr,
3641                                 "ERROR: Nodes file on node %u differs"
3642                                  " from current node (%u)\n",
3643                                  nodemap->node[i].pnn, ctdb->pnn);
3644                         return 1;
3645                 }
3646         }
3647
3648         ret = check_node_file_changes(mem_ctx, nodemap, file_nodemap, &reload);
3649         if (ret != 0) {
3650                 return ret;
3651         }
3652
3653         if (! reload) {
3654                 fprintf(stderr, "No change in nodes file,"
3655                                 " skipping unnecessary reload\n");
3656                 return 0;
3657         }
3658
3659         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
3660                                         mem_ctx, &pnn_list);
3661         if (count <= 0) {
3662                 fprintf(stderr, "Memory allocation error\n");
3663                 return 1;
3664         }
3665
3666         ret = disable_recoveries(mem_ctx, ctdb, 2*options.timelimit,
3667                                  pnn_list, count);
3668         if (ret != 0) {
3669                 fprintf(stderr, "Failed to disable recoveries\n");
3670                 return ret;
3671         }
3672
3673         ctdb_req_control_reload_nodes_file(&request);
3674         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3675                                         pnn_list, count, TIMEOUT(),
3676                                         &request, NULL, &reply);
3677         if (ret != 0) {
3678                 bool failed = false;
3679
3680                 for (i=0; i<count; i++) {
3681                         ret = ctdb_reply_control_reload_nodes_file(reply[i]);
3682                         if (ret != 0) {
3683                                 fprintf(stderr,
3684                                         "Node %u failed to reload nodes\n",
3685                                         pnn_list[i]);
3686                                 failed = true;
3687                         }
3688                 }
3689                 if (failed) {
3690                         fprintf(stderr,
3691                                 "You MUST fix failed nodes manually!\n");
3692                 }
3693         }
3694
3695         ret = disable_recoveries(mem_ctx, ctdb, 0, pnn_list, count);
3696         if (ret != 0) {
3697                 fprintf(stderr, "Failed to enable recoveries\n");
3698                 return ret;
3699         }
3700
3701         return 0;
3702 }
3703
3704 static int moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3705                   ctdb_sock_addr *addr, uint32_t pnn)
3706 {
3707         struct ctdb_public_ip_list *pubip_list;
3708         struct ctdb_public_ip pubip;
3709         struct ctdb_node_map *nodemap;
3710         struct ctdb_req_control request;
3711         uint32_t *pnn_list;
3712         int ret, i, count;
3713
3714         ret = ctdb_message_disable_ip_check(mem_ctx, ctdb->ev, ctdb->client,
3715                                             CTDB_BROADCAST_CONNECTED,
3716                                             2*options.timelimit);
3717         if (ret != 0) {
3718                 fprintf(stderr, "Failed to disable IP check\n");
3719                 return ret;
3720         }
3721
3722         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3723                                        pnn, TIMEOUT(), false, &pubip_list);
3724         if (ret != 0) {
3725                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3726                         pnn);
3727                 return ret;
3728         }
3729
3730         for (i=0; i<pubip_list->num; i++) {
3731                 if (ctdb_same_ip(addr, &pubip_list->ip[i].addr)) {
3732                         break;
3733                 }
3734         }
3735
3736         if (i == pubip_list->num) {
3737                 fprintf(stderr, "Node %u CANNOT host IP address %s\n",
3738                         pnn, ctdb_sock_addr_to_string(mem_ctx, addr, false));
3739                 return 1;
3740         }
3741
3742         nodemap = get_nodemap(ctdb, false);
3743         if (nodemap == NULL) {
3744                 return 1;
3745         }
3746
3747         count = list_of_active_nodes(nodemap, pnn, mem_ctx, &pnn_list);
3748         if (count <= 0) {
3749                 fprintf(stderr, "Memory allocation error\n");
3750                 return 1;
3751         }
3752
3753         pubip.pnn = pnn;
3754         pubip.addr = *addr;
3755         ctdb_req_control_release_ip(&request, &pubip);
3756
3757         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3758                                         pnn_list, count, TIMEOUT(),
3759                                         &request, NULL, NULL);
3760         if (ret != 0) {
3761                 fprintf(stderr, "Failed to release IP on nodes\n");
3762                 return ret;
3763         }
3764
3765         ret = ctdb_ctrl_takeover_ip(mem_ctx, ctdb->ev, ctdb->client,
3766                                     pnn, TIMEOUT(), &pubip);
3767         if (ret != 0) {
3768                 fprintf(stderr, "Failed to takeover IP on node %u\n", pnn);
3769                 return ret;
3770         }
3771
3772         return 0;
3773 }
3774
3775 static int control_moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3776                           int argc, const char **argv)
3777 {
3778         ctdb_sock_addr addr;
3779         uint32_t pnn;
3780         int ret, retries = 0;
3781
3782         if (argc != 2) {
3783                 usage("moveip");
3784         }
3785
3786         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
3787         if (ret != 0) {
3788                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3789                 return 1;
3790         }
3791
3792         pnn = strtoul(argv[1], NULL, 10);
3793         if (pnn == CTDB_UNKNOWN_PNN) {
3794                 fprintf(stderr, "Invalid PNN %s\n", argv[1]);
3795                 return 1;
3796         }
3797
3798         while (retries < 5) {
3799                 ret = moveip(mem_ctx, ctdb, &addr, pnn);
3800                 if (ret == 0) {
3801                         break;
3802                 }
3803
3804                 sleep(3);
3805                 retries++;
3806         }
3807
3808         if (ret != 0) {
3809                 fprintf(stderr, "Failed to move IP %s to node %u\n",
3810                         argv[0], pnn);
3811                 return ret;
3812         }
3813
3814         return 0;
3815 }
3816
3817 static int rebalancenode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3818                          uint32_t pnn)
3819 {
3820         int ret;
3821
3822         ret = ctdb_message_rebalance_node(mem_ctx, ctdb->ev, ctdb->client,
3823                                           CTDB_BROADCAST_CONNECTED, pnn);
3824         if (ret != 0) {
3825                 fprintf(stderr,
3826                         "Failed to ask recovery master to distribute IPs\n");
3827                 return ret;
3828         }
3829
3830         return 0;
3831 }
3832
3833 static int control_addip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3834                          int argc, const char **argv)
3835 {
3836         ctdb_sock_addr addr;
3837         struct ctdb_public_ip_list *pubip_list;
3838         struct ctdb_addr_info addr_info;
3839         unsigned int mask;
3840         int ret, i, retries = 0;
3841
3842         if (argc != 2) {
3843                 usage("addip");
3844         }
3845
3846         if (! parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
3847                 fprintf(stderr, "Invalid IP/Mask %s\n", argv[0]);
3848                 return 1;
3849         }
3850
3851         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3852                                        ctdb->cmd_pnn, TIMEOUT(),
3853                                        false, &pubip_list);
3854         if (ret != 0) {
3855                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3856                         ctdb->cmd_pnn);
3857                 return 1;
3858         }
3859
3860         for (i=0; i<pubip_list->num; i++) {
3861                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3862                         fprintf(stderr, "Node already knows about IP %s\n",
3863                                 ctdb_sock_addr_to_string(mem_ctx,
3864                                                          &addr, false));
3865                         return 0;
3866                 }
3867         }
3868
3869         addr_info.addr = addr;
3870         addr_info.mask = mask;
3871         addr_info.iface = argv[1];
3872
3873         while (retries < 5) {
3874                 ret = ctdb_ctrl_add_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3875                                               ctdb->cmd_pnn, TIMEOUT(),
3876                                               &addr_info);
3877                 if (ret == 0) {
3878                         break;
3879                 }
3880
3881                 sleep(3);
3882                 retries++;
3883         }
3884
3885         if (ret != 0) {
3886                 fprintf(stderr, "Failed to add public IP to node %u."
3887                                 " Giving up\n", ctdb->cmd_pnn);
3888                 return ret;
3889         }
3890
3891         ret = rebalancenode(mem_ctx, ctdb, ctdb->cmd_pnn);
3892         if (ret != 0) {
3893                 return ret;
3894         }
3895
3896         return 0;
3897 }
3898
3899 static int control_delip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3900                          int argc, const char **argv)
3901 {
3902         ctdb_sock_addr addr;
3903         struct ctdb_public_ip_list *pubip_list;
3904         struct ctdb_addr_info addr_info;
3905         int ret, i;
3906
3907         if (argc != 1) {
3908                 usage("delip");
3909         }
3910
3911         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
3912         if (ret != 0) {
3913                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3914                 return 1;
3915         }
3916
3917         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3918                                        ctdb->cmd_pnn, TIMEOUT(),
3919                                        false, &pubip_list);
3920         if (ret != 0) {
3921                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3922                         ctdb->cmd_pnn);
3923                 return 1;
3924         }
3925
3926         for (i=0; i<pubip_list->num; i++) {
3927                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3928                         break;
3929                 }
3930         }
3931
3932         if (i == pubip_list->num) {
3933                 fprintf(stderr, "Node does not know about IP address %s\n",
3934                         ctdb_sock_addr_to_string(mem_ctx, &addr, false));
3935                 return 0;
3936         }
3937
3938         addr_info.addr = addr;
3939         addr_info.mask = 0;
3940         addr_info.iface = NULL;
3941
3942         ret = ctdb_ctrl_del_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3943                                       ctdb->cmd_pnn, TIMEOUT(), &addr_info);
3944         if (ret != 0) {
3945                 fprintf(stderr, "Failed to delete public IP from node %u\n",
3946                         ctdb->cmd_pnn);
3947                 return ret;
3948         }
3949
3950         return 0;
3951 }
3952
/* Version stamped into, and required of, every backup file header */
#define DB_VERSION      3
/* Size of the name field in the backup file header, in bytes */
#define MAX_DB_NAME     64
/* Record buffer length at which buffered records are written to the file */
#define MAX_REC_BUFFER_SIZE     (100 * 1000)

/*
 * Header at the start of a backup file.  The structure is written to and
 * read from the file as raw bytes, so the member order and types must not
 * change.
 */
struct db_header {
        unsigned long version;          /* DB_VERSION at backup time */
        time_t timestamp;               /* time the backup was taken */
        unsigned long flags;            /* database flags */
        unsigned long nbuf;             /* record buffers following the header */
        unsigned long nrec;             /* total records across all buffers */
        char name[MAX_DB_NAME];         /* database name */
};
3965
3966 struct backup_state {
3967         TALLOC_CTX *mem_ctx;
3968         struct ctdb_rec_buffer *recbuf;
3969         uint32_t db_id;
3970         int fd;
3971         unsigned int nbuf, nrec;
3972 };
3973
3974 static int backup_handler(uint32_t reqid, struct ctdb_ltdb_header *header,
3975                           TDB_DATA key, TDB_DATA data, void *private_data)
3976 {
3977         struct backup_state *state = (struct backup_state *)private_data;
3978         size_t len;
3979         int ret;
3980
3981         if (state->recbuf == NULL) {
3982                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
3983                                                      state->db_id);
3984                 if (state->recbuf == NULL) {
3985                         return ENOMEM;
3986                 }
3987         }
3988
3989         ret = ctdb_rec_buffer_add(state->recbuf, state->recbuf, reqid,
3990                                   header, key, data);
3991         if (ret != 0) {
3992                 return ret;
3993         }
3994
3995         len = ctdb_rec_buffer_len(state->recbuf);
3996         if (len < MAX_REC_BUFFER_SIZE) {
3997                 return 0;
3998         }
3999
4000         ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
4001         if (ret != 0) {
4002                 fprintf(stderr, "Failed to write records to backup file\n");
4003                 return ret;
4004         }
4005
4006         state->nbuf += 1;
4007         state->nrec += state->recbuf->count;
4008         TALLOC_FREE(state->recbuf);
4009
4010         return 0;
4011 }
4012
4013 static int control_backupdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4014                             int argc, const char **argv)
4015 {
4016         const char *db_name;
4017         struct ctdb_db_context *db;
4018         uint32_t db_id;
4019         uint8_t db_flags;
4020         struct backup_state state;
4021         struct db_header db_hdr;
4022         int fd, ret;
4023
4024         if (argc != 2) {
4025                 usage("backupdb");
4026         }
4027
4028         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4029                 return 1;
4030         }
4031
4032         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4033                           db_flags, &db);
4034         if (ret != 0) {
4035                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4036                 return ret;
4037         }
4038
4039         fd = open(argv[1], O_RDWR|O_CREAT, 0600);
4040         if (fd == -1) {
4041                 ret = errno;
4042                 fprintf(stderr, "Failed to open file %s for writing\n",
4043                         argv[1]);
4044                 return ret;
4045         }
4046
4047         /* Write empty header first */
4048         ZERO_STRUCT(db_hdr);
4049         ret = write(fd, &db_hdr, sizeof(struct db_header));
4050         if (ret == -1) {
4051                 ret = errno;
4052                 close(fd);
4053                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4054                 return ret;
4055         }
4056
4057         state.mem_ctx = mem_ctx;
4058         state.recbuf = NULL;
4059         state.fd = fd;
4060         state.nbuf = 0;
4061         state.nrec = 0;
4062
4063         ret = ctdb_db_traverse_local(db, true, false, backup_handler, &state);
4064         if (ret != 0) {
4065                 fprintf(stderr, "Failed to collect records from DB %s\n",
4066                         db_name);
4067                 close(fd);
4068                 return ret;
4069         }
4070
4071         if (state.recbuf != NULL) {
4072                 ret = ctdb_rec_buffer_write(state.recbuf, state.fd);
4073                 if (ret != 0) {
4074                         fprintf(stderr,
4075                                 "Failed to write records to backup file\n");
4076                         close(fd);
4077                         return ret;
4078                 }
4079
4080                 state.nbuf += 1;
4081                 state.nrec += state.recbuf->count;
4082                 TALLOC_FREE(state.recbuf);
4083         }
4084
4085         db_hdr.version = DB_VERSION;
4086         db_hdr.timestamp = time(NULL);
4087         db_hdr.flags = db_flags;
4088         db_hdr.nbuf = state.nbuf;
4089         db_hdr.nrec = state.nrec;
4090         strncpy(db_hdr.name, db_name, MAX_DB_NAME-1);
4091
4092         lseek(fd, 0, SEEK_SET);
4093         ret = write(fd, &db_hdr, sizeof(struct db_header));
4094         if (ret == -1) {
4095                 ret = errno;
4096                 close(fd);
4097                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4098                 return ret;
4099         }
4100
4101         close(fd);
4102         printf("Database backed up to %s\n", argv[1]);
4103         return 0;
4104 }
4105
4106 static int control_restoredb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4107                              int argc, const char **argv)
4108 {
4109         const char *db_name = NULL;
4110         struct ctdb_db_context *db;
4111         struct db_header db_hdr;
4112         struct ctdb_node_map *nodemap;
4113         struct ctdb_req_control request;
4114         struct ctdb_reply_control **reply;
4115         struct ctdb_transdb wipedb;
4116         struct ctdb_pulldb_ext pulldb;
4117         struct ctdb_rec_buffer *recbuf;
4118         uint32_t generation;
4119         uint32_t *pnn_list;
4120         char timebuf[128];
4121         ssize_t n;
4122         int fd, i;
4123         int count, ret;
4124         uint8_t db_flags;
4125
4126         if (argc < 1 || argc > 2) {
4127                 usage("restoredb");
4128         }
4129
4130         fd = open(argv[0], O_RDONLY, 0600);
4131         if (fd == -1) {
4132                 ret = errno;
4133                 fprintf(stderr, "Failed to open file %s for reading\n",
4134                         argv[0]);
4135                 return ret;
4136         }
4137
4138         if (argc == 2) {
4139                 db_name = argv[1];
4140         }
4141
4142         n = read(fd, &db_hdr, sizeof(struct db_header));
4143         if (n == -1) {
4144                 ret = errno;
4145                 close(fd);
4146                 fprintf(stderr, "Failed to read db header from file %s\n",
4147                         argv[0]);
4148                 return ret;
4149         }
4150         db_hdr.name[sizeof(db_hdr.name)-1] = '\0';
4151
4152         if (db_hdr.version != DB_VERSION) {
4153                 fprintf(stderr,
4154                         "Wrong version of backup file, expected %u, got %lu\n",
4155                         DB_VERSION, db_hdr.version);
4156                 close(fd);
4157                 return EINVAL;
4158         }
4159
4160         if (db_name == NULL) {
4161                 db_name = db_hdr.name;
4162         }
4163
4164         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4165                  localtime(&db_hdr.timestamp));
4166         printf("Restoring database %s from backup @ %s\n", db_name, timebuf);
4167
4168         db_flags = db_hdr.flags & 0xff;
4169         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4170                           db_flags, &db);
4171         if (ret != 0) {
4172                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4173                 close(fd);
4174                 return ret;
4175         }
4176
4177         nodemap = get_nodemap(ctdb, false);
4178         if (nodemap == NULL) {
4179                 fprintf(stderr, "Failed to get nodemap\n");
4180                 close(fd);
4181                 return ENOMEM;
4182         }
4183
4184         ret = get_generation(mem_ctx, ctdb, &generation);
4185         if (ret != 0) {
4186                 fprintf(stderr, "Failed to get current generation\n");
4187                 close(fd);
4188                 return ret;
4189         }
4190
4191         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4192                                      &pnn_list);
4193         if (count <= 0) {
4194                 close(fd);
4195                 return ENOMEM;
4196         }
4197
4198         wipedb.db_id = ctdb_db_id(db);
4199         wipedb.tid = generation;
4200
4201         ctdb_req_control_db_freeze(&request, wipedb.db_id);
4202         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4203                                         ctdb->client, pnn_list, count,
4204                                         TIMEOUT(), &request, NULL, NULL);
4205         if (ret != 0) {
4206                 goto failed;
4207         }
4208
4209
4210         ctdb_req_control_db_transaction_start(&request, &wipedb);
4211         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4212                                         pnn_list, count, TIMEOUT(),
4213                                         &request, NULL, NULL);
4214         if (ret != 0) {
4215                 goto failed;
4216         }
4217
4218         ctdb_req_control_wipe_database(&request, &wipedb);
4219         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4220                                         pnn_list, count, TIMEOUT(),
4221                                         &request, NULL, NULL);
4222         if (ret != 0) {
4223                 goto failed;
4224         }
4225
4226         pulldb.db_id = ctdb_db_id(db);
4227         pulldb.lmaster = 0;
4228         pulldb.srvid = SRVID_CTDB_PUSHDB;
4229
4230         ctdb_req_control_db_push_start(&request, &pulldb);
4231         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4232                                         pnn_list, count, TIMEOUT(),
4233                                         &request, NULL, NULL);
4234         if (ret != 0) {
4235                 goto failed;
4236         }
4237
4238         for (i=0; i<db_hdr.nbuf; i++) {
4239                 struct ctdb_req_message message;
4240                 TDB_DATA data;
4241                 size_t np;
4242
4243                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4244                 if (ret != 0) {
4245                         goto failed;
4246                 }
4247
4248                 data.dsize = ctdb_rec_buffer_len(recbuf);
4249                 data.dptr = talloc_size(mem_ctx, data.dsize);
4250                 if (data.dptr == NULL) {
4251                         goto failed;
4252                 }
4253
4254                 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
4255
4256                 message.srvid = pulldb.srvid;
4257                 message.data.data = data;
4258
4259                 ret = ctdb_client_message_multi(mem_ctx, ctdb->ev,
4260                                                 ctdb->client,
4261                                                 pnn_list, count,
4262                                                 &message, NULL);
4263                 if (ret != 0) {
4264                         goto failed;
4265                 }
4266
4267                 talloc_free(recbuf);
4268                 talloc_free(data.dptr);
4269         }
4270
4271         ctdb_req_control_db_push_confirm(&request, pulldb.db_id);
4272         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4273                                         pnn_list, count, TIMEOUT(),
4274                                         &request, NULL, &reply);
4275         if (ret != 0) {
4276                 goto failed;
4277         }
4278
4279         for (i=0; i<count; i++) {
4280                 uint32_t num_records;
4281
4282                 ret = ctdb_reply_control_db_push_confirm(reply[i],
4283                                                          &num_records);
4284                 if (ret != 0) {
4285                         fprintf(stderr, "Invalid response from node %u\n",
4286                                 pnn_list[i]);
4287                         goto failed;
4288                 }
4289
4290                 if (num_records != db_hdr.nrec) {
4291                         fprintf(stderr, "Node %u received %u of %lu records\n",
4292                                 pnn_list[i], num_records, db_hdr.nrec);
4293                         goto failed;
4294                 }
4295         }
4296
4297         ctdb_req_control_db_set_healthy(&request, wipedb.db_id);
4298         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4299                                         pnn_list, count, TIMEOUT(),
4300                                         &request, NULL, NULL);
4301         if (ret != 0) {
4302                 goto failed;
4303         }
4304
4305         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4306         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4307                                         pnn_list, count, TIMEOUT(),
4308                                         &request, NULL, NULL);
4309         if (ret != 0) {
4310                 goto failed;
4311         }
4312
4313         ctdb_req_control_db_thaw(&request, wipedb.db_id);
4314         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4315                                         ctdb->client, pnn_list, count,
4316                                         TIMEOUT(), &request, NULL, NULL);
4317         if (ret != 0) {
4318                 goto failed;
4319         }
4320
4321         printf("Database %s restored\n", db_name);
4322         close(fd);
4323         return 0;
4324
4325
4326 failed:
4327         close(fd);
4328         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4329                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4330         return ret;
4331 }
4332
4333 struct dumpdbbackup_state {
4334         ctdb_rec_parser_func_t parser;
4335         struct dump_record_state sub_state;
4336 };
4337
4338 static int dumpdbbackup_handler(uint32_t reqid,
4339                                 struct ctdb_ltdb_header *header,
4340                                 TDB_DATA key, TDB_DATA data,
4341                                 void *private_data)
4342 {
4343         struct dumpdbbackup_state *state =
4344                 (struct dumpdbbackup_state *)private_data;
4345         struct ctdb_ltdb_header hdr;
4346         int ret;
4347
4348         ret = ctdb_ltdb_header_extract(&data, &hdr);
4349         if (ret != 0) {
4350                 return ret;
4351         }
4352
4353         return state->parser(reqid, &hdr, key, data, &state->sub_state);
4354 }
4355
4356 static int control_dumpdbbackup(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4357                                 int argc, const char **argv)
4358 {
4359         struct db_header db_hdr;
4360         char timebuf[128];
4361         struct dumpdbbackup_state state;
4362         ssize_t n;
4363         int fd, ret, i;
4364
4365         if (argc != 1) {
4366                 usage("dumpbackup");
4367         }
4368
4369         fd = open(argv[0], O_RDONLY, 0600);
4370         if (fd == -1) {
4371                 ret = errno;
4372                 fprintf(stderr, "Failed to open file %s for reading\n",
4373                         argv[0]);
4374                 return ret;
4375         }
4376
4377         n = read(fd, &db_hdr, sizeof(struct db_header));
4378         if (n == -1) {
4379                 ret = errno;
4380                 close(fd);
4381                 fprintf(stderr, "Failed to read db header from file %s\n",
4382                         argv[0]);
4383                 return ret;
4384         }
4385         db_hdr.name[sizeof(db_hdr.name)-1] = '\0';
4386
4387         if (db_hdr.version != DB_VERSION) {
4388                 fprintf(stderr,
4389                         "Wrong version of backup file, expected %u, got %lu\n",
4390                         DB_VERSION, db_hdr.version);
4391                 close(fd);
4392                 return EINVAL;
4393         }
4394
4395         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4396                  localtime(&db_hdr.timestamp));
4397         printf("Dumping database %s from backup @ %s\n",
4398                db_hdr.name, timebuf);
4399
4400         state.parser = dump_record;
4401         state.sub_state.count = 0;
4402
4403         for (i=0; i<db_hdr.nbuf; i++) {
4404                 struct ctdb_rec_buffer *recbuf;
4405
4406                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4407                 if (ret != 0) {
4408                         fprintf(stderr, "Failed to read records\n");
4409                         close(fd);
4410                         return ret;
4411                 }
4412
4413                 ret = ctdb_rec_buffer_traverse(recbuf, dumpdbbackup_handler,
4414                                                &state);
4415                 if (ret != 0) {
4416                         fprintf(stderr, "Failed to dump records\n");
4417                         close(fd);
4418                         return ret;
4419                 }
4420         }
4421
4422         close(fd);
4423         printf("Dumped %u record(s)\n", state.sub_state.count);
4424         return 0;
4425 }
4426
4427 static int control_wipedb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4428                           int argc, const char **argv)
4429 {
4430         const char *db_name;
4431         struct ctdb_db_context *db;
4432         uint32_t db_id;
4433         uint8_t db_flags;
4434         struct ctdb_node_map *nodemap;
4435         struct ctdb_req_control request;
4436         struct ctdb_transdb wipedb;
4437         uint32_t generation;
4438         uint32_t *pnn_list;
4439         int count, ret;
4440
4441         if (argc != 1) {
4442                 usage("wipedb");
4443         }
4444
4445         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4446                 return 1;
4447         }
4448
4449         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4450                           db_flags, &db);
4451         if (ret != 0) {
4452                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4453                 return ret;
4454         }
4455
4456         nodemap = get_nodemap(ctdb, false);
4457         if (nodemap == NULL) {
4458                 fprintf(stderr, "Failed to get nodemap\n");
4459                 return ENOMEM;
4460         }
4461
4462         ret = get_generation(mem_ctx, ctdb, &generation);
4463         if (ret != 0) {
4464                 fprintf(stderr, "Failed to get current generation\n");
4465                 return ret;
4466         }
4467
4468         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4469                                      &pnn_list);
4470         if (count <= 0) {
4471                 return ENOMEM;
4472         }
4473
4474         ctdb_req_control_db_freeze(&request, db_id);
4475         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4476                                         ctdb->client, pnn_list, count,
4477                                         TIMEOUT(), &request, NULL, NULL);
4478         if (ret != 0) {
4479                 goto failed;
4480         }
4481
4482         wipedb.db_id = db_id;
4483         wipedb.tid = generation;
4484
4485         ctdb_req_control_db_transaction_start(&request, &wipedb);
4486         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4487                                         pnn_list, count, TIMEOUT(),
4488                                         &request, NULL, NULL);
4489         if (ret != 0) {
4490                 goto failed;
4491         }
4492
4493         ctdb_req_control_wipe_database(&request, &wipedb);
4494         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4495                                         pnn_list, count, TIMEOUT(),
4496                                         &request, NULL, NULL);
4497         if (ret != 0) {
4498                 goto failed;
4499         }
4500
4501         ctdb_req_control_db_set_healthy(&request, db_id);
4502         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4503                                         pnn_list, count, TIMEOUT(),
4504                                         &request, NULL, NULL);
4505         if (ret != 0) {
4506                 goto failed;
4507         }
4508
4509         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4510         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4511                                         pnn_list, count, TIMEOUT(),
4512                                         &request, NULL, NULL);
4513         if (ret != 0) {
4514                 goto failed;
4515         }
4516
4517         ctdb_req_control_db_thaw(&request, db_id);
4518         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4519                                         ctdb->client, pnn_list, count,
4520                                         TIMEOUT(), &request, NULL, NULL);
4521         if (ret != 0) {
4522                 goto failed;
4523         }
4524
4525         printf("Database %s wiped\n", db_name);
4526         return 0;
4527
4528
4529 failed:
4530         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4531                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4532         return ret;
4533 }
4534
4535 static int control_recmaster(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4536                              int argc, const char **argv)
4537 {
4538         uint32_t recmaster;
4539         int ret;
4540
4541         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
4542                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
4543         if (ret != 0) {
4544                 return ret;
4545         }
4546
4547         printf("%u\n", recmaster);
4548         return 0;
4549 }
4550
4551 static int control_event(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4552                          int argc, const char **argv)
4553 {
4554         char *t, *event_helper = NULL;
4555         char *eventd_socket = NULL;
4556         const char **new_argv;
4557         int i;
4558
4559         t = getenv("CTDB_EVENT_HELPER");
4560         if (t != NULL) {
4561                 event_helper = talloc_strdup(mem_ctx, t);
4562         } else {
4563                 event_helper = talloc_asprintf(mem_ctx, "%s/ctdb_event",
4564                                                CTDB_HELPER_BINDIR);
4565         }
4566
4567         if (event_helper == NULL) {
4568                 fprintf(stderr, "Unable to set event daemon helper\n");
4569                 return 1;
4570         }
4571
4572         t = getenv("CTDB_SOCKET");
4573         if (t != NULL) {
4574                 eventd_socket = talloc_asprintf(mem_ctx, "%s/eventd.sock",
4575                                                 dirname(t));
4576         } else {
4577                 eventd_socket = talloc_asprintf(mem_ctx, "%s/eventd.sock",
4578                                                 CTDB_RUNDIR);
4579         }
4580
4581         if (eventd_socket == NULL) {
4582                 fprintf(stderr, "Unable to set event daemon socket\n");
4583                 return 1;
4584         }
4585
4586         new_argv = talloc_array(mem_ctx, const char *, argc + 1);
4587         if (new_argv == NULL) {
4588                 fprintf(stderr, "Memory allocation error\n");
4589                 return 1;
4590         }
4591
4592         new_argv[0] = eventd_socket;
4593         for (i=0; i<argc; i++) {
4594                 new_argv[i+1] = argv[i];
4595         }
4596
4597         return run_helper(mem_ctx, "event daemon helper", event_helper,
4598                           argc+1, new_argv);
4599 }
4600
4601 static int control_scriptstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4602                                 int argc, const char **argv)
4603 {
4604         const char *new_argv[3];
4605
4606         if (argc > 1) {
4607                 usage("scriptstatus");
4608         }
4609
4610         new_argv[0] = "status";
4611         new_argv[1] = (argc == 0) ? "monitor" : argv[0];
4612         new_argv[2] = NULL;
4613
4614         (void) control_event(mem_ctx, ctdb, 2, new_argv);
4615         return 0;
4616 }
4617
4618 static int control_natgw(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4619                          int argc, const char **argv)
4620 {
4621         char *t, *natgw_helper = NULL;
4622
4623         if (argc != 1) {
4624                 usage("natgw");
4625         }
4626
4627         t = getenv("CTDB_NATGW_HELPER");
4628         if (t != NULL) {
4629                 natgw_helper = talloc_strdup(mem_ctx, t);
4630         } else {
4631                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4632                                                CTDB_HELPER_BINDIR);
4633         }
4634
4635         if (natgw_helper == NULL) {
4636                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4637                 return 1;
4638         }
4639
4640         return run_helper(mem_ctx, "NAT gateway helper", natgw_helper,
4641                           argc, argv);
4642 }
4643
4644 /*
4645  * Find the PNN of the current node
4646  * discover the pnn by loading the nodes file and try to bind
4647  * to all addresses one at a time until the ip address is found.
4648  */
4649 static bool find_node_xpnn(TALLOC_CTX *mem_ctx, uint32_t *pnn)
4650 {
4651         struct ctdb_node_map *nodemap;
4652         int i;
4653
4654         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
4655         if (nodemap == NULL) {
4656                 return false;
4657         }
4658
4659         for (i=0; i<nodemap->num; i++) {
4660                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
4661                         continue;
4662                 }
4663                 if (ctdb_sys_have_ip(&nodemap->node[i].addr)) {
4664                         if (pnn != NULL) {
4665                                 *pnn = nodemap->node[i].pnn;
4666                         }
4667                         talloc_free(nodemap);
4668                         return true;
4669                 }
4670         }
4671
4672         fprintf(stderr, "Failed to detect PNN of the current node.\n");
4673         talloc_free(nodemap);
4674         return false;
4675 }
4676
4677 static int control_getreclock(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4678                               int argc, const char **argv)
4679 {
4680         const char *reclock;
4681         int ret;
4682
4683         if (argc != 0) {
4684                 usage("getreclock");
4685         }
4686
4687         ret = ctdb_ctrl_get_reclock_file(mem_ctx, ctdb->ev, ctdb->client,
4688                                          ctdb->cmd_pnn, TIMEOUT(), &reclock);
4689         if (ret != 0) {
4690                 return ret;
4691         }
4692
4693         if (reclock != NULL) {
4694                 printf("%s\n", reclock);
4695         }
4696
4697         return 0;
4698 }
4699
4700 static int control_setlmasterrole(TALLOC_CTX *mem_ctx,
4701                                   struct ctdb_context *ctdb,
4702                                   int argc, const char **argv)
4703 {
4704         uint32_t lmasterrole = 0;
4705         int ret;
4706
4707         if (argc != 1) {
4708                 usage("setlmasterrole");
4709         }
4710
4711         if (strcmp(argv[0], "on") == 0) {
4712                 lmasterrole = 1;
4713         } else if (strcmp(argv[0], "off") == 0) {
4714                 lmasterrole = 0;
4715         } else {
4716                 usage("setlmasterrole");
4717         }
4718
4719         ret = ctdb_ctrl_set_lmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4720                                         ctdb->cmd_pnn, TIMEOUT(), lmasterrole);
4721         if (ret != 0) {
4722                 return ret;
4723         }
4724
4725         return 0;
4726 }
4727
4728 static int control_setrecmasterrole(TALLOC_CTX *mem_ctx,
4729                                     struct ctdb_context *ctdb,
4730                                     int argc, const char **argv)
4731 {
4732         uint32_t recmasterrole = 0;
4733         int ret;
4734
4735         if (argc != 1) {
4736                 usage("setrecmasterrole");
4737         }
4738
4739         if (strcmp(argv[0], "on") == 0) {
4740                 recmasterrole = 1;
4741         } else if (strcmp(argv[0], "off") == 0) {
4742                 recmasterrole = 0;
4743         } else {
4744                 usage("setrecmasterrole");
4745         }
4746
4747         ret = ctdb_ctrl_set_recmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4748                                           ctdb->cmd_pnn, TIMEOUT(),
4749                                           recmasterrole);
4750         if (ret != 0) {
4751                 return ret;
4752         }
4753
4754         return 0;
4755 }
4756
4757 static int control_setdbreadonly(TALLOC_CTX *mem_ctx,
4758                                  struct ctdb_context *ctdb,
4759                                  int argc, const char **argv)
4760 {
4761         uint32_t db_id;
4762         uint8_t db_flags;
4763         int ret;
4764
4765         if (argc != 1) {
4766                 usage("setdbreadonly");
4767         }
4768
4769         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
4770                 return 1;
4771         }
4772
4773         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
4774                 fprintf(stderr, "READONLY can be set only on volatile DB\n");
4775                 return 1;
4776         }
4777
4778         ret = ctdb_ctrl_set_db_readonly(mem_ctx, ctdb->ev, ctdb->client,
4779                                         ctdb->cmd_pnn, TIMEOUT(), db_id);
4780         if (ret != 0) {
4781                 return ret;
4782         }
4783
4784         return 0;
4785 }
4786
4787 static int control_setdbsticky(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4788                                int argc, const char **argv)
4789 {
4790         uint32_t db_id;
4791         uint8_t db_flags;
4792         int ret;
4793
4794         if (argc != 1) {
4795                 usage("setdbsticky");
4796         }
4797
4798         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
4799                 return 1;
4800         }
4801
4802         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
4803                 fprintf(stderr, "STICKY can be set only on volatile DB\n");
4804                 return 1;
4805         }
4806
4807         ret = ctdb_ctrl_set_db_sticky(mem_ctx, ctdb->ev, ctdb->client,
4808                                       ctdb->cmd_pnn, TIMEOUT(), db_id);
4809         if (ret != 0) {
4810                 return ret;
4811         }
4812
4813         return 0;
4814 }
4815
4816 static int control_pfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4817                           int argc, const char **argv)
4818 {
4819         const char *db_name;
4820         struct ctdb_db_context *db;
4821         struct ctdb_transaction_handle *h;
4822         uint8_t db_flags;
4823         TDB_DATA key, data;
4824         int ret;
4825
4826         if (argc < 2 || argc > 3) {
4827                 usage("pfetch");
4828         }
4829
4830         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
4831                 return 1;
4832         }
4833
4834         if (! (db_flags &
4835                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
4836                 fprintf(stderr, "Transactions not supported on DB %s\n",
4837                         db_name);
4838                 return 1;
4839         }
4840
4841         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4842                           db_flags, &db);
4843         if (ret != 0) {
4844                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4845                 return ret;
4846         }
4847
4848         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
4849         if (ret != 0) {
4850                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
4851                 return ret;
4852         }
4853
4854         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
4855                                      TIMEOUT(), db, true, &h);
4856         if (ret != 0) {
4857                 fprintf(stderr, "Failed to start transaction on db %s\n",
4858                         db_name);
4859                 return ret;
4860         }
4861
4862         ret = ctdb_transaction_fetch_record(h, key, mem_ctx, &data);
4863         if (ret != 0) {
4864                 fprintf(stderr, "Failed to read record for key %s\n",
4865                         argv[1]);
4866                 ctdb_transaction_cancel(h);
4867                 return ret;
4868         }
4869
4870         printf("%.*s\n", (int)data.dsize, data.dptr);
4871
4872         ctdb_transaction_cancel(h);
4873         return 0;
4874 }
4875
4876 static int control_pstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4877                           int argc, const char **argv)
4878 {
4879         const char *db_name;
4880         struct ctdb_db_context *db;
4881         struct ctdb_transaction_handle *h;
4882         uint8_t db_flags;
4883         TDB_DATA key, data;
4884         int ret;
4885
4886         if (argc != 3) {
4887                 usage("pstore");
4888         }
4889
4890         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
4891                 return 1;
4892         }
4893
4894         if (! (db_flags &
4895                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
4896                 fprintf(stderr, "Transactions not supported on DB %s\n",
4897                         db_name);
4898                 return 1;
4899         }
4900
4901         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4902                           db_flags, &db);
4903         if (ret != 0) {
4904                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4905                 return ret;
4906         }
4907
4908         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
4909         if (ret != 0) {
4910                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
4911                 return ret;
4912         }
4913
4914         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
4915         if (ret != 0) {
4916                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
4917                 return ret;
4918         }
4919
4920         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
4921                                      TIMEOUT(), db, false, &h);
4922         if (ret != 0) {
4923                 fprintf(stderr, "Failed to start transaction on db %s\n",
4924                         db_name);
4925                 return ret;
4926         }
4927
4928         ret = ctdb_transaction_store_record(h, key, data);
4929         if (ret != 0) {
4930                 fprintf(stderr, "Failed to store record for key %s\n",
4931                         argv[1]);
4932                 ctdb_transaction_cancel(h);
4933                 return ret;
4934         }
4935
4936         ret = ctdb_transaction_commit(h);
4937         if (ret != 0) {
4938                 fprintf(stderr, "Failed to commit transaction on db %s\n",
4939                         db_name);
4940                 ctdb_transaction_cancel(h);
4941                 return ret;
4942         }
4943
4944         return 0;
4945 }
4946
4947 static int control_pdelete(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4948                            int argc, const char **argv)
4949 {
4950         const char *db_name;
4951         struct ctdb_db_context *db;
4952         struct ctdb_transaction_handle *h;
4953         uint8_t db_flags;
4954         TDB_DATA key;
4955         int ret;
4956
4957         if (argc != 2) {
4958                 usage("pdelete");
4959         }
4960
4961         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
4962                 return 1;
4963         }
4964
4965         if (! (db_flags &
4966                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
4967                 fprintf(stderr, "Transactions not supported on DB %s\n",
4968                         db_name);
4969                 return 1;
4970         }
4971
4972         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4973                           db_flags, &db);
4974         if (ret != 0) {
4975                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4976                 return ret;
4977         }
4978
4979         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
4980         if (ret != 0) {
4981                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
4982                 return ret;
4983         }
4984
4985         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
4986                                      TIMEOUT(), db, false, &h);
4987         if (ret != 0) {
4988                 fprintf(stderr, "Failed to start transaction on db %s\n",
4989                         db_name);
4990                 return ret;
4991         }
4992
4993         ret = ctdb_transaction_delete_record(h, key);
4994         if (ret != 0) {
4995                 fprintf(stderr, "Failed to delete record for key %s\n",
4996                         argv[1]);
4997                 ctdb_transaction_cancel(h);
4998                 return ret;
4999         }
5000
5001         ret = ctdb_transaction_commit(h);
5002         if (ret != 0) {
5003                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5004                         db_name);
5005                 ctdb_transaction_cancel(h);
5006                 return ret;
5007         }
5008
5009         return 0;
5010 }
5011
5012 static int ptrans_parse_string(TALLOC_CTX *mem_ctx, const char **ptr, TDB_DATA *data)
5013 {
5014         const char *t;
5015         size_t n;
5016         int ret;
5017
5018         *data = tdb_null;
5019
5020         /* Skip whitespace */
5021         n = strspn(*ptr, " \t");
5022         t = *ptr + n;
5023
5024         if (t[0] == '"') {
5025                 /* Quoted ASCII string - no wide characters! */
5026                 t++;
5027                 n = strcspn(t, "\"");
5028                 if (t[n] == '"') {
5029                         if (n > 0) {
5030                                 ret = str_to_data(t, n, mem_ctx, data);
5031                                 if (ret != 0) {
5032                                         return ret;
5033                                 }
5034                         }
5035                         *ptr = t + n + 1;
5036                 } else {
5037                         fprintf(stderr, "Unmatched \" in input %s\n", *ptr);
5038                         return 1;
5039                 }
5040         } else {
5041                 fprintf(stderr, "Unsupported input format in %s\n", *ptr);
5042                 return 1;
5043         }
5044
5045         return 0;
5046 }
5047
5048 #define MAX_LINE_SIZE   1024
5049
5050 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
5051                                  TDB_DATA *key, TDB_DATA *value)
5052 {
5053         char line [MAX_LINE_SIZE]; /* FIXME: make this more flexible? */
5054         const char *ptr;
5055         int ret;
5056
5057         ptr = fgets(line, MAX_LINE_SIZE, file);
5058         if (ptr == NULL) {
5059                 return false;
5060         }
5061
5062         /* Get key */
5063         ret = ptrans_parse_string(mem_ctx, &ptr, key);
5064         if (ret != 0 || ptr == NULL || key->dptr == NULL) {
5065                 /* Line Ignored but not EOF */
5066                 *key = tdb_null;
5067                 return true;
5068         }
5069
5070         /* Get value */
5071         ret = ptrans_parse_string(mem_ctx, &ptr, value);
5072         if (ret != 0) {
5073                 /* Line Ignored but not EOF */
5074                 talloc_free(key->dptr);
5075                 *key = tdb_null;
5076                 return true;
5077         }
5078
5079         return true;
5080 }
5081
5082 static int control_ptrans(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5083                           int argc, const char **argv)
5084 {
5085         const char *db_name;
5086         struct ctdb_db_context *db;
5087         struct ctdb_transaction_handle *h;
5088         uint8_t db_flags;
5089         FILE *file;
5090         TDB_DATA key, value;
5091         int ret;
5092
5093         if (argc < 1 || argc > 2) {
5094                 usage("ptrans");
5095         }
5096
5097         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5098                 return 1;
5099         }
5100
5101         if (! (db_flags &
5102                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
5103                 fprintf(stderr, "Transactions not supported on DB %s\n",
5104                         db_name);
5105                 return 1;
5106         }
5107
5108         if (argc == 2) {
5109                 file = fopen(argv[1], "r");
5110                 if (file == NULL) {
5111                         fprintf(stderr, "Failed to open file %s\n", argv[1]);
5112                         return 1;
5113                 }
5114         } else {
5115                 file = stdin;
5116         }
5117
5118         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5119                           db_flags, &db);
5120         if (ret != 0) {
5121                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5122                 goto done;
5123         }
5124
5125         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5126                                      TIMEOUT(), db, false, &h);
5127         if (ret != 0) {
5128                 fprintf(stderr, "Failed to start transaction on db %s\n",
5129                         db_name);
5130                 goto done;
5131         }
5132
5133         while (ptrans_get_key_value(mem_ctx, file, &key, &value)) {
5134                 if (key.dsize != 0) {
5135                         ret = ctdb_transaction_store_record(h, key, value);
5136                         if (ret != 0) {
5137                                 fprintf(stderr, "Failed to store record\n");
5138                                 ctdb_transaction_cancel(h);
5139                                 goto done;
5140                         }
5141                         talloc_free(key.dptr);
5142                         talloc_free(value.dptr);
5143                 }
5144         }
5145
5146         ret = ctdb_transaction_commit(h);
5147         if (ret != 0) {
5148                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5149                         db_name);
5150                 ctdb_transaction_cancel(h);
5151         }
5152
5153 done:
5154         if (file != stdin) {
5155                 fclose(file);
5156         }
5157         return ret;
5158 }
5159
5160 static int control_tfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5161                           int argc, const char **argv)
5162 {
5163         struct tdb_context *tdb;
5164         TDB_DATA key, data;
5165         struct ctdb_ltdb_header header;
5166         int ret;
5167
5168         if (argc < 2 || argc > 3) {
5169                 usage("tfetch");
5170         }
5171
5172         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5173         if (tdb == NULL) {
5174                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5175                 return 1;
5176         }
5177
5178         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5179         if (ret != 0) {
5180                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5181                 tdb_close(tdb);
5182                 return ret;
5183         }
5184
5185         data = tdb_fetch(tdb, key);
5186         if (data.dptr == NULL) {
5187                 fprintf(stderr, "No record for key %s\n", argv[1]);
5188                 tdb_close(tdb);
5189                 return 1;
5190         }
5191
5192         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
5193                 fprintf(stderr, "Invalid record for key %s\n", argv[1]);
5194                 tdb_close(tdb);
5195                 return 1;
5196         }
5197
5198         tdb_close(tdb);
5199
5200         if (argc == 3) {
5201                 int fd;
5202                 ssize_t nwritten;
5203
5204                 fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
5205                 if (fd == -1) {
5206                         fprintf(stderr, "Failed to open output file %s\n",
5207                                 argv[2]);
5208                         goto fail;
5209                 }
5210
5211                 nwritten = sys_write(fd, data.dptr, data.dsize);
5212                 if (nwritten != data.dsize) {
5213                         fprintf(stderr, "Failed to write record to file\n");
5214                         close(fd);
5215                         goto fail;
5216                 }
5217
5218                 close(fd);
5219         }
5220
5221 fail:
5222         ret = ctdb_ltdb_header_extract(&data, &header);
5223         if (ret != 0) {
5224                 fprintf(stderr, "Failed to parse header from data\n");
5225                 return 1;
5226         }
5227
5228         dump_ltdb_header(&header);
5229         dump_tdb_data("data", data);
5230
5231         return 0;
5232 }
5233
5234 static int control_tstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5235                           int argc, const char **argv)
5236 {
5237         struct tdb_context *tdb;
5238         TDB_DATA key, data[2], value;
5239         struct ctdb_ltdb_header header;
5240         uint8_t header_buf[sizeof(struct ctdb_ltdb_header)];
5241         size_t np;
5242         int ret;
5243
5244         if (argc < 3 || argc > 5) {
5245                 usage("tstore");
5246         }
5247
5248         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5249         if (tdb == NULL) {
5250                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5251                 return 1;
5252         }
5253
5254         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5255         if (ret != 0) {
5256                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5257                 tdb_close(tdb);
5258                 return ret;
5259         }
5260
5261         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &value);
5262         if (ret != 0) {
5263                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5264                 tdb_close(tdb);
5265                 return ret;
5266         }
5267
5268         ZERO_STRUCT(header);
5269
5270         if (argc > 3) {
5271                 header.rsn = (uint64_t)strtoull(argv[3], NULL, 0);
5272         }
5273         if (argc > 4) {
5274                 header.dmaster = (uint32_t)atol(argv[4]);
5275         }
5276         if (argc > 5) {
5277                 header.flags = (uint32_t)atol(argv[5]);
5278         }
5279
5280         ctdb_ltdb_header_push(&header, header_buf, &np);
5281
5282         data[0].dsize = np;
5283         data[0].dptr = header_buf;
5284
5285         data[1].dsize = value.dsize;
5286         data[1].dptr = value.dptr;
5287
5288         ret = tdb_storev(tdb, key, data, 2, TDB_REPLACE);
5289         if (ret != 0) {
5290                 fprintf(stderr, "Failed to write record %s to file %s\n",
5291                         argv[1], argv[0]);
5292         }
5293
5294         tdb_close(tdb);
5295
5296         return ret;
5297 }
5298
5299 static int control_readkey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5300                            int argc, const char **argv)
5301 {
5302         const char *db_name;
5303         struct ctdb_db_context *db;
5304         struct ctdb_record_handle *h;
5305         uint8_t db_flags;
5306         TDB_DATA key, data;
5307         bool readonly = false;
5308         int ret;
5309
5310         if (argc < 2 || argc > 3) {
5311                 usage("readkey");
5312         }
5313
5314         if (argc == 3) {
5315                 if (strcmp(argv[2], "readonly") == 0) {
5316                         readonly = true;
5317                 } else {
5318                         usage("readkey");
5319                 }
5320         }
5321
5322         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5323                 return 1;
5324         }
5325
5326         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
5327                 fprintf(stderr, "DB %s is not a volatile database\n",
5328                         db_name);
5329                 return 1;
5330         }
5331
5332         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5333                           db_flags, &db);
5334         if (ret != 0) {
5335                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5336                 return ret;
5337         }
5338
5339         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5340         if (ret != 0) {
5341                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5342                 return ret;
5343         }
5344
5345         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5346                               db, key, readonly, &h, NULL, &data);
5347         if (ret != 0) {
5348                 fprintf(stderr, "Failed to read record for key %s\n",
5349                         argv[1]);
5350         } else {
5351                 printf("Data: size:%zu ptr:[%.*s]\n", data.dsize,
5352                        (int)data.dsize, data.dptr);
5353         }
5354
5355         talloc_free(h);
5356         return ret;
5357 }
5358
5359 static int control_writekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5360                             int argc, const char **argv)
5361 {
5362         const char *db_name;
5363         struct ctdb_db_context *db;
5364         struct ctdb_record_handle *h;
5365         uint8_t db_flags;
5366         TDB_DATA key, data;
5367         int ret;
5368
5369         if (argc != 3) {
5370                 usage("writekey");
5371         }
5372
5373         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5374                 return 1;
5375         }
5376
5377         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
5378                 fprintf(stderr, "DB %s is not a volatile database\n",
5379                         db_name);
5380                 return 1;
5381         }
5382
5383         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5384                           db_flags, &db);
5385         if (ret != 0) {
5386                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5387                 return ret;
5388         }
5389
5390         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5391         if (ret != 0) {
5392                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5393                 return ret;
5394         }
5395
5396         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5397         if (ret != 0) {
5398                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5399                 return ret;
5400         }
5401
5402         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5403                               db, key, false, &h, NULL, NULL);
5404         if (ret != 0) {
5405                 fprintf(stderr, "Failed to lock record for key %s\n", argv[0]);
5406                 return ret;
5407         }
5408
5409         ret = ctdb_store_record(h, data);
5410         if (ret != 0) {
5411                 fprintf(stderr, "Failed to store record for key %s\n",
5412                         argv[1]);
5413         }
5414
5415         talloc_free(h);
5416         return ret;
5417 }
5418
5419 static int control_deletekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5420                              int argc, const char **argv)
5421 {
5422         const char *db_name;
5423         struct ctdb_db_context *db;
5424         struct ctdb_record_handle *h;
5425         uint8_t db_flags;
5426         TDB_DATA key, data;
5427         int ret;
5428
5429         if (argc != 2) {
5430                 usage("deletekey");
5431         }
5432
5433         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5434                 return 1;
5435         }
5436
5437         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
5438                 fprintf(stderr, "DB %s is not a volatile database\n",
5439                         db_name);
5440                 return 1;
5441         }
5442
5443         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5444                           db_flags, &db);
5445         if (ret != 0) {
5446                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5447                 return ret;
5448         }
5449
5450         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5451         if (ret != 0) {
5452                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5453                 return ret;
5454         }
5455
5456         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5457                               db, key, false, &h, NULL, &data);
5458         if (ret != 0) {
5459                 fprintf(stderr, "Failed to fetch record for key %s\n",
5460                         argv[1]);
5461                 return ret;
5462         }
5463
5464         ret = ctdb_delete_record(h);
5465         if (ret != 0) {
5466                 fprintf(stderr, "Failed to delete record for key %s\n",
5467                         argv[1]);
5468         }
5469
5470         talloc_free(h);
5471         return ret;
5472 }
5473
5474 static int control_checktcpport(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5475                                 int argc, const char **argv)
5476 {
5477         struct sockaddr_in sin;
5478         unsigned int port;
5479         int s, v;
5480         int ret;
5481
5482         if (argc != 1) {
5483                 usage("chktcpport");
5484         }
5485
5486         port = atoi(argv[0]);
5487
5488         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
5489         if (s == -1) {
5490                 fprintf(stderr, "Failed to open local socket\n");
5491                 return errno;
5492         }
5493
5494         v = fcntl(s, F_GETFL, 0);
5495         if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK)) {
5496                 fprintf(stderr, "Unable to set socket non-blocking\n");
5497                 close(s);
5498                 return errno;
5499         }
5500
5501         bzero(&sin, sizeof(sin));
5502         sin.sin_family = AF_INET;
5503         sin.sin_port = htons(port);
5504         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
5505         close(s);
5506         if (ret == -1) {
5507                 fprintf(stderr, "Failed to bind to TCP port %u\n", port);
5508                 return errno;
5509         }
5510
5511         return 0;
5512 }
5513
5514 static int control_getdbseqnum(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5515                                int argc, const char **argv)
5516 {
5517         uint32_t db_id;
5518         const char *db_name;
5519         uint64_t seqnum;
5520         int ret;
5521
5522         if (argc != 1) {
5523                 usage("getdbseqnum");
5524         }
5525
5526         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5527                 return 1;
5528         }
5529
5530         ret = ctdb_ctrl_get_db_seqnum(mem_ctx, ctdb->ev, ctdb->client,
5531                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
5532                                       &seqnum);
5533         if (ret != 0) {
5534                 fprintf(stderr, "Failed to get sequence number for DB %s\n",
5535                         db_name);
5536                 return ret;
5537         }
5538
5539         printf("0x%"PRIx64"\n", seqnum);
5540         return 0;
5541 }
5542
5543 static int control_nodestatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5544                               int argc, const char **argv)
5545 {
5546         const char *nodestring = NULL;
5547         struct ctdb_node_map *nodemap;
5548         int ret, i;
5549         bool print_hdr = false;
5550
5551         if (argc > 1) {
5552                 usage("nodestatus");
5553         }
5554
5555         if (argc == 1) {
5556                 nodestring = argv[0];
5557                 if (strcmp(nodestring, "all") == 0) {
5558                         print_hdr = true;
5559                 }
5560         }
5561
5562         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap)) {
5563                 return 1;
5564         }
5565
5566         if (options.machinereadable) {
5567                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5568         } else {
5569                 print_nodemap(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, print_hdr);
5570         }
5571
5572         ret = 0;
5573         for (i=0; i<nodemap->num; i++) {
5574                 ret |= nodemap->node[i].flags;
5575         }
5576
5577         return ret;
5578 }
5579
5580 const struct {
5581         const char *name;
5582         uint32_t offset;
5583 } db_stats_fields[] = {
5584 #define DBSTATISTICS_FIELD(n) { #n, offsetof(struct ctdb_db_statistics, n) }
5585         DBSTATISTICS_FIELD(db_ro_delegations),
5586         DBSTATISTICS_FIELD(db_ro_revokes),
5587         DBSTATISTICS_FIELD(locks.num_calls),
5588         DBSTATISTICS_FIELD(locks.num_current),
5589         DBSTATISTICS_FIELD(locks.num_pending),
5590         DBSTATISTICS_FIELD(locks.num_failed),
5591 };
5592
5593 static void print_dbstatistics(const char *db_name,
5594                                struct ctdb_db_statistics *s)
5595 {
5596         int i;
5597         const char *prefix = NULL;
5598         int preflen = 0;
5599
5600         printf("DB Statistics %s\n", db_name);
5601
5602         for (i=0; i<ARRAY_SIZE(db_stats_fields); i++) {
5603                 if (strchr(db_stats_fields[i].name, '.') != NULL) {
5604                         preflen = strcspn(db_stats_fields[i].name, ".") + 1;
5605                         if (! prefix ||
5606                             strncmp(prefix, db_stats_fields[i].name, preflen) != 0) {
5607                                 prefix = db_stats_fields[i].name;
5608                                 printf(" %*.*s\n", preflen-1, preflen-1,
5609                                        db_stats_fields[i].name);
5610                         }
5611                 } else {
5612                         preflen = 0;
5613                 }
5614                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
5615                        db_stats_fields[i].name+preflen, preflen ? 0 : 4, "",
5616                        *(uint32_t *)(db_stats_fields[i].offset+(uint8_t *)s));
5617         }
5618
5619         printf(" hop_count_buckets:");
5620         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5621                 printf(" %d", s->hop_count_bucket[i]);
5622         }
5623         printf("\n");
5624
5625         printf(" lock_buckets:");
5626         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5627                 printf(" %d", s->locks.buckets[i]);
5628         }
5629         printf("\n");
5630
5631         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5632                "locks_latency      MIN/AVG/MAX",
5633                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
5634                s->locks.latency.max, s->locks.latency.num);
5635
5636         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5637                "vacuum_latency     MIN/AVG/MAX",
5638                s->vacuum.latency.min, LATENCY_AVG(s->vacuum.latency),
5639                s->vacuum.latency.max, s->vacuum.latency.num);
5640
5641         printf(" Num Hot Keys:     %d\n", s->num_hot_keys);
5642         for (i=0; i<s->num_hot_keys; i++) {
5643                 int j;
5644                 printf("     Count:%d Key:", s->hot_keys[i].count);
5645                 for (j=0; j<s->hot_keys[i].key.dsize; j++) {
5646                         printf("%02x", s->hot_keys[i].key.dptr[j] & 0xff);
5647                 }
5648                 printf("\n");
5649         }
5650 }
5651
5652 static int control_dbstatistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5653                                 int argc, const char **argv)
5654 {
5655         uint32_t db_id;
5656         const char *db_name;
5657         struct ctdb_db_statistics *dbstats;
5658         int ret;
5659
5660         if (argc != 1) {
5661                 usage("dbstatistics");
5662         }
5663
5664         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5665                 return 1;
5666         }
5667
5668         ret = ctdb_ctrl_get_db_statistics(mem_ctx, ctdb->ev, ctdb->client,
5669                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
5670                                           &dbstats);
5671         if (ret != 0) {
5672                 fprintf(stderr, "Failed to get statistics for DB %s\n",
5673                         db_name);
5674                 return ret;
5675         }
5676
5677         print_dbstatistics(db_name, dbstats);
5678         return 0;
5679 }
5680
/*
 * State shared between disable_takeover_runs() and its message handler
 * disable_takeover_run_handler() while replies are being collected.
 */
struct disable_takeover_runs_state {
        /* PNNs of the nodes the disable message is sent to */
        uint32_t *pnn_list;
        /* Number of entries in pnn_list and in reply */
        int node_count;
        /* reply[i] becomes true once pnn_list[i] has answered */
        bool *reply;
        /* Negative value reported by a node on error, otherwise 0 */
        int status;
        /* Set when every node has answered or a node reported an error */
        bool done;
};
5688
5689 static void disable_takeover_run_handler(uint64_t srvid, TDB_DATA data,
5690                                          void *private_data)
5691 {
5692         struct disable_takeover_runs_state *state =
5693                 (struct disable_takeover_runs_state *)private_data;
5694         int ret, i;
5695
5696         if (data.dsize != sizeof(int)) {
5697                 /* Ignore packet */
5698                 return;
5699         }
5700
5701         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
5702         ret = *(int *)data.dptr;
5703         if (ret < 0) {
5704                 state->status = ret;
5705                 state->done = true;
5706                 return;
5707         }
5708         for (i=0; i<state->node_count; i++) {
5709                 if (state->pnn_list[i] == ret) {
5710                         state->reply[i] = true;
5711                         break;
5712                 }
5713         }
5714
5715         state->done = true;
5716         for (i=0; i<state->node_count; i++) {
5717                 if (! state->reply[i]) {
5718                         state->done = false;
5719                         break;
5720                 }
5721         }
5722 }
5723
5724 static int disable_takeover_runs(TALLOC_CTX *mem_ctx,
5725                                  struct ctdb_context *ctdb, uint32_t timeout,
5726                                  uint32_t *pnn_list, int count)
5727 {
5728         struct ctdb_disable_message disable = { 0 };
5729         struct disable_takeover_runs_state state;
5730         int ret, i;
5731
5732         disable.pnn = ctdb->pnn;
5733         disable.srvid = next_srvid(ctdb);
5734         disable.timeout = timeout;
5735
5736         state.pnn_list = pnn_list;
5737         state.node_count = count;
5738         state.done = false;
5739         state.status = 0;
5740         state.reply = talloc_zero_array(mem_ctx, bool, count);
5741         if (state.reply == NULL) {
5742                 return ENOMEM;
5743         }
5744
5745         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
5746                                               disable.srvid,
5747                                               disable_takeover_run_handler,
5748                                               &state);
5749         if (ret != 0) {
5750                 return ret;
5751         }
5752
5753         for (i=0; i<count; i++) {
5754                 ret = ctdb_message_disable_takeover_runs(mem_ctx, ctdb->ev,
5755                                                          ctdb->client,
5756                                                          pnn_list[i],
5757                                                          &disable);
5758                 if (ret != 0) {
5759                         goto fail;
5760                 }
5761         }
5762
5763         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
5764         if (ret == ETIME) {
5765                 fprintf(stderr, "Timed out waiting to disable takeover runs\n");
5766         } else {
5767                 ret = (state.status >= 0 ? 0 : 1);
5768         }
5769
5770 fail:
5771         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
5772                                            disable.srvid, &state);
5773         return ret;
5774 }
5775
5776 static int control_reloadips(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5777                              int argc, const char **argv)
5778 {
5779         const char *nodestring = NULL;
5780         struct ctdb_node_map *nodemap, *nodemap2;
5781         struct ctdb_req_control request;
5782         uint32_t *pnn_list, *pnn_list2;
5783         int ret, count, count2;
5784
5785         if (argc > 1) {
5786                 usage("reloadips");
5787         }
5788
5789         if (argc == 1) {
5790                 nodestring = argv[0];
5791         }
5792
5793         nodemap = get_nodemap(ctdb, false);
5794         if (nodemap == NULL) {
5795                 return 1;
5796         }
5797
5798         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap2)) {
5799                 return 1;
5800         }
5801
5802         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
5803                                         mem_ctx, &pnn_list);
5804         if (count <= 0) {
5805                 fprintf(stderr, "Memory allocation error\n");
5806                 return 1;
5807         }
5808
5809         count2 = list_of_active_nodes(nodemap2, CTDB_UNKNOWN_PNN,
5810                                       mem_ctx, &pnn_list2);
5811         if (count2 <= 0) {
5812                 fprintf(stderr, "Memory allocation error\n");
5813                 return 1;
5814         }
5815
5816         /* Disable takeover runs on all connected nodes.  A reply
5817          * indicating success is needed from each node so all nodes
5818          * will need to be active.
5819          *
5820          * A check could be added to not allow reloading of IPs when
5821          * there are disconnected nodes.  However, this should
5822          * probably be left up to the administrator.
5823          */
5824         ret = disable_takeover_runs(mem_ctx, ctdb, 2*options.timelimit,
5825                                     pnn_list, count);
5826         if (ret != 0) {
5827                 fprintf(stderr, "Failed to disable takeover runs\n");
5828                 return ret;
5829         }
5830
5831         /* Now tell all the desired nodes to reload their public IPs.
5832          * Keep trying this until it succeeds.  This assumes all
5833          * failures are transient, which might not be true...
5834          */
5835         ctdb_req_control_reload_public_ips(&request);
5836         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
5837                                         pnn_list2, count2, TIMEOUT(),
5838                                         &request, NULL, NULL);
5839         if (ret != 0) {
5840                 fprintf(stderr, "Failed to reload IPs on some nodes.\n");
5841         }
5842
5843         /* It isn't strictly necessary to wait until takeover runs are
5844          * re-enabled but doing so can't hurt.
5845          */
5846         ret = disable_takeover_runs(mem_ctx, ctdb, 0, pnn_list, count);
5847         if (ret != 0) {
5848                 fprintf(stderr, "Failed to enable takeover runs\n");
5849                 return ret;
5850         }
5851
5852         return ipreallocate(mem_ctx, ctdb);
5853 }
5854
5855 static int control_ipiface(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5856                            int argc, const char **argv)
5857 {
5858         ctdb_sock_addr addr;
5859         char *iface;
5860         int ret;
5861
5862         if (argc != 1) {
5863                 usage("ipiface");
5864         }
5865
5866         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
5867         if (ret != 0) {
5868                 fprintf(stderr, "Failed to Parse IP %s\n", argv[0]);
5869                 return 1;
5870         }
5871
5872         iface = ctdb_sys_find_ifname(&addr);
5873         if (iface == NULL) {
5874                 fprintf(stderr, "Failed to find interface for IP %s\n",
5875                         argv[0]);
5876                 return 1;
5877         }
5878         free(iface);
5879
5880         return 0;
5881 }
5882
5883
5884 static const struct ctdb_cmd {
5885         const char *name;
5886         int (*fn)(TALLOC_CTX *, struct ctdb_context *, int, const char **);
5887         bool without_daemon; /* can be run without daemon running ? */
5888         bool remote; /* can be run on remote nodes */
5889         const char *msg;
5890         const char *args;
5891 } ctdb_commands[] = {
5892         { "version", control_version, true, false,
5893                 "show version of ctdb", NULL },
5894         { "status", control_status, false, true,
5895                 "show node status", NULL },
5896         { "uptime", control_uptime, false, true,
5897                 "show node uptime", NULL },
5898         { "ping", control_ping, false, true,
5899                 "ping all nodes", NULL },
5900         { "runstate", control_runstate, false, true,
5901                 "get/check runstate of a node",
5902                 "[setup|first_recovery|startup|running]" },
5903         { "getvar", control_getvar, false, true,
5904                 "get a tunable variable", "<name>" },
5905         { "setvar", control_setvar, false, true,
5906                 "set a tunable variable", "<name> <value>" },
5907         { "listvars", control_listvars, false, true,
5908                 "list tunable variables", NULL },
5909         { "statistics", control_statistics, false, true,
5910                 "show ctdb statistics", NULL },
5911         { "statisticsreset", control_statistics_reset, false, true,
5912                 "reset ctdb statistics", NULL },
5913         { "stats", control_stats, false, true,
5914                 "show rolling statistics", "[count]" },
5915         { "ip", control_ip, false, true,
5916                 "show public ips", "[all]" },
5917         { "ipinfo", control_ipinfo, false, true,
5918                 "show public ip details", "<ip>" },
5919         { "ifaces", control_ifaces, false, true,
5920                 "show interfaces", NULL },
5921         { "setifacelink", control_setifacelink, false, true,
5922                 "set interface link status", "<iface> up|down" },
5923         { "process-exists", control_process_exists, false, true,
5924                 "check if a process exists on a node",  "<pid> [<srvid>]" },
5925         { "getdbmap", control_getdbmap, false, true,
5926                 "show attached databases", NULL },
5927         { "getdbstatus", control_getdbstatus, false, true,
5928                 "show database status", "<dbname|dbid>" },
5929         { "catdb", control_catdb, false, false,
5930                 "dump cluster-wide ctdb database", "<dbname|dbid>" },
5931         { "cattdb", control_cattdb, false, false,
5932                 "dump local ctdb database", "<dbname|dbid>" },
5933         { "getcapabilities", control_getcapabilities, false, true,
5934                 "show node capabilities", NULL },
5935         { "pnn", control_pnn, false, false,
5936                 "show the pnn of the currnet node", NULL },
5937         { "lvs", control_lvs, false, false,
5938                 "show lvs configuration", "master|list|status" },
5939         { "setdebug", control_setdebug, false, true,
5940                 "set debug level", "ERROR|WARNING|NOTICE|INFO|DEBUG" },
5941         { "getdebug", control_getdebug, false, true,
5942                 "get debug level", NULL },
5943         { "attach", control_attach, false, false,
5944                 "attach a database", "<dbname> [persistent|replicated]" },
5945         { "detach", control_detach, false, false,
5946                 "detach database(s)", "<dbname|dbid> ..." },
5947         { "dumpmemory", control_dumpmemory, false, true,
5948                 "dump ctdbd memory map", NULL },
5949         { "rddumpmemory", control_rddumpmemory, false, true,
5950                 "dump recoverd memory map", NULL },
5951         { "getpid", control_getpid, false, true,
5952                 "get ctdbd process ID", NULL },
5953         { "disable", control_disable, false, true,
5954                 "disable a node", NULL },
5955         { "enable", control_enable, false, true,
5956                 "enable a node", NULL },
5957         { "stop", control_stop, false, true,
5958                 "stop a node", NULL },
5959         { "continue", control_continue, false, true,
5960                 "continue a stopped node", NULL },
5961         { "ban", control_ban, false, true,
5962                 "ban a node", "<bantime>"},
5963         { "unban", control_unban, false, true,
5964                 "unban a node", NULL },
5965         { "shutdown", control_shutdown, false, true,
5966                 "shutdown ctdb daemon", NULL },
5967         { "recover", control_recover, false, true,
5968                 "force recovery", NULL },
5969         { "sync", control_ipreallocate, false, true,
5970                 "run ip reallocation (deprecated)", NULL },
5971         { "ipreallocate", control_ipreallocate, false, true,
5972                 "run ip reallocation", NULL },
5973         { "isnotrecmaster", control_isnotrecmaster, false, false,
5974                 "check if local node is the recmaster", NULL },
5975         { "gratarp", control_gratarp, false, true,
5976                 "send a gratuitous arp", "<ip> <interface>" },
5977         { "tickle", control_tickle, true, false,
5978                 "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
5979         { "gettickles", control_gettickles, false, true,
5980                 "get the list of tickles", "<ip> [<port>]" },
5981         { "addtickle", control_addtickle, false, true,
5982                 "add a tickle", "<ip>:<port> <ip>:<port>" },
5983         { "deltickle", control_deltickle, false, true,
5984                 "delete a tickle", "<ip>:<port> <ip>:<port>" },
5985         { "listnodes", control_listnodes, true, true,
5986                 "list nodes in the cluster", NULL },
5987         { "reloadnodes", control_reloadnodes, false, false,
5988                 "reload the nodes file all nodes", NULL },
5989         { "moveip", control_moveip, false, false,
5990                 "move an ip address to another node", "<ip> <node>" },
5991         { "addip", control_addip, false, true,
5992                 "add an ip address to a node", "<ip/mask> <iface>" },
5993         { "delip", control_delip, false, true,
5994                 "delete an ip address from a node", "<ip>" },
5995         { "backupdb", control_backupdb, false, false,
5996                 "backup a database into a file", "<dbname|dbid> <file>" },
5997         { "restoredb", control_restoredb, false, false,
5998                 "restore a database from a file", "<file> [dbname]" },
5999         { "dumpdbbackup", control_dumpdbbackup, true, false,
6000                 "dump database from a backup file", "<file>" },
6001         { "wipedb", control_wipedb, false, false,
6002                 "wipe the contents of a database.", "<dbname|dbid>"},
6003         { "recmaster", control_recmaster, false, true,
6004                 "show the pnn for the recovery master", NULL },
6005         { "event", control_event, true, false,
6006                 "event and event script commands", NULL },
6007         { "scriptstatus", control_scriptstatus, true, false,
6008                 "show event script status",
6009                 "[init|setup|startup|monitor|takeip|releaseip|ipreallocated]" },
6010         { "natgw", control_natgw, false, false,
6011                 "show natgw configuration", "master|list|status" },
6012         { "getreclock", control_getreclock, false, true,
6013                 "get recovery lock file", NULL },
6014         { "setlmasterrole", control_setlmasterrole, false, true,
6015                 "set LMASTER role", "on|off" },
6016         { "setrecmasterrole", control_setrecmasterrole, false, true,
6017                 "set RECMASTER role", "on|off"},
6018         { "setdbreadonly", control_setdbreadonly, false, true,
6019                 "enable readonly records", "<dbname|dbid>" },
6020         { "setdbsticky", control_setdbsticky, false, true,
6021                 "enable sticky records", "<dbname|dbid>"},
6022         { "pfetch", control_pfetch, false, false,
6023                 "fetch record from persistent database", "<dbname|dbid> <key> [<file>]" },
6024         { "pstore", control_pstore, false, false,
6025                 "write record to persistent database", "<dbname|dbid> <key> <value>" },
6026         { "pdelete", control_pdelete, false, false,
6027                 "delete record from persistent database", "<dbname|dbid> <key>" },
6028         { "ptrans", control_ptrans, false, false,
6029                 "update a persistent database (from file or stdin)", "<dbname|dbid> [<file>]" },
6030         { "tfetch", control_tfetch, false, true,
6031                 "fetch a record", "<tdb-file> <key> [<file>]" },
6032         { "tstore", control_tstore, false, true,
6033                 "store a record", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6034         { "readkey", control_readkey, false, false,
6035                 "read value of a database key", "<dbname|dbid> <key> [readonly]" },
6036         { "writekey", control_writekey, false, false,
6037                 "write value for a database key", "<dbname|dbid> <key> <value>" },
6038         { "deletekey", control_deletekey, false, false,
6039                 "delete a database key", "<dbname|dbid> <key>" },
6040         { "checktcpport", control_checktcpport, true, false,
6041                 "check if a service is bound to a specific tcp port or not", "<port>" },
6042         { "getdbseqnum", control_getdbseqnum, false, false,
6043                 "get database sequence number", "<dbname|dbid>" },
6044         { "nodestatus", control_nodestatus, false, true,
6045                 "show and return node status", "[all|<pnn-list>]" },
6046         { "dbstatistics", control_dbstatistics, false, true,
6047                 "show database statistics", "<dbname|dbid>" },
6048         { "reloadips", control_reloadips, false, false,
6049                 "reload the public addresses file", "[all|<pnn-list>]" },
6050         { "ipiface", control_ipiface, true, false,
6051                 "Find the interface an ip address is hosted on", "<ip>" },
6052 };
6053
6054 static const struct ctdb_cmd *match_command(const char *command)
6055 {
6056         const struct ctdb_cmd *cmd;
6057         int i;
6058
6059         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6060                 cmd = &ctdb_commands[i];
6061                 if (strlen(command) == strlen(cmd->name) &&
6062                     strncmp(command, cmd->name, strlen(command)) == 0) {
6063                         return cmd;
6064                 }
6065         }
6066
6067         return NULL;
6068 }
6069
6070
6071 /**
6072  * Show usage message
6073  */
6074 static void usage_full(void)
6075 {
6076         int i;
6077
6078         poptPrintHelp(pc, stdout, 0);
6079         printf("\nCommands:\n");
6080         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6081                 printf("  %-15s %-27s  %s\n",
6082                        ctdb_commands[i].name,
6083                        ctdb_commands[i].args ? ctdb_commands[i].args : "",
6084                        ctdb_commands[i].msg);
6085         }
6086 }
6087
6088 static void usage(const char *command)
6089 {
6090         const struct ctdb_cmd *cmd;
6091
6092         if (command == NULL) {
6093                 usage_full();
6094                 exit(1);
6095         }
6096
6097         cmd = match_command(command);
6098         if (cmd == NULL) {
6099                 usage_full();
6100         } else {
6101                 poptPrintUsage(pc, stdout, 0);
6102                 printf("\nCommands:\n");
6103                 printf("  %-15s %-27s  %s\n",
6104                        cmd->name, cmd->args ? cmd->args : "", cmd->msg);
6105         }
6106
6107         exit(1);
6108 }
6109
6110 struct poptOption cmdline_options[] = {
6111         POPT_AUTOHELP
6112         { "socket", 's', POPT_ARG_STRING, &options.socket, 0,
6113                 "CTDB socket path", "filename" },
6114         { "debug", 'd', POPT_ARG_STRING, &options.debuglevelstr, 0,
6115                 "debug level"},
6116         { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0,
6117                 "timelimit (in seconds)" },
6118         { "node", 'n', POPT_ARG_INT, &options.pnn, 0,
6119                 "node specification - integer" },
6120         { NULL, 'Y', POPT_ARG_NONE, &options.machinereadable, 0,
6121                 "enable machine readable output", NULL },
6122         { "separator", 'x', POPT_ARG_STRING, &options.sep, 0,
6123                 "specify separator for machine readable output", "CHAR" },
6124         { NULL, 'X', POPT_ARG_NONE, &options.machineparsable, 0,
6125                 "enable machine parsable output with separator |", NULL },
6126         { "verbose", 'v', POPT_ARG_NONE, &options.verbose, 0,
6127                 "enable verbose output", NULL },
6128         { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0,
6129                 "die if runtime exceeds this limit (in seconds)" },
6130         POPT_TABLEEND
6131 };
6132
6133 static int process_command(const struct ctdb_cmd *cmd, int argc,
6134                            const char **argv)
6135 {
6136         TALLOC_CTX *tmp_ctx;
6137         struct ctdb_context *ctdb;
6138         int ret;
6139         bool status;
6140         uint64_t srvid_offset;
6141
6142         tmp_ctx = talloc_new(NULL);
6143         if (tmp_ctx == NULL) {
6144                 fprintf(stderr, "Memory allocation error\n");
6145                 goto fail;
6146         }
6147
6148         if (cmd->without_daemon) {
6149                 if (options.pnn != -1) {
6150                         fprintf(stderr,
6151                                 "Cannot specify node for command %s\n",
6152                                 cmd->name);
6153                         goto fail;
6154                 }
6155
6156                 ret = cmd->fn(tmp_ctx, NULL, argc-1, argv+1);
6157                 talloc_free(tmp_ctx);
6158                 return ret;
6159         }
6160
6161         ctdb = talloc_zero(tmp_ctx, struct ctdb_context);
6162         if (ctdb == NULL) {
6163                 fprintf(stderr, "Memory allocation error\n");
6164                 goto fail;
6165         }
6166
6167         ctdb->ev = tevent_context_init(ctdb);
6168         if (ctdb->ev == NULL) {
6169                 fprintf(stderr, "Failed to initialize tevent\n");
6170                 goto fail;
6171         }
6172
6173         ret = ctdb_client_init(ctdb, ctdb->ev, options.socket, &ctdb->client);
6174         if (ret != 0) {
6175                 fprintf(stderr, "Failed to connect to CTDB daemon (%s)\n",
6176                         options.socket);
6177
6178                 if (!find_node_xpnn(ctdb, NULL)) {
6179                         fprintf(stderr, "Is this node part of CTDB cluster?\n");
6180                 }
6181                 goto fail;
6182         }
6183
6184         ctdb->pnn = ctdb_client_pnn(ctdb->client);
6185         srvid_offset = getpid() & 0xFFFF;
6186         ctdb->srvid = SRVID_CTDB_TOOL | (srvid_offset << 16);
6187
6188         if (options.pnn != -1) {
6189                 status = verify_pnn(ctdb, options.pnn);
6190                 if (! status) {
6191                         goto fail;
6192                 }
6193
6194                 ctdb->cmd_pnn = options.pnn;
6195         } else {
6196                 ctdb->cmd_pnn = ctdb->pnn;
6197         }
6198
6199         if (! cmd->remote && ctdb->pnn != ctdb->cmd_pnn) {
6200                 fprintf(stderr, "Node cannot be specified for command %s\n",
6201                         cmd->name);
6202                 goto fail;
6203         }
6204
6205         ret = cmd->fn(tmp_ctx, ctdb, argc-1, argv+1);
6206         talloc_free(tmp_ctx);
6207         return ret;
6208
6209 fail:
6210         talloc_free(tmp_ctx);
6211         return 1;
6212 }
6213
/**
 * Signal handler that reports on stderr that the maximum runtime
 * has been exceeded
 *
 * alarm_handler() installs this for SIGTERM before signalling the
 * process group.  Only async-signal-safe calls are allowed in a signal
 * handler, so the message is emitted with write(2) rather than stdio,
 * which could deadlock if the signal interrupts another stdio call.
 */
static void signal_handler(int sig)
{
        static const char msg[] = "Maximum runtime exceeded - exiting\n";
        ssize_t nwritten;

        (void)sig;

        /* Best effort: nothing useful can be done if the write fails */
        nwritten = write(STDERR_FILENO, msg, sizeof(msg) - 1);
        (void)nwritten;
}
6218
6219 static void alarm_handler(int sig)
6220 {
6221         /* Kill any child processes */
6222         signal(SIGTERM, signal_handler);
6223         kill(0, SIGTERM);
6224
6225         _exit(1);
6226 }
6227
6228 int main(int argc, const char *argv[])
6229 {
6230         int opt;
6231         const char **extra_argv;
6232         int extra_argc;
6233         const struct ctdb_cmd *cmd;
6234         const char *ctdb_socket;
6235         int loglevel;
6236         int ret;
6237
6238         setlinebuf(stdout);
6239
6240         /* Set default options */
6241         options.socket = CTDB_SOCKET;
6242         options.debuglevelstr = NULL;
6243         options.timelimit = 10;
6244         options.sep = "|";
6245         options.maxruntime = 0;
6246         options.pnn = -1;
6247
6248         ctdb_socket = getenv("CTDB_SOCKET");
6249         if (ctdb_socket != NULL) {
6250                 options.socket = ctdb_socket;
6251         }
6252
6253         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
6254                             POPT_CONTEXT_KEEP_FIRST);
6255         while ((opt = poptGetNextOpt(pc)) != -1) {
6256                 fprintf(stderr, "Invalid option %s: %s\n",
6257                         poptBadOption(pc, 0), poptStrerror(opt));
6258                 exit(1);
6259         }
6260
6261         if (options.maxruntime == 0) {
6262                 const char *ctdb_timeout;
6263
6264                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6265                 if (ctdb_timeout != NULL) {
6266                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6267                 } else {
6268                         options.maxruntime = 120;
6269                 }
6270         }
6271         if (options.maxruntime <= 120) {
6272                 /* default timeout is 120 seconds */
6273                 options.maxruntime = 120;
6274         }
6275
6276         if (options.machineparsable) {
6277                 options.machinereadable = 1;
6278         }
6279
6280         /* setup the remaining options for the commands */
6281         extra_argc = 0;
6282         extra_argv = poptGetArgs(pc);
6283         if (extra_argv) {
6284                 extra_argv++;
6285                 while (extra_argv[extra_argc]) extra_argc++;
6286         }
6287
6288         if (extra_argc < 1) {
6289                 usage(NULL);
6290         }
6291
6292         cmd = match_command(extra_argv[0]);
6293         if (cmd == NULL) {
6294                 fprintf(stderr, "Unknown command '%s'\n", extra_argv[0]);
6295                 exit(1);
6296         }
6297
6298         /* Enable logging */
6299         setup_logging("ctdb", DEBUG_STDERR);
6300         if (debug_level_parse(options.debuglevelstr, &loglevel)) {
6301                 DEBUGLEVEL = loglevel;
6302         } else {
6303                 DEBUGLEVEL = DEBUG_ERR;
6304         }
6305
6306         signal(SIGALRM, alarm_handler);
6307         alarm(options.maxruntime);
6308
6309         ret = process_command(cmd, extra_argc, extra_argv);
6310         if (ret == -1) {
6311                 ret = 1;
6312         }
6313
6314         (void)poptFreeContext(pc);
6315
6316         return ret;
6317 }