tools/ctdb: Display the locking statistics
[obnox/ctdb.git] / tools / ctdb.c
index e9c8b91492c765ea2aafef97bcd165f9f7194d0a..fcc44bb2b6fad549de19abf4907061bc02b5f2bf 100644 (file)
@@ -19,7 +19,6 @@
 */
 
 #include "includes.h"
-#include "lib/tevent/tevent.h"
 #include "system/time.h"
 #include "system/filesys.h"
 #include "system/network.h"
@@ -43,9 +42,15 @@ static void usage(void);
 static struct {
        int timelimit;
        uint32_t pnn;
+       uint32_t *nodes;
        int machinereadable;
        int verbose;
        int maxruntime;
+       int printemptyrecords;
+       int printdatasize;
+       int printlmaster;
+       int printhash;
+       int printrecordflags;
 } options;
 
 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
@@ -61,46 +66,179 @@ static int control_version(struct ctdb_context *ctdb, int argc, const char **arg
 }
 #endif
 
+#define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                           \
+               DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
+                                  "Out of memory in " __location__ )); \
+               abort();                                                \
+       }} while (0)
 
-/*
-  verify that a node exists and is reachable
+/* Pretty print the flags to a static buffer in human-readable format.
+ * This never returns NULL!
  */
-static void verify_node(struct ctdb_context *ctdb)
+static const char *pretty_print_flags(uint32_t flags)
 {
-       int ret;
-       struct ctdb_node_map *nodemap=NULL;
+       int j;
+       static const struct {
+               uint32_t flag;
+               const char *name;
+       } flag_names[] = {
+               { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
+               { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
+               { NODE_FLAGS_BANNED,                "BANNED" },
+               { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
+               { NODE_FLAGS_DELETED,               "DELETED" },
+               { NODE_FLAGS_STOPPED,               "STOPPED" },
+               { NODE_FLAGS_INACTIVE,              "INACTIVE" },
+       };
+       static char flags_str[512]; /* Big enough to contain all flag names */
 
-       if (options.pnn == CTDB_CURRENT_NODE) {
-               return;
+       flags_str[0] = '\0';
+       for (j=0;j<ARRAY_SIZE(flag_names);j++) {
+               if (flags & flag_names[j].flag) {
+                       if (flags_str[0] == '\0') {
+                               (void) strcpy(flags_str, flag_names[j].name);
+                       } else {
+                               (void) strcat(flags_str, "|");
+                               (void) strcat(flags_str, flag_names[j].name);
+                       }
+               }
        }
-       if (options.pnn == CTDB_BROADCAST_ALL) {
-               return;
+       if (flags_str[0] == '\0') {
+               (void) strcpy(flags_str, "OK");
        }
 
-       /* verify the node exists */
-       if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
-               exit(10);
-       }
-       if (options.pnn >= nodemap->num) {
-               DEBUG(DEBUG_ERR, ("Node %u does not exist\n", options.pnn));
-               exit(ERR_NONODE);
-       }
-       if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DELETED) {
-               DEBUG(DEBUG_ERR, ("Node %u is DELETED\n", options.pnn));
-               exit(ERR_DISNODE);
+       return flags_str;
+}
+
+static int h2i(char h)
+{
+       if (h >= 'a' && h <= 'f') return h - 'a' + 10;
+       if (h >= 'A' && h <= 'F') return h - 'f' + 10;
+       return h - '0';
+}
+
+static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str)
+{
+       int i, len;
+       TDB_DATA key = {NULL, 0};
+
+       len = strlen(str);
+       if (len & 0x01) {
+               DEBUG(DEBUG_ERR,("Key specified with odd number of hexadecimal digits\n"));
+               return key;
        }
-       if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_DISCONNECTED) {
-               DEBUG(DEBUG_ERR, ("Node %u is DISCONNECTED\n", options.pnn));
-               exit(ERR_DISNODE);
+
+       key.dsize = len>>1;
+       key.dptr  = talloc_size(mem_ctx, key.dsize);
+
+       for (i=0; i < len/2; i++) {
+               key.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
        }
+       return key;
+}
 
-       /* verify we can access the node */
-       ret = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
-       if (ret == -1) {
-               DEBUG(DEBUG_ERR,("Can not access node. Node is not operational.\n"));
+/* Parse a nodestring.  Parameter dd_ok controls what happens to nodes
+ * that are disconnected or deleted.  If dd_ok is true those nodes are
+ * included in the output list of nodes.  If dd_ok is false, those
+ * nodes are filtered from the "all" case and cause an error if
+ * explicitly specified.
+ */
+static bool parse_nodestring(struct ctdb_context *ctdb,
+                            const char * nodestring,
+                            uint32_t current_pnn,
+                            bool dd_ok,
+                            uint32_t **nodes,
+                            uint32_t *pnn_mode)
+{
+       int n;
+       uint32_t i;
+       struct ctdb_node_map *nodemap;
+       
+       *nodes = NULL;
+
+       if (!ctdb_getnodemap(ctdb_connection, CTDB_CURRENT_NODE, &nodemap)) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
                exit(10);
        }
+
+       if (nodestring != NULL) {
+               *nodes = talloc_array(ctdb, uint32_t, 0);
+               CTDB_NOMEM_ABORT(*nodes);
+               
+               n = 0;
+
+               if (strcmp(nodestring, "all") == 0) {
+                       *pnn_mode = CTDB_BROADCAST_ALL;
+
+                       /* all */
+                       for (i = 0; i < nodemap->num; i++) {
+                               if ((nodemap->nodes[i].flags & 
+                                    (NODE_FLAGS_DISCONNECTED |
+                                     NODE_FLAGS_DELETED)) && !dd_ok) {
+                                       continue;
+                               }
+                               *nodes = talloc_realloc(ctdb, *nodes,
+                                                       uint32_t, n+1);
+                               CTDB_NOMEM_ABORT(*nodes);
+                               (*nodes)[n] = i;
+                               n++;
+                       }
+               } else {
+                       /* x{,y...} */
+                       char *ns, *tok;
+                       
+                       ns = talloc_strdup(ctdb, nodestring);
+                       tok = strtok(ns, ",");
+                       while (tok != NULL) {
+                               uint32_t pnn;
+                               i = (uint32_t)strtoul(tok, NULL, 0);
+                               if (i >= nodemap->num) {
+                                       DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
+                                       exit(ERR_NONODE);
+                               }
+                               if ((nodemap->nodes[i].flags & 
+                                    (NODE_FLAGS_DISCONNECTED |
+                                     NODE_FLAGS_DELETED)) && !dd_ok) {
+                                       DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
+                                       exit(ERR_DISNODE);
+                               }
+                               if (!ctdb_getpnn(ctdb_connection, i, &pnn)) {
+                                       DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
+                                       exit(10);
+                               }
+
+                               *nodes = talloc_realloc(ctdb, *nodes,
+                                                       uint32_t, n+1);
+                               CTDB_NOMEM_ABORT(*nodes);
+
+                               (*nodes)[n] = i;
+                               n++;
+
+                               tok = strtok(NULL, ",");
+                       }
+                       talloc_free(ns);
+
+                       if (n == 1) {
+                               *pnn_mode = (*nodes)[0];
+                       } else {
+                               *pnn_mode = CTDB_MULTICAST;
+                       }
+               }
+       } else {
+               /* default - no nodes specified */
+               *nodes = talloc_array(ctdb, uint32_t, 1);
+               CTDB_NOMEM_ABORT(*nodes);
+               *pnn_mode = CTDB_CURRENT_NODE;
+
+               if (!ctdb_getpnn(ctdb_connection, current_pnn,
+                                &((*nodes)[0]))) {
+                       return false;
+               }
+       }
+
+       ctdb_free_nodemap(nodemap);
+
+       return true;
 }
 
 /*
@@ -123,7 +261,7 @@ static int db_exists(struct ctdb_context *ctdb, const char *db_name, bool *persi
                ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
                if (!strcmp(name, db_name)) {
                        if (persistent) {
-                               *persistent = dbmap->dbs[i].persistent;
+                               *persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
                        }
                        return 0;
                }
@@ -196,6 +334,10 @@ static void show_statistics(struct ctdb_statistics *s, int show_header)
                STATISTICS_FIELD(timeouts.call),
                STATISTICS_FIELD(timeouts.control),
                STATISTICS_FIELD(timeouts.traverse),
+               STATISTICS_FIELD(locks.num_calls),
+               STATISTICS_FIELD(locks.num_current),
+               STATISTICS_FIELD(locks.num_pending),
+               STATISTICS_FIELD(locks.num_failed),
                STATISTICS_FIELD(total_calls),
                STATISTICS_FIELD(pending_calls),
                STATISTICS_FIELD(lockwait_calls),
@@ -204,7 +346,10 @@ static void show_statistics(struct ctdb_statistics *s, int show_header)
                STATISTICS_FIELD(pending_childwrite_calls),
                STATISTICS_FIELD(memory_used),
                STATISTICS_FIELD(max_hop_count),
+               STATISTICS_FIELD(total_ro_delegations),
+               STATISTICS_FIELD(total_ro_revokes),
        };
+       
        tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
        seconds = tmp%60;
        tmp    /= 60;
@@ -300,7 +445,19 @@ static void show_statistics(struct ctdb_statistics *s, int show_header)
                               preflen?0:4, "",
                               *(uint32_t *)(fields[i].offset+(uint8_t *)s));
                }
-               printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd       MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
+               printf(" hop_count_buckets:");
+               for (i=0;i<MAX_COUNT_BUCKETS;i++) {
+                       printf(" %d", s->hop_count_bucket[i]);
+               }
+               printf("\n");
+               printf(" lock_buckets:");
+               for (i=0; i<MAX_COUNT_BUCKETS; i++) {
+                       printf(" %d", s->locks.buckets[i]);
+               }
+               printf("\n");
+               printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "locks_latency      MIN/AVG/MAX", s->locks.latency.min, s->locks.latency.num?s->locks.latency.total/s->locks.latency.num:0.0, s->locks.latency.max, s->locks.latency.num);
+
+               printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd      MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
 
                printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
 
@@ -424,6 +581,88 @@ static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
 }
 
 
+/*
+  display remote ctdb db statistics
+ */
+static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       struct ctdb_db_statistics *dbstat;
+       struct ctdb_dbid_map *dbmap=NULL;
+       int i, ret;
+
+       if (argc < 1) {
+               usage();
+       }
+
+       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
+               return ret;
+       }
+       for(i=0;i<dbmap->num;i++){
+               const char *name;
+
+               ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
+               if(!strcmp(argv[0], name)){
+                       talloc_free(discard_const(name));
+                       break;
+               }
+               talloc_free(discard_const(name));
+       }
+       if (i == dbmap->num) {
+               DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       if (!ctdb_getdbstat(ctdb_connection, options.pnn, dbmap->dbs[i].dbid, &dbstat)) {
+               DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       printf("DB Statistics:\n");
+       printf(" %*s%-22s%*s%10u\n", 0, "", "ro_delegations", 4, "",
+               dbstat->db_ro_delegations);
+       printf(" %*s%-22s%*s%10u\n", 0, "", "ro_revokes", 4, "",
+               dbstat->db_ro_delegations);
+       printf(" %s\n", "locks");
+       printf(" %*s%-22s%*s%10u\n", 4, "", "total", 0, "",
+               dbstat->locks.num_calls);
+       printf(" %*s%-22s%*s%10u\n", 4, "", "failed", 0, "",
+               dbstat->locks.num_failed);
+       printf(" %*s%-22s%*s%10u\n", 4, "", "current", 0, "",
+               dbstat->locks.num_current);
+       printf(" %*s%-22s%*s%10u\n", 4, "", "pending", 0, "",
+               dbstat->locks.num_pending);
+       printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
+               "    latency_ctdbd  MIN/AVG/MAX",
+               dbstat->locks.latency.min,
+               (dbstat->locks.latency.num ?
+                dbstat->locks.latency.total /dbstat->locks.latency.num :
+                0.0),
+               dbstat->locks.latency.max,
+               dbstat->locks.latency.num);
+       printf(" %s", "    buckets:");
+       for (i=0; i<MAX_COUNT_BUCKETS; i++) {
+               printf(" %d", dbstat->hop_count_bucket[i]);
+       }
+       printf("\n");
+       printf("Num Hot Keys:     %d\n", dbstat->num_hot_keys);
+       for (i = 0; i < dbstat->num_hot_keys; i++) {
+               int j;
+               printf("Count:%d Key:", dbstat->hot_keys[i].count);
+               for (j = 0; j < dbstat->hot_keys[i].key.dsize; j++) {
+                       printf("%02x", dbstat->hot_keys[i].key.dptr[j]&0xff);
+               }
+               printf("\n");
+       }
+
+       ctdb_free_dbstat(dbstat);
+       return 0;
+}
+
 /*
   display uptime of remote node
  */
@@ -602,130 +841,105 @@ static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
        return -1;
 }
 
+/* Helpers for ctdb status
+ */
+static bool is_partially_online(struct ctdb_node_and_flags *node)
+{
+       int j;
+       bool ret = false;
+
+       if (node->flags == 0) {
+               struct ctdb_ifaces_list *ifaces;
+
+               if (ctdb_getifaces(ctdb_connection, node->pnn, &ifaces)) {
+                       for (j=0; j < ifaces->num; j++) {
+                               if (ifaces->ifaces[j].link_state != 0) {
+                                       continue;
+                               }
+                               ret = true;
+                               break;
+                       }
+                       ctdb_free_ifaces(ifaces);
+               }
+       }
+
+       return ret;
+}
+
+static void control_status_header_machine(void)
+{
+       printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
+              ":Inactive:PartiallyOnline:ThisNode:\n");
+}
+
+static int control_status_1_machine(int mypnn, struct ctdb_node_and_flags *node)
+{
+       printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
+              ctdb_addr_to_str(&node->addr),
+              !!(node->flags&NODE_FLAGS_DISCONNECTED),
+              !!(node->flags&NODE_FLAGS_BANNED),
+              !!(node->flags&NODE_FLAGS_PERMANENTLY_DISABLED),
+              !!(node->flags&NODE_FLAGS_UNHEALTHY),
+              !!(node->flags&NODE_FLAGS_STOPPED),
+              !!(node->flags&NODE_FLAGS_INACTIVE),
+              is_partially_online(node) ? 1 : 0,
+              (node->pnn == mypnn)?'Y':'N');
+
+       return node->flags;
+}
+
+static int control_status_1_human(int mypnn, struct ctdb_node_and_flags *node)
+{
+       printf("pnn:%d %-16s %s%s\n", node->pnn,
+              ctdb_addr_to_str(&node->addr),
+              is_partially_online(node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
+              node->pnn == mypnn?" (THIS NODE)":"");
+
+       return node->flags;
+}
+
 /*
   display remote ctdb status
  */
 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       int i, ret;
+       int i;
        struct ctdb_vnn_map *vnnmap=NULL;
        struct ctdb_node_map *nodemap=NULL;
-       uint32_t recmode, recmaster;
-       int mypnn;
+       uint32_t recmode, recmaster, mypnn;
 
-       mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
-       if (mypnn == -1) {
+       if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
                return -1;
        }
 
-       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
-       if (ret != 0) {
+       if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
-               return ret;
+               return -1;
        }
 
-       if(options.machinereadable){
-               printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:PartiallyOnline:\n");
-               for(i=0;i<nodemap->num;i++){
-                       int partially_online = 0;
-                       int j;
-
+       if (options.machinereadable) {
+               control_status_header_machine();
+               for (i=0;i<nodemap->num;i++) {
                        if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
                                continue;
                        }
-                       if (nodemap->nodes[i].flags == 0) {
-                               struct ctdb_control_get_ifaces *ifaces;
-
-                               ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
-                                                          nodemap->nodes[i].pnn,
-                                                          ctdb, &ifaces);
-                               if (ret == 0) {
-                                       for (j=0; j < ifaces->num; j++) {
-                                               if (ifaces->ifaces[j].link_state != 0) {
-                                                       continue;
-                                               }
-                                               partially_online = 1;
-                                               break;
-                                       }
-                                       talloc_free(ifaces);
-                               }
-                       }
-                       printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:\n", nodemap->nodes[i].pnn,
-                               ctdb_addr_to_str(&nodemap->nodes[i].addr),
-                              !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
-                              !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
-                              !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
-                              !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
-                              !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED),
-                              !!(nodemap->nodes[i].flags&NODE_FLAGS_INACTIVE),
-                              partially_online);
+                       (void) control_status_1_machine(mypnn,
+                                                       &nodemap->nodes[i]);
                }
                return 0;
        }
 
        printf("Number of nodes:%d\n", nodemap->num);
        for(i=0;i<nodemap->num;i++){
-               static const struct {
-                       uint32_t flag;
-                       const char *name;
-               } flag_names[] = {
-                       { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
-                       { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
-                       { NODE_FLAGS_BANNED,                "BANNED" },
-                       { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
-                       { NODE_FLAGS_DELETED,               "DELETED" },
-                       { NODE_FLAGS_STOPPED,               "STOPPED" },
-                       { NODE_FLAGS_INACTIVE,              "INACTIVE" },
-               };
-               char *flags_str = NULL;
-               int j;
-
                if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
                        continue;
                }
-               if (nodemap->nodes[i].flags == 0) {
-                       struct ctdb_control_get_ifaces *ifaces;
-
-                       ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(),
-                                                  nodemap->nodes[i].pnn,
-                                                  ctdb, &ifaces);
-                       if (ret == 0) {
-                               for (j=0; j < ifaces->num; j++) {
-                                       if (ifaces->ifaces[j].link_state != 0) {
-                                               continue;
-                                       }
-                                       flags_str = talloc_strdup(ctdb, "PARTIALLYONLINE");
-                                       break;
-                               }
-                               talloc_free(ifaces);
-                       }
-               }
-               for (j=0;j<ARRAY_SIZE(flag_names);j++) {
-                       if (nodemap->nodes[i].flags & flag_names[j].flag) {
-                               if (flags_str == NULL) {
-                                       flags_str = talloc_strdup(ctdb, flag_names[j].name);
-                               } else {
-                                       flags_str = talloc_asprintf_append(flags_str, "|%s",
-                                                                          flag_names[j].name);
-                               }
-                               CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
-                       }
-               }
-               if (flags_str == NULL) {
-                       flags_str = talloc_strdup(ctdb, "OK");
-                       CTDB_NO_MEMORY_FATAL(ctdb, flags_str);
-               }
-               printf("pnn:%d %-16s %s%s\n", nodemap->nodes[i].pnn,
-                      ctdb_addr_to_str(&nodemap->nodes[i].addr),
-                      flags_str,
-                      nodemap->nodes[i].pnn == mypnn?" (THIS NODE)":"");
-               talloc_free(flags_str);
+               (void) control_status_1_human(mypnn, &nodemap->nodes[i]);
        }
 
-       ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &vnnmap);
-       if (ret != 0) {
+       if (!ctdb_getvnnmap(ctdb_connection, options.pnn, &vnnmap)) {
                DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
-               return ret;
+               return -1;
        }
        if (vnnmap->generation == INVALID_GENERATION) {
                printf("Generation:INVALID\n");
@@ -736,6 +950,7 @@ static int control_status(struct ctdb_context *ctdb, int argc, const char **argv
        for(i=0;i<vnnmap->size;i++){
                printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
        }
+       ctdb_free_vnnmap(vnnmap);
 
        if (!ctdb_getrecmode(ctdb_connection, options.pnn, &recmode)) {
                DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
@@ -752,29 +967,113 @@ static int control_status(struct ctdb_context *ctdb, int argc, const char **argv
        return 0;
 }
 
+static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       int i, ret;
+       struct ctdb_node_map *nodemap=NULL;
+       uint32_t * nodes;
+       uint32_t pnn_mode, mypnn;
+
+       if (argc > 1) {
+               usage();
+       }
+
+       if (!parse_nodestring(ctdb, argc == 1 ? argv[0] : NULL,
+                             options.pnn, true, &nodes, &pnn_mode)) {
+               return -1;
+       }
+
+       if (options.machinereadable) {
+               control_status_header_machine();
+       } else if (pnn_mode == CTDB_BROADCAST_ALL) {
+               printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
+       }
+
+       if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
+               DEBUG(DEBUG_ERR, ("Unable to get PNN from local node\n"));
+               return -1;
+       }
+
+       if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
+               return -1;
+       }
+
+       ret = 0;
+
+       for (i = 0; i < talloc_array_length(nodes); i++) {
+               if (options.machinereadable) {
+                       ret |= control_status_1_machine(mypnn,
+                                                       &nodemap->nodes[nodes[i]]);
+               } else {
+                       ret |= control_status_1_human(mypnn,
+                                                     &nodemap->nodes[nodes[i]]);
+               }
+       }
+       return ret;
+}
 
 struct natgw_node {
        struct natgw_node *next;
        const char *addr;
 };
 
+static int find_natgw(struct ctdb_context *ctdb,
+                      struct ctdb_node_map *nodemap, uint32_t flags,
+                      uint32_t *pnn, const char **ip)
+{
+       int i;
+       uint32_t capabilities;
+
+       for (i=0;i<nodemap->num;i++) {
+               if (!(nodemap->nodes[i].flags & flags)) {
+                       if (!ctdb_getcapabilities(ctdb_connection, nodemap->nodes[i].pnn, &capabilities)) {
+                               DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
+                               return -1;
+                       }
+                       if (!(capabilities&CTDB_CAP_NATGW)) {
+                               continue;
+                       }
+                       *pnn = nodemap->nodes[i].pnn;
+                       *ip = ctdb_addr_to_str(&nodemap->nodes[i].addr);
+                       return 0;
+               }
+       }
+
+       return 2; /* matches ENOENT */
+}
+
 /*
   display the list of nodes belonging to this natgw configuration
  */
 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        int i, ret;
-       uint32_t capabilities;
        const char *natgw_list;
        int nlines;
        char **lines;
        struct natgw_node *natgw_nodes = NULL;
        struct natgw_node *natgw_node;
        struct ctdb_node_map *nodemap=NULL;
+       uint32_t mypnn, pnn;
+       const char *ip;
 
+       /* When we have some nodes that could be the NATGW, make a
+        * series of attempts to find the first node that doesn't have
+        * certain status flags set.
+        */
+       uint32_t exclude_flags[] = {
+               /* Look for a nice healthy node */
+               NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY,
+               /* If not found, an UNHEALTHY/BANNED node will do */
+               NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED,
+               /* If not found, a STOPPED node will do */
+               NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED,
+               0,
+       };
 
        /* read the natgw nodes file into a linked list */
-       natgw_list = getenv("NATGW_NODES");
+       natgw_list = getenv("CTDB_NATGW_NODES");
        if (natgw_list == NULL) {
                natgw_list = "/etc/ctdb/natgw_nodes";
        }
@@ -783,9 +1082,6 @@ static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **a
                ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
                return -1;
        }
-       while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
-               nlines--;
-       }
        for (i=0;i<nlines;i++) {
                char *node;
 
@@ -807,12 +1103,14 @@ static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **a
                natgw_nodes = natgw_node;
        }
 
-       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
-       if (ret != 0) {
+       if (!ctdb_getnodemap(ctdb_connection, CTDB_CURRENT_NODE, &nodemap)) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
-               return ret;
+               return -1;
        }
 
+       /* Trim the nodemap so it only includes connected nodes in the
+        * current natgw group.
+        */
        i=0;
        while(i<nodemap->num) {
                for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
@@ -836,79 +1134,56 @@ static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **a
                }
 
                i++;
-       }               
+       }
 
-       /* pick a node to be natgwmaster
-        * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
-        */
-       for(i=0;i<nodemap->num;i++){
-               if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
-                       ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
-                       if (ret != 0) {
-                               DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
-                               return ret;
-                       }
-                       if (!(capabilities&CTDB_CAP_NATGW)) {
-                               continue;
-                       }
-                       printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
-                       break;
+       ret = 2; /* matches ENOENT */
+       pnn = -1;
+       ip = "0.0.0.0";
+       for (i = 0; exclude_flags[i] != 0; i++) {
+               ret = find_natgw(ctdb, nodemap,
+                                exclude_flags[i],
+                                &pnn, &ip);
+               if (ret == -1) {
+                       goto done;
                }
-       }
-       /* we couldnt find any healthy node, try unhealthy ones */
-       if (i == nodemap->num) {
-               for(i=0;i<nodemap->num;i++){
-                       if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
-                               ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
-                                       return ret;
-                               }
-                               if (!(capabilities&CTDB_CAP_NATGW)) {
-                                       continue;
-                               }
-                               printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
-                               break;
-                       }
+               if (ret == 0) {
+                       break;
                }
        }
-       /* unless all nodes are STOPPED, when we pick one anyway */
-       if (i == nodemap->num) {
-               for(i=0;i<nodemap->num;i++){
-                       if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
-                               ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
-                                       return ret;
-                               }
-                               if (!(capabilities&CTDB_CAP_NATGW)) {
-                                       continue;
-                               }
-                               printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
-                               break;
-                       }
-               }
-               /* or if we still can not find any */
-               if (i == nodemap->num) {
-                       printf("-1 0.0.0.0\n");
-                       ret = 2; /* matches ENOENT */
-               }
+
+       if (options.machinereadable) {
+               printf(":Node:IP:\n");
+               printf(":%d:%s:\n", pnn, ip);
+       } else {
+               printf("%d %s\n", pnn, ip);
        }
 
        /* print the pruned list of nodes belonging to this natgw list */
+       if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
+               DEBUG(DEBUG_NOTICE, ("Unable to get PNN from node %u\n", options.pnn));
+               /* This is actually harmless and will only result in
+                * the "this node" indication being missing
+                */
+               mypnn = -1;
+       }
+       if (options.machinereadable) {
+               control_status_header_machine();
+       } else {
+               printf("Number of nodes:%d\n", nodemap->num);
+       }
        for(i=0;i<nodemap->num;i++){
                if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
                        continue;
                }
-               printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
-                       ctdb_addr_to_str(&nodemap->nodes[i].addr),
-                      !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
-                      !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
-                      !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
-                      !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
-                      !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
+               if (options.machinereadable) {
+                       control_status_1_machine(mypnn, &(nodemap->nodes[i]));
+               } else {
+                       control_status_1_human(mypnn, &(nodemap->nodes[i]));
+               }
        }
 
+done:
+       ctdb_free_nodemap(nodemap);
        return ret;
 }
 
@@ -1264,19 +1539,163 @@ static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn
                return -1;
        }
 
-       for (i=0;i<ips->num;i++) {
-               if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
-                       break;
-               }
-       }
-       if (i==ips->num) {
-               DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
-                       pnn, ctdb_addr_to_str(addr)));
-               talloc_free(tmp_ctx);
+       for (i=0;i<ips->num;i++) {
+               if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
+                       break;
+               }
+       }
+       if (i==ips->num) {
+               DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
+                       pnn, ctdb_addr_to_str(addr)));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       ip.pnn  = pnn;
+       ip.addr = *addr;
+
+       data.dptr  = (uint8_t *)&ip;
+       data.dsize = sizeof(ip);
+
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+               nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
+       ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
+                                       nodes, 0,
+                                       LONGTIMELIMIT(),
+                                       false, data,
+                                       NULL, NULL,
+                                       NULL);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       /* update the recovery daemon so it now knows to expect the new
+          node assignment for this ip.
+       */
+       ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
+/*
+  move/failover an ip address to a specific node
+ */
+static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       uint32_t pnn;
+       int ret, retries = 0;
+       ctdb_sock_addr addr;
+
+       if (argc < 2) {
+               usage();
+               return -1;
+       }
+
+       if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
+               DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
+               return -1;
+       }
+
+
+       if (sscanf(argv[1], "%u", &pnn) != 1) {
+               DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
+               return -1;
+       }
+
+       do {
+               ret = move_ip(ctdb, &addr, pnn);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 second and try again.\n", pnn));
+                       sleep(3);
+                       retries++;
+               }
+       } while (retries < 5 && ret != 0);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", pnn));
+               return -1;
+       }
+
+       return 0;
+}
+
+static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
+{
+       uint32_t recmaster;
+       TDB_DATA data;
+
+       if (ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), pnn, &recmaster) != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", pnn));
+               return -1;
+       }
+
+       data.dptr  = (uint8_t *)&pnn;
+       data.dsize = sizeof(uint32_t);
+       if (ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
+               DEBUG(DEBUG_ERR,("Failed to send message to force node reallocation\n"));
+               return -1;
+       }
+
+       return 0;
+}
+
+
+/*
+  rebalance a node by setting it to allow failback and triggering a
+  takeover run
+ */
+static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       switch (options.pnn) {
+       case CTDB_BROADCAST_ALL:
+       case CTDB_CURRENT_NODE:
+               DEBUG(DEBUG_ERR,("You must specify a node number with -n <pnn> for the node to rebalance\n"));
+               return -1;
+       }
+
+       return rebalance_node(ctdb, options.pnn);
+}
+
+
+static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
+{
+       struct ctdb_public_ip ip;
+       int ret;
+       uint32_t *nodes;
+       uint32_t disable_time;
+       TDB_DATA data;
+       struct ctdb_node_map *nodemap=NULL;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+
+       disable_time = 30;
+       data.dptr  = (uint8_t*)&disable_time;
+       data.dsize = sizeof(disable_time);
+       ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
                return -1;
        }
 
-       ip.pnn  = pnn;
+       ip.pnn  = -1;
        ip.addr = *addr;
 
        data.dptr  = (uint8_t *)&ip;
@@ -1289,7 +1708,7 @@ static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn
                return ret;
        }
 
-               nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
+               nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
                                        nodes, 0,
                                        LONGTIMELIMIT(),
@@ -1302,36 +1721,18 @@ static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn
                return -1;
        }
 
-       ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       /* update the recovery daemon so it now knows to expect the new
-          node assignment for this ip.
-       */
-       ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
-               return -1;
-       }
-
        talloc_free(tmp_ctx);
        return 0;
 }
 
 /*
-  move/failover an ip address to a specific node
+  release an ip form all nodes and have it re-assigned by recd
  */
-static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
+static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       uint32_t pnn;
-       int ret, retries = 0;
        ctdb_sock_addr addr;
 
-       if (argc < 2) {
+       if (argc < 1) {
                usage();
                return -1;
        }
@@ -1341,29 +1742,15 @@ static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv
                return -1;
        }
 
-
-       if (sscanf(argv[1], "%u", &pnn) != 1) {
-               DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
-               return -1;
-       }
-
-       do {
-               ret = move_ip(ctdb, &addr, pnn);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 second and try again.\n", pnn));
-                       sleep(3);
-                       retries++;
-               }
-       } while (retries < 5 && ret != 0);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", pnn));
+       if (rebalance_ip(ctdb, &addr) != 0) {
+               DEBUG(DEBUG_ERR,("Error when trying to reassign ip\n"));
                return -1;
        }
 
        return 0;
 }
 
-void getips_store_callback(void *param, void *data)
+static int getips_store_callback(void *param, void *data)
 {
        struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
        struct ctdb_all_public_ips *ips = param;
@@ -1372,13 +1759,15 @@ void getips_store_callback(void *param, void *data)
        i = ips->num++;
        ips->ips[i].pnn  = node_ip->pnn;
        ips->ips[i].addr = node_ip->addr;
+       return 0;
 }
 
-void getips_count_callback(void *param, void *data)
+static int getips_count_callback(void *param, void *data)
 {
        uint32_t *count = param;
 
        (*count)++;
+       return 0;
 }
 
 #define IP_KEYLEN      4
@@ -1392,12 +1781,14 @@ static uint32_t *ip_key(ctdb_sock_addr *ip)
        case AF_INET:
                key[0]  = ip->ip.sin_addr.s_addr;
                break;
-       case AF_INET6:
-               key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
-               key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
-               key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
-               key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
+       case AF_INET6: {
+               uint32_t *s6_a32 = (uint32_t *)&(ip->ip6.sin6_addr.s6_addr);
+               key[0]  = s6_a32[3];
+               key[1]  = s6_a32[2];
+               key[2]  = s6_a32[1];
+               key[3]  = s6_a32[0];
                break;
+       }
        default:
                DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
                return key;
@@ -1726,17 +2117,8 @@ static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
                return ret;
        }
 
-       do {
-               ret = control_ipreallocate(ctdb, argc, argv);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u. Wait 3 seconds and try again.\n", options.pnn));
-                       sleep(3);
-                       retries++;
-               }
-       } while (retries < 5 && ret != 0);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u. Giving up.\n", options.pnn));
-               talloc_free(tmp_ctx);
+       if (rebalance_node(ctdb, options.pnn) != 0) {
+               DEBUG(DEBUG_ERR,("Error when trying to rebalance node\n"));
                return ret;
        }
 
@@ -1744,6 +2126,27 @@ static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
        return 0;
 }
 
+/*
+  add a public ip address to a node
+ */
+static int control_ipiface(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       ctdb_sock_addr addr;
+
+       if (argc != 1) {
+               usage();
+       }
+
+       if (!parse_ip(argv[0], NULL, 0, &addr)) {
+               printf("Badly formed ip : %s\n", argv[0]);
+               return -1;
+       }
+
+       printf("IP on interface %s\n", ctdb_sys_find_ifname(&addr));
+
+       return 0;
+}
+
 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
 
 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
@@ -2056,6 +2459,45 @@ static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
        return -1;
 }
 
+/*
+  check if a server id exists
+ */
+static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+       uint64_t *ids;
+       uint8_t *result;
+       int i;
+
+       if (argc < 1) {
+               talloc_free(tmp_ctx);
+               usage();
+       }
+
+       ids    = talloc_array(tmp_ctx, uint64_t, argc);
+       result = talloc_array(tmp_ctx, uint8_t, argc);
+
+       for (i = 0; i < argc; i++) {
+               ids[i] = strtoull(argv[i], NULL, 0);
+       }
+
+       if (!ctdb_check_message_handlers(ctdb_connection,
+               options.pnn, argc, ids, result)) {
+               DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
+                                 options.pnn));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       for (i=0; i < argc; i++) {
+               printf("Server id %d:%llu %s\n", options.pnn, (long long)ids[i],
+                      result[i] ? "exists" : "does not exist");
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
 /*
   send a tcp tickle ack
  */
@@ -2263,18 +2705,14 @@ static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv
  */
 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       int i, ret;
-       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
-       struct ctdb_control_get_ifaces *ifaces;
+       int i;
+       struct ctdb_ifaces_list *ifaces;
 
        /* read the public ip list from this node */
-       ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn,
-                                  tmp_ctx, &ifaces);
-       if (ret != 0) {
+       if (!ctdb_getifaces(ctdb_connection, options.pnn, &ifaces)) {
                DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
                                  options.pnn));
-               talloc_free(tmp_ctx);
-               return ret;
+               return -1;
        }
 
        if (options.machinereadable){
@@ -2297,7 +2735,7 @@ static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv
                }
        }
 
-       talloc_free(tmp_ctx);
+       ctdb_free_ifaces(ifaces);
        return 0;
 }
 
@@ -2760,12 +3198,10 @@ static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **
 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        uint32_t capabilities;
-       int ret;
 
-       ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
-       if (ret != 0) {
+       if (!ctdb_getcapabilities(ctdb_connection, options.pnn, &capabilities)) {
                DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
-               return ret;
+               return -1;
        }
        
        if (!options.machinereadable){
@@ -2794,15 +3230,16 @@ static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
        int i, ret;
        int healthy_count = 0;
 
-       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
-       if (ret != 0) {
+       if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
-               return ret;
+               return -1;
        }
 
        capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
        CTDB_NO_MEMORY(ctdb, capabilities);
        
+       ret = 0;
+
        /* collect capabilities for all connected nodes */
        for (i=0; i<nodemap->num; i++) {
                if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
@@ -2812,10 +3249,10 @@ static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
                        continue;
                }
        
-               ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
-               if (ret != 0) {
+               if (!ctdb_getcapabilities(ctdb_connection, i, &capabilities[i])) {
                        DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
-                       return ret;
+                       ret = -1;
+                       goto done;
                }
 
                if (!(capabilities[i] & CTDB_CAP_LVS)) {
@@ -2849,7 +3286,9 @@ static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
                        ctdb_addr_to_str(&nodemap->nodes[i].addr));
        }
 
-       return 0;
+done:
+       ctdb_free_nodemap(nodemap);
+       return ret;
 }
 
 /*
@@ -2862,14 +3301,15 @@ static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **a
        int i, ret;
        int healthy_count = 0;
 
-       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
-       if (ret != 0) {
+       if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
-               return ret;
+               return -1;
        }
 
        capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
        CTDB_NO_MEMORY(ctdb, capabilities);
+
+       ret = -1;
        
        /* collect capabilities for all connected nodes */
        for (i=0; i<nodemap->num; i++) {
@@ -2880,10 +3320,10 @@ static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **a
                        continue;
                }
        
-               ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
-               if (ret != 0) {
+               if (!ctdb_getcapabilities(ctdb_connection, i, &capabilities[i])) {
                        DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
-                       return ret;
+                       ret = -1;
+                       goto done;
                }
 
                if (!(capabilities[i] & CTDB_CAP_LVS)) {
@@ -2918,11 +3358,14 @@ static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **a
                } else {
                        printf("Node %d is LVS master\n", i);
                }
-               return 0;
+               ret = 0;
+               goto done;
        }
 
        printf("There is no LVS master\n");
-       return -1;
+done:
+       ctdb_free_nodemap(nodemap);
+       return ret;
 }
 
 /*
@@ -2970,6 +3413,7 @@ static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
        struct ctdb_db_context *ctdb_db;
        int ret;
        bool persistent;
+       struct ctdb_dump_db_context c;
 
        if (argc < 1) {
                usage();
@@ -2983,15 +3427,33 @@ static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, db_name, persistent, 0);
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
 
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
                return -1;
        }
 
+       if (options.printlmaster) {
+               ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn,
+                                         ctdb, &ctdb->vnn_map);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
+                                         options.pnn));
+                       return ret;
+               }
+       }
+
+       ZERO_STRUCT(c);
+       c.f = stdout;
+       c.printemptyrecords = (bool)options.printemptyrecords;
+       c.printdatasize = (bool)options.printdatasize;
+       c.printlmaster = (bool)options.printlmaster;
+       c.printhash = (bool)options.printhash;
+       c.printrecordflags = (bool)options.printrecordflags;
+
        /* traverse and dump the cluster tdb */
-       ret = ctdb_dump_db(ctdb_db, stdout);
+       ret = ctdb_dump_db(ctdb_db, &c);
        if (ret == -1) {
                DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
                DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
@@ -3005,6 +3467,71 @@ static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
        return 0;
 }
 
+struct cattdb_data {
+       struct ctdb_context *ctdb;
+       uint32_t count;
+};
+
+static int cattdb_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
+{
+       struct cattdb_data *d = private_data;
+       struct ctdb_dump_db_context c;
+
+       d->count++;
+
+       ZERO_STRUCT(c);
+       c.f = stdout;
+       c.printemptyrecords = (bool)options.printemptyrecords;
+       c.printdatasize = (bool)options.printdatasize;
+       c.printlmaster = false;
+       c.printhash = (bool)options.printhash;
+       c.printrecordflags = true;
+
+       return ctdb_dumpdb_record(d->ctdb, key, data, &c);
+}
+
+/*
+  cat the local tdb database using same format as catdb
+ */
+static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       const char *db_name;
+       struct ctdb_db_context *ctdb_db;
+       struct cattdb_data d;
+       bool persistent;
+
+       if (argc < 1) {
+               usage();
+       }
+
+       db_name = argv[0];
+
+
+       if (db_exists(ctdb, db_name, &persistent)) {
+               DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
+               return -1;
+       }
+
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
+
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
+               return -1;
+       }
+
+       /* traverse the local tdb */
+       d.count = 0;
+       d.ctdb  = ctdb;
+       if (tdb_traverse_read(ctdb_db->ltdb->tdb, cattdb_traverse, &d) == -1) {
+               printf("Failed to cattdb data\n");
+               exit(10);
+       }
+       talloc_free(ctdb_db);
+
+       printf("Dumped %d records\n", d.count);
+       return 0;
+}
+
 /*
   display the content of a database key
  */
@@ -3023,13 +3550,12 @@ static int control_readkey(struct ctdb_context *ctdb, int argc, const char **arg
 
        db_name = argv[0];
 
-
        if (db_exists(ctdb, db_name, &persistent)) {
                DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, db_name, persistent, 0);
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
 
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
@@ -3038,7 +3564,7 @@ static int control_readkey(struct ctdb_context *ctdb, int argc, const char **arg
 
        key.dptr  = discard_const(argv[1]);
        key.dsize = strlen((char *)key.dptr);
+
        h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
        if (h == NULL) {
                printf("Failed to fetch record '%s' on node %d\n", 
@@ -3072,13 +3598,12 @@ static int control_writekey(struct ctdb_context *ctdb, int argc, const char **ar
 
        db_name = argv[0];
 
-
        if (db_exists(ctdb, db_name, &persistent)) {
                DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, db_name, persistent, 0);
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
 
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
@@ -3087,7 +3612,7 @@ static int control_writekey(struct ctdb_context *ctdb, int argc, const char **ar
 
        key.dptr  = discard_const(argv[1]);
        key.dsize = strlen((char *)key.dptr);
+
        h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
        if (h == NULL) {
                printf("Failed to fetch record '%s' on node %d\n", 
@@ -3098,7 +3623,7 @@ static int control_writekey(struct ctdb_context *ctdb, int argc, const char **ar
 
        data.dptr  = discard_const(argv[2]);
        data.dsize = strlen((char *)data.dptr);
+
        if (ctdb_record_store(h, data) != 0) {
                printf("Failed to store record\n");
        }
@@ -3142,7 +3667,7 @@ static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, db_name, persistent, 0);
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
 
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
@@ -3166,78 +3691,157 @@ static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv
                return -1;
        }
 
-       if (data.dsize == 0 || data.dptr == NULL) {
-               DEBUG(DEBUG_ERR,("Record is empty\n"));
-               talloc_free(tmp_ctx);
+       if (data.dsize == 0 || data.dptr == NULL) {
+               DEBUG(DEBUG_ERR,("Record is empty\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       if (argc == 3) {
+         fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
+               if (fd == -1) {
+                       DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+               write(fd, data.dptr, data.dsize);
+               close(fd);
+       } else {
+               write(1, data.dptr, data.dsize);
+       }
+
+       /* abort the transaction */
+       talloc_free(h);
+
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
+/*
+  fetch a record from a tdb-file
+ */
+static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       const char *tdb_file;
+       TDB_CONTEXT *tdb;
+       TDB_DATA key, data;
+       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+       int fd;
+
+       if (argc < 2) {
+               usage();
+       }
+
+       tdb_file = argv[0];
+
+       tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
+       if (tdb == NULL) {
+               printf("Failed to open TDB file %s\n", tdb_file);
+               return -1;
+       }
+
+       if (!strncmp(argv[1], "0x", 2)) {
+               key = hextodata(tmp_ctx, argv[1] + 2);
+               if (key.dsize == 0) {
+                       printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
+                       return -1;
+               }
+       } else {
+               key.dptr  = discard_const(argv[1]);
+               key.dsize = strlen(argv[1]);
+       }
+
+       data = tdb_fetch(tdb, key);
+       if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
+               printf("Failed to read record %s from tdb %s\n", argv[1], tdb_file);
+               tdb_close(tdb);
                return -1;
        }
 
+       tdb_close(tdb);
+
        if (argc == 3) {
          fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
                if (fd == -1) {
-                       DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
-                       talloc_free(tmp_ctx);
+                       printf("Failed to open output file %s\n", argv[2]);
                        return -1;
                }
-               write(fd, data.dptr, data.dsize);
+               if (options.verbose){
+                       write(fd, data.dptr, data.dsize);
+               } else {
+                       write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
+               }
                close(fd);
        } else {
-               write(1, data.dptr, data.dsize);
+               if (options.verbose){
+                       write(1, data.dptr, data.dsize);
+               } else {
+                       write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
+               }
        }
 
-       /* abort the transaction */
-       talloc_free(h);
-
-
        talloc_free(tmp_ctx);
        return 0;
 }
 
 /*
-  fetch a record from a tdb-file
+  store a record and header to a tdb-file
  */
-static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
+static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        const char *tdb_file;
        TDB_CONTEXT *tdb;
        TDB_DATA key, data;
-       int fd;
+       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
 
-       if (argc < 2) {
+       if (argc < 3) {
                usage();
        }
 
        tdb_file = argv[0];
 
-       tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
+       tdb = tdb_open(tdb_file, 0, 0, O_RDWR, 0);
        if (tdb == NULL) {
-               DEBUG(DEBUG_ERR,("Failed to open TDB file %s\n", tdb_file));
+               printf("Failed to open TDB file %s\n", tdb_file);
                return -1;
        }
 
-       key.dptr  = discard_const(argv[1]);
-       key.dsize = strlen(argv[1]);
-       data = tdb_fetch(tdb, key);
-       if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
-               DEBUG(DEBUG_ERR,("Failed to read record %s from tdb %s\n", argv[1], tdb_file));
-               tdb_close(tdb);
-               return -1;
+       if (!strncmp(argv[1], "0x", 2)) {
+               key = hextodata(tmp_ctx, argv[1] + 2);
+               if (key.dsize == 0) {
+                       printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
+                       return -1;
+               }
+       } else {
+               key.dptr  = discard_const(argv[1]);
+               key.dsize = strlen(argv[1]);
        }
 
-       tdb_close(tdb);
-
-       if (argc == 3) {
-         fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
-               if (fd == -1) {
-                       DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
+       if (!strncmp(argv[2], "0x", 2)) {
+               data = hextodata(tmp_ctx, argv[2] + 2);
+               if (data.dsize == 0) {
+                       printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
                        return -1;
                }
-               write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
-               close(fd);
        } else {
-               write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
+               data.dptr  = discard_const(argv[2]);
+               data.dsize = strlen(argv[2]);
+       }
+
+       if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+               printf("Not enough data. You must specify the full ctdb_ltdb_header too when storing\n");
+               return -1;
        }
+       if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
+               printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
+               tdb_close(tdb);
+               return -1;
+       }
+
+       tdb_close(tdb);
 
+       talloc_free(tmp_ctx);
        return 0;
 }
 
@@ -3305,8 +3909,7 @@ static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv
 
        db_name = argv[0];
 
-       ctdb_db = ctdb_attach(ctdb, db_name, true, 0);
-
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
                talloc_free(tmp_ctx);
@@ -3475,6 +4078,96 @@ static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **ar
 }
 
 
+static uint32_t reloadips_finished;
+
+static void reloadips_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       reloadips_finished = 1;
+}
+
+static int reloadips_all(struct ctdb_context *ctdb)
+{
+       struct reloadips_all_reply rips;
+       struct ctdb_node_map *nodemap=NULL;
+       TDB_DATA data;
+       uint32_t recmaster;
+       int ret, i;
+
+       /* check that there are valid nodes available */
+       if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
+               return 1;
+       }
+       for (i=0; i<nodemap->num;i++) {
+               if (nodemap->nodes[i].flags != 0) {
+                       DEBUG(DEBUG_ERR,("reloadips -n all  can only be used when all nodes are up and healthy. Aborting due to problem with node %d\n", i));
+                       return 1;
+               }
+       }
+
+
+       rips.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
+       if (rips.pnn == -1) {
+               DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
+               return 1;
+       }
+       rips.srvid = getpid();
+
+
+       /* register a message port for receiveing the reply so that we
+          can receive the reply
+       */
+       ctdb_client_set_message_handler(ctdb, rips.srvid, reloadips_handler, NULL);
+
+       if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
+               DEBUG(DEBUG_ERR, ("Unable to get recmaster from node\n"));
+               return -1;
+       }
+
+
+       data.dptr = (uint8_t *)&rips;
+       data.dsize = sizeof(rips);
+
+       ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_RELOAD_ALL_IPS, data);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to send reload all ips request message to %u\n", options.pnn));
+               return 1;
+       }
+
+       reloadips_finished = 0;
+       while (reloadips_finished == 0) {
+               event_loop_once(ctdb->ev);
+       }
+
+       return 0;
+}
+
+/*
+  reload public ips on a specific node
+ */
+static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       int ret;
+       int32_t res;
+       char *errmsg;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+
+       if (options.pnn == CTDB_BROADCAST_ALL) {
+               return reloadips_all(ctdb);
+       }
+
+       ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
+                          0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
+       if (ret != 0 || res != 0) {
+               DEBUG(DEBUG_ERR,("Failed to reload ips\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
 
 /*
   display a list of the databases on a remote ctdb
@@ -3491,12 +4184,14 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
        }
 
        if(options.machinereadable){
-               printf(":ID:Name:Path:Persistent:Unhealthy:\n");
+               printf(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
                for(i=0;i<dbmap->num;i++){
                        const char *path;
                        const char *name;
                        const char *health;
                        bool persistent;
+                       bool readonly;
+                       bool sticky;
 
                        ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
                                            dbmap->dbs[i].dbid, ctdb, &path);
@@ -3504,10 +4199,13 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
                                            dbmap->dbs[i].dbid, ctdb, &name);
                        ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
                                              dbmap->dbs[i].dbid, ctdb, &health);
-                       persistent = dbmap->dbs[i].persistent;
-                       printf(":0x%08X:%s:%s:%d:%d:\n",
+                       persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
+                       readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
+                       sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
+                       printf(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
                               dbmap->dbs[i].dbid, name, path,
-                              !!(persistent), !!(health));
+                              !!(persistent), !!(sticky),
+                              !!(health), !!(readonly));
                }
                return 0;
        }
@@ -3518,14 +4216,20 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
                const char *name;
                const char *health;
                bool persistent;
+               bool readonly;
+               bool sticky;
 
                ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
                ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
                ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
-               persistent = dbmap->dbs[i].persistent;
-               printf("dbid:0x%08x name:%s path:%s%s%s\n",
+               persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
+               readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
+               sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
+               printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
                       dbmap->dbs[i].dbid, name, path,
                       persistent?" PERSISTENT":"",
+                      sticky?" STICKY":"",
+                      readonly?" READONLY":"",
                       health?" UNHEALTHY":"");
        }
 
@@ -3558,6 +4262,8 @@ static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char *
                const char *name;
                const char *health;
                bool persistent;
+               bool readonly;
+               bool sticky;
 
                ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
                if (strcmp(name, db_name) != 0) {
@@ -3566,10 +4272,14 @@ static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char *
 
                ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
                ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
-               persistent = dbmap->dbs[i].persistent;
-               printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nHEALTH: %s\n",
+               persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
+               readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
+               sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
+               printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
                       dbmap->dbs[i].dbid, name, path,
                       persistent?"yes":"no",
+                      sticky?"yes":"no",
+                      readonly?"yes":"no",
                       health?health:"OK");
                return 0;
        }
@@ -3646,7 +4356,7 @@ static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv
                return -1;
        }
 
-       printf("%-19s = %u\n", name, value);
+       printf("%-23s = %u\n", name, value);
        return 0;
 }
 
@@ -3950,7 +4660,7 @@ static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv
                persistent = true;
        }
 
-       ctdb_db = ctdb_attach(ctdb, db_name, persistent, 0);
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
                return -1;
@@ -4008,6 +4718,136 @@ static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **a
        return 0;
 }
 
+/*
+  set the sticky records capability for a database
+ */
+static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       uint32_t db_id;
+       struct ctdb_dbid_map *dbmap=NULL;
+       int i, ret;
+
+       if (argc < 1) {
+               usage();
+       }
+
+       if (!strncmp(argv[0], "0x", 2)) {
+               db_id = strtoul(argv[0] + 2, NULL, 0);
+       } else {
+               ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
+                       talloc_free(tmp_ctx);
+                       return ret;
+               }
+               for(i=0;i<dbmap->num;i++){
+                       const char *name;
+
+                       ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
+                       if(!strcmp(argv[0], name)){
+                               talloc_free(discard_const(name));
+                               break;
+                       }
+                       talloc_free(discard_const(name));
+               }
+               if (i == dbmap->num) {
+                       DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+               db_id = dbmap->dbs[i].dbid;
+       }
+
+       ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
+/*
+  set the readonly capability for a database
+ */
+static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       uint32_t db_id;
+       struct ctdb_dbid_map *dbmap=NULL;
+       int i, ret;
+
+       if (argc < 1) {
+               usage();
+       }
+
+       if (!strncmp(argv[0], "0x", 2)) {
+               db_id = strtoul(argv[0] + 2, NULL, 0);
+       } else {
+               ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
+                       talloc_free(tmp_ctx);
+                       return ret;
+               }
+               for(i=0;i<dbmap->num;i++){
+                       const char *name;
+
+                       ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
+                       if(!strcmp(argv[0], name)){
+                               talloc_free(discard_const(name));
+                               break;
+                       }
+                       talloc_free(discard_const(name));
+               }
+               if (i == dbmap->num) {
+                       DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+               db_id = dbmap->dbs[i].dbid;
+       }
+
+       ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Unable to set db to support readonly\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
+/*
+  get db seqnum
+ */
+static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       bool ret;
+       uint32_t db_id;
+       uint64_t seqnum;
+
+       if (argc < 1) {
+               usage();
+       }
+
+       db_id = strtoul(argv[0], NULL, 0);
+
+       ret = ctdb_getdbseqnum(ctdb_connection, options.pnn, db_id, &seqnum);
+       if (!ret) {
+               DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
+               return -1;
+       }
+
+       printf("Sequence number:%lld\n", (long long)seqnum);
+
+       return 0;
+}
+
 /*
   run an eventscript on a node
  */
@@ -4158,7 +4998,7 @@ static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **ar
                                     allow_unhealthy));
        }
 
-       ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
                talloc_free(tmp_ctx);
@@ -4210,7 +5050,7 @@ static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **ar
 
        dbhdr.version = DB_VERSION;
        dbhdr.timestamp = time(NULL);
-       dbhdr.persistent = dbmap->dbs[i].persistent;
+       dbhdr.persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
        dbhdr.size = bd->len;
        if (strlen(argv[0]) >= MAX_DB_NAME) {
                DEBUG(DEBUG_ERR,("Too long dbname\n"));
@@ -4237,7 +5077,7 @@ done:
                }
        }
 
-       DEBUG(DEBUG_ERR,("Database acked up to %s\n", argv[1]));
+       DEBUG(DEBUG_ERR,("Database backed up to %s\n", argv[1]));
 
        talloc_free(tmp_ctx);
        return status;
@@ -4305,7 +5145,7 @@ static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **a
                dbname, tbuf);
 
 
-       ctdb_db = ctdb_attach(ctdb, dbname, dbhdr.persistent, 0);
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), dbname, dbhdr.persistent, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
                talloc_free(tmp_ctx);
@@ -4455,6 +5295,7 @@ static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char
        char tbuf[100];
        struct ctdb_rec_data *rec = NULL;
        struct ctdb_marshall_buffer *m;
+       struct ctdb_dump_db_context c;
 
        if (argc != 1) {
                DEBUG(DEBUG_ERR,("Invalid arguments\n"));
@@ -4492,6 +5333,14 @@ static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char
        printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
                dbhdr.name, m->db_id, tbuf);
 
+       ZERO_STRUCT(c);
+       c.f = stdout;
+       c.printemptyrecords = (bool)options.printemptyrecords;
+       c.printdatasize = (bool)options.printdatasize;
+       c.printlmaster = false;
+       c.printhash = (bool)options.printhash;
+       c.printrecordflags = (bool)options.printrecordflags;
+
        for (i=0; i < m->count; i++) {
                uint32_t reqid = 0;
                TDB_DATA key, data;
@@ -4500,7 +5349,7 @@ static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char
                rec = ctdb_marshall_loop_next(m, rec, &reqid,
                                              NULL, &key, &data);
 
-               ctdb_dumpdb_record(ctdb, key, data, stdout);
+               ctdb_dumpdb_record(ctdb, key, data, &c);
        }
 
        printf("Dumped %d records\n", i);
@@ -4557,7 +5406,7 @@ static int control_wipedb(struct ctdb_context *ctdb, int argc,
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, argv[0], dbmap->dbs[i].persistent, 0);
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
                                  argv[0]));
@@ -4932,7 +5781,8 @@ static const struct {
        { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
        { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
        { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
-       { "catdb",           control_catdb,             true,   false,  "dump a database" ,                     "<dbname>"},
+       { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname>"},
+       { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname>"},
        { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
        { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
        { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
@@ -4973,11 +5823,13 @@ static const struct {
        { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
        { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
        { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
-       { "vacuum",          ctdb_vacuum,               false,  false, "vacuum the databases of empty records", "[max_records]"},
+       { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
+       { "vacuum",          ctdb_vacuum,               false,  true, "vacuum the databases of empty records", "[max_records]"},
        { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
        { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
        { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
        { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
+       { "rebalanceip",     control_rebalanceip,       false,  false, "release an ip from the node and let recd rebalance it", "<ip>"},
        { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
        { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
        { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
@@ -4985,8 +5837,8 @@ static const struct {
        { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
        { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
        { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
-       { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
-       { "scriptstatus",    control_scriptstatus,  false,      false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
+       { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
+       { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
        { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
        { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
        { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
@@ -4998,15 +5850,24 @@ static const struct {
        { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
        { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
        { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
+       { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbid>|<name>"},
+       { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbid>|<name>"},
        { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
        { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
        { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
        { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<db> <key> [<file>]" },
        { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<db> <key> <file containing record>" },
-       { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file", "<tdb-file> <key> [<file>]" },
+       { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
+       { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data+header>" },
        { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<tdb-file> <key>" },
        { "writekey",        control_writekey,          true,   false,  "write to a database key", "<tdb-file> <key> <value>" },
        { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
+       { "rebalancenode",     control_rebalancenode,   false,  false, "release a node by allowing it to takeover ips", "<pnn>"},
+       { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbid>" },
+       { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status" },
+       { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<db>" },
+       { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on a node" },
+       { "ipiface",         control_ipiface,           true,   true,  "Find which interface an ip address is hsoted on", "<ip>" },
 };
 
 /*
@@ -5054,6 +5915,11 @@ int main(int argc, const char *argv[])
                { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
                { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
                { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
+               { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
+               { "print-datasize", 0, POPT_ARG_NONE, &options.printdatasize, 0, "do not print record data when dumping databases, only the data size", NULL },
+               { "print-lmaster", 0, POPT_ARG_NONE, &options.printlmaster, 0, "print the record's lmaster in catdb", NULL },
+               { "print-hash", 0, POPT_ARG_NONE, &options.printhash, 0, "print the record's hash when dumping databases", NULL },
+               { "print-recordflags", 0, POPT_ARG_NONE, &options.printrecordflags, 0, "print the record flags in catdb and dumpdbbackup", NULL },
                POPT_TABLEEND
        };
        int opt;
@@ -5063,6 +5929,7 @@ int main(int argc, const char *argv[])
        poptContext pc;
        struct event_context *ev;
        const char *control;
+       const char *socket_name;
 
        setlinebuf(stdout);
        
@@ -5107,15 +5974,6 @@ int main(int argc, const char *argv[])
        signal(SIGALRM, ctdb_alarm);
        alarm(options.maxruntime);
 
-       /* setup the node number to contact */
-       if (nodestring != NULL) {
-               if (strcmp(nodestring, "all") == 0) {
-                       options.pnn = CTDB_BROADCAST_ALL;
-               } else {
-                       options.pnn = strtoul(nodestring, NULL, 0);
-               }
-       }
-
        control = extra_argv[0];
 
        ev = event_context_init(NULL);
@@ -5123,74 +5981,73 @@ int main(int argc, const char *argv[])
                DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
                exit(1);
        }
-       tevent_loop_allow_nesting(ev);
 
        for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
                if (strcmp(control, ctdb_commands[i].name) == 0) {
-                       int j;
+                       break;
+               }
+       }
 
-                       if (ctdb_commands[i].without_daemon == true) {
-                               close(2);
-                       }
+       if (i == ARRAY_SIZE(ctdb_commands)) {
+               DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
+               exit(1);
+       }
+
+       if (ctdb_commands[i].without_daemon == true) {
+               if (nodestring != NULL) {
+                       DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
+                       exit(1);
+               }
+               close(2);
+               return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
+       }
 
-                       /* initialise ctdb */
-                       ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
+       /* initialise ctdb */
+       ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
 
-                       if (ctdb_commands[i].without_daemon == false) {
-                               const char *socket_name;
+       if (ctdb == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
+               exit(1);
+       }
 
-                               if (ctdb == NULL) {
-                                       DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
-                                       exit(1);
-                               }
+       /* initialize a libctdb connection as well */
+       socket_name = ctdb_get_socketname(ctdb);
+       ctdb_connection = ctdb_connect(socket_name,
+                                      ctdb_log_file, stderr);
+       if (ctdb_connection == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to connect to daemon from libctdb\n"));
+               exit(1);
+       }                               
 
-                               /* initialize a libctdb connection as well */
-                               socket_name = ctdb_get_socketname(ctdb);
-                               ctdb_connection = ctdb_connect(socket_name,
-                                                      ctdb_log_file, stderr);
-                               if (ctdb_connection == NULL) {
-                                       fprintf(stderr, "Failed to connect to daemon from libctdb\n");
-                                       exit(1);
-                               }                               
-                       
-                               /* verify the node exists */
-                               verify_node(ctdb);
-
-                               if (options.pnn == CTDB_CURRENT_NODE) {
-                                       int pnn;
-                                       pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);         
-                                       if (pnn == -1) {
-                                               return -1;
-                                       }
-                                       options.pnn = pnn;
-                               }
-                       }
+       /* setup the node number(s) to contact */
+       if (!parse_nodestring(ctdb, nodestring, CTDB_CURRENT_NODE, false,
+                             &options.nodes, &options.pnn)) {
+               usage();
+       }
 
-                       if (ctdb_commands[i].auto_all && 
-                           options.pnn == CTDB_BROADCAST_ALL) {
-                               uint32_t *nodes;
-                               uint32_t num_nodes;
-                               ret = 0;
+       if (options.pnn == CTDB_CURRENT_NODE) {
+               options.pnn = options.nodes[0];
+       }
 
-                               nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
-                               CTDB_NO_MEMORY(ctdb, nodes);
-       
-                               for (j=0;j<num_nodes;j++) {
-                                       options.pnn = nodes[j];
-                                       ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
-                               }
-                               talloc_free(nodes);
-                       } else {
-                               ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
-                       }
-                       break;
+       if (ctdb_commands[i].auto_all && 
+           ((options.pnn == CTDB_BROADCAST_ALL) ||
+            (options.pnn == CTDB_MULTICAST))) {
+               int j;
+
+               ret = 0;
+               for (j = 0; j < talloc_array_length(options.nodes); j++) {
+                       options.pnn = options.nodes[j];
+                       ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
                }
+       } else {
+               ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
        }
 
-       if (i == ARRAY_SIZE(ctdb_commands)) {
-               DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
-               exit(1);
-       }
+       ctdb_disconnect(ctdb_connection);
+       talloc_free(ctdb);
+       talloc_free(ev);
+       (void)poptFreeContext(pc);
 
        return ret;
+
 }