libndr: Avoid assigning duplicate versions to symbols
[amitay/samba.git] / ctdb / tests / src / fake_ctdbd.c
index 052769163a13949e7dabaa57eab6df51e4c46df0..146b7da344c62fd41da11e21ba67d23a0fcc81cf 100644 (file)
@@ -20,6 +20,7 @@
 #include "replace.h"
 #include "system/network.h"
 #include "system/time.h"
+#include "system/filesys.h"
 
 #include <popt.h>
 #include <talloc.h>
 
 #include "protocol/protocol.h"
 #include "protocol/protocol_api.h"
+#include "protocol/protocol_util.h"
+#include "protocol/protocol_private.h"
 
 #include "common/comm.h"
-#include "common/system.h"
 #include "common/logging.h"
 #include "common/tunable.h"
+#include "common/srvid.h"
+#include "common/system.h"
+
+#include "ipalloc_read_known_ips.h"
 
 
 #define CTDB_PORT 4379
@@ -80,26 +86,54 @@ struct vnn_map {
        uint32_t *map;
 };
 
-struct srvid_register_state {
-       struct srvid_register_state *prev, *next;
+struct database {
+       struct database *prev, *next;
+       const char *name;
+       const char *path;
+       struct tdb_context *tdb;
+       uint32_t id;
+       uint8_t flags;
+       uint64_t seq_num;
+};
+
+struct database_map {
+       struct database *db;
+       const char *dbdir;
+};
+
+struct fake_control_failure {
+       struct fake_control_failure  *prev, *next;
+       enum ctdb_controls opcode;
+       uint32_t pnn;
+       const char *error;
+       const char *comment;
+};
+
+struct ctdb_client {
+       struct ctdb_client *prev, *next;
        struct ctdbd_context *ctdb;
-       uint64_t srvid;
+       pid_t pid;
+       void *state;
 };
 
 struct ctdbd_context {
        struct node_map *node_map;
        struct interface_map *iface_map;
        struct vnn_map *vnn_map;
-       struct srvid_register_state *rstate;
+       struct database_map *db_map;
+       struct srvid_context *srv;
        int num_clients;
        struct timeval start_time;
        struct timeval recovery_start_time;
        struct timeval recovery_end_time;
        bool takeover_disabled;
-       enum debug_level log_level;
+       int log_level;
        enum ctdb_runstate runstate;
        struct ctdb_tunable_list tun_list;
-       int monitoring_mode;
+       char *reclock;
+       struct ctdb_public_ip_list *known_ips;
+       struct fake_control_failure *control_failures;
+       struct ctdb_client *client_list;
 };
 
 /*
@@ -141,6 +175,7 @@ static bool nodemap_parse(struct node_map *node_map)
                char *ip;
                ctdb_sock_addr saddr;
                struct node *node;
+               int ret;
 
                if (line[0] == '\n') {
                        break;
@@ -165,10 +200,12 @@ static bool nodemap_parse(struct node_map *node_map)
                        fprintf(stderr, "bad line (%s) - missing IP\n", line);
                        continue;
                }
-               if (!parse_ip(tok, NULL, CTDB_PORT, &saddr)) {
+               ret = ctdb_sock_addr_from_string(tok, &saddr, false);
+               if (ret != 0) {
                        fprintf(stderr, "bad line (%s) - invalid IP\n", line);
                        continue;
                }
+               ctdb_sock_addr_set_port(&saddr, CTDB_PORT);
                ip = talloc_strdup(node_map, tok);
                if (ip == NULL) {
                        goto fail;
@@ -220,7 +257,12 @@ static bool nodemap_parse(struct node_map *node_map)
                }
                node = &node_map->node[node_map->num_nodes];
 
-               parse_ip(ip, NULL, CTDB_PORT, &node->addr);
+               ret = ctdb_sock_addr_from_string(ip, &node->addr, false);
+               if (ret != 0) {
+                       fprintf(stderr, "bad line (%s) - invalid IP\n", line);
+                       continue;
+               }
+               ctdb_sock_addr_set_port(&node->addr, CTDB_PORT);
                node->pnn = pnn;
                node->flags = flags;
                node->capabilities = capabilities;
@@ -246,11 +288,14 @@ static bool node_map_add(struct ctdb_node_map *nodemap,
        ctdb_sock_addr addr;
        uint32_t num;
        struct ctdb_node_and_flags *n;
+       int ret;
 
-       if (! parse_ip(nstr, NULL, CTDB_PORT, &addr)) {
+       ret = ctdb_sock_addr_from_string(nstr, &addr, false);
+       if (ret != 0) {
                fprintf(stderr, "Invalid IP address %s\n", nstr);
                return false;
        }
+       ctdb_sock_addr_set_port(&addr, CTDB_PORT);
 
        num = nodemap->num;
        nodemap->node = talloc_realloc(nodemap, nodemap->node,
@@ -341,28 +386,49 @@ static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
                                             uint32_t pnn)
 {
        struct ctdb_node_map *nodemap;
-       char nodepath[PATH_MAX];
-       const char *nodes_list;
-
-       /* read the nodes file */
-       sprintf(nodepath, "CTDB_NODES_%u", pnn);
-       nodes_list = getenv(nodepath);
-       if (nodes_list == NULL) {
-               nodes_list = getenv("CTDB_NODES");
-               if (nodes_list == NULL) {
-                       DEBUG(DEBUG_INFO, ("Nodes file not defined\n"));
+       char nodes_list[PATH_MAX];
+       const char *ctdb_base;
+       int num;
+
+       ctdb_base = getenv("CTDB_BASE");
+       if (ctdb_base == NULL) {
+               D_ERR("CTDB_BASE is not set\n");
+               return NULL;
+       }
+
+       /* read optional node-specific nodes file */
+       num = snprintf(nodes_list, sizeof(nodes_list),
+                      "%s/nodes.%d", ctdb_base, pnn);
+       if (num == sizeof(nodes_list)) {
+               D_ERR("nodes file path too long\n");
+               return NULL;
+       }
+       nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
+       if (nodemap != NULL) {
+               /* Fake a load failure for an empty nodemap */
+               if (nodemap->num == 0) {
+                       talloc_free(nodemap);
+
+                       D_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
                        return NULL;
                }
+
+               return nodemap;
        }
 
-       nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
-       if (nodemap == NULL) {
-               DEBUG(DEBUG_INFO, ("Failed to read nodes file \"%s\"\n",
-                                  nodes_list));
+       /* read normal nodes file */
+       num = snprintf(nodes_list, sizeof(nodes_list), "%s/nodes", ctdb_base);
+       if (num == sizeof(nodes_list)) {
+               D_ERR("nodes file path too long\n");
                return NULL;
        }
+       nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
+       if (nodemap != NULL) {
+               return nodemap;
+       }
 
-       return nodemap;
+       DBG_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
+       return NULL;
 }
 
 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
@@ -530,489 +596,1521 @@ fail:
        return false;
 }
 
-/*
- * CTDB context setup
- */
-
-static uint32_t new_generation(uint32_t old_generation)
+static bool reclock_parse(struct ctdbd_context *ctdb)
 {
-       uint32_t generation;
+       char line[1024];
+       char *t;
 
-       while (1) {
-               generation = random();
-               if (generation != INVALID_GENERATION &&
-                   generation != old_generation) {
-                       break;
-               }
+       if (fgets(line, sizeof(line), stdin) == NULL) {
+               goto fail;
        }
 
-       return generation;
+       if (line[0] == '\n') {
+               /* Recovery lock remains unset */
+               goto ok;
+       }
+
+       /* Get rid of pesky newline */
+       if ((t = strchr(line, '\n')) != NULL) {
+               *t = '\0';
+       }
+
+       ctdb->reclock = talloc_strdup(ctdb, line);
+       if (ctdb->reclock == NULL) {
+               goto fail;
+       }
+ok:
+       /* Swallow possible blank line following section.  Picky
+        * compiler settings don't allow the return value to be
+        * ignored, so make the compiler happy.
+        */
+       if (fgets(line, sizeof(line), stdin) == NULL) {
+               ;
+       }
+       DEBUG(DEBUG_INFO, ("Parsing reclock done\n"));
+       return true;
+
+fail:
+       fprintf(stderr, "Parsing reclock failed\n");
+       return false;
 }
 
-static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx)
+static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx,
+                                      const char *dbdir)
 {
-       struct ctdbd_context *ctdb;
-       char line[1024];
-       bool status;
+       struct database_map *db_map;
 
-       ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
-       if (ctdb == NULL) {
+       db_map = talloc_zero(mem_ctx, struct database_map);
+       if (db_map == NULL) {
                return NULL;
        }
 
-       ctdb->node_map = nodemap_init(ctdb);
-       if (ctdb->node_map == NULL) {
-               goto fail;
+       db_map->dbdir = talloc_strdup(db_map, dbdir);
+       if (db_map->dbdir == NULL) {
+               talloc_free(db_map);
+               return NULL;
        }
 
-       ctdb->iface_map = interfaces_init(ctdb);
-       if (ctdb->iface_map == NULL) {
-               goto fail;
-       }
+       return db_map;
+}
 
-       ctdb->vnn_map = vnnmap_init(ctdb);
-       if (ctdb->vnn_map == NULL) {
-               goto fail;
-       }
+/* Read a database map from stdin.  Each line looks like:
+ *  <ID> <NAME> [FLAGS] [SEQ_NUM]
+ * EOF or a blank line terminates input.
+ *
+ * By default, flags and seq_num are 0
+ */
 
-       while (fgets(line, sizeof(line), stdin) != NULL) {
-               char *t;
+static bool dbmap_parse(struct database_map *db_map)
+{
+       char line[1024];
+
+       while ((fgets(line, sizeof(line), stdin) != NULL)) {
+               uint32_t id;
+               uint8_t flags = 0;
+               uint32_t seq_num = 0;
+               char *tok, *t;
+               char *name;
+               struct database *db;
+
+               if (line[0] == '\n') {
+                       break;
+               }
 
+               /* Get rid of pesky newline */
                if ((t = strchr(line, '\n')) != NULL) {
                        *t = '\0';
                }
 
-               if (strcmp(line, "NODEMAP") == 0) {
-                       status = nodemap_parse(ctdb->node_map);
-               } else if (strcmp(line, "IFACES") == 0) {
-                       status = interfaces_parse(ctdb->iface_map);
-               } else if (strcmp(line, "VNNMAP") == 0) {
-                       status = vnnmap_parse(ctdb->vnn_map);
-               } else {
-                       fprintf(stderr, "Unknown line %s\n", line);
-                       status = false;
+               /* Get ID */
+               tok = strtok(line, " \t");
+               if (tok == NULL) {
+                       fprintf(stderr, "bad line (%s) - missing ID\n", line);
+                       continue;
                }
+               id = (uint32_t)strtoul(tok, NULL, 0);
 
-               if (! status) {
+               /* Get NAME */
+               tok = strtok(NULL, " \t");
+               if (tok == NULL) {
+                       fprintf(stderr, "bad line (%s) - missing NAME\n", line);
+                       continue;
+               }
+               name = talloc_strdup(db_map, tok);
+               if (name == NULL) {
                        goto fail;
                }
-       }
 
-       ctdb->start_time = tevent_timeval_current();
-       ctdb->recovery_start_time = tevent_timeval_current();
-       ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
-       if (ctdb->vnn_map->generation == INVALID_GENERATION) {
-               ctdb->vnn_map->generation =
-                       new_generation(ctdb->vnn_map->generation);
-       }
-       ctdb->recovery_end_time = tevent_timeval_current();
+               /* Get flags */
+               tok = strtok(NULL, " \t");
+               while (tok != NULL) {
+                       if (strcmp(tok, "PERSISTENT") == 0) {
+                               flags |= CTDB_DB_FLAGS_PERSISTENT;
+                       } else if (strcmp(tok, "STICKY") == 0) {
+                               flags |= CTDB_DB_FLAGS_STICKY;
+                       } else if (strcmp(tok, "READONLY") == 0) {
+                               flags |= CTDB_DB_FLAGS_READONLY;
+                       } else if (strcmp(tok, "REPLICATED") == 0) {
+                               flags |= CTDB_DB_FLAGS_REPLICATED;
+                       } else if (tok[0] >= '0'&& tok[0] <= '9') {
+                               uint8_t nv = CTDB_DB_FLAGS_PERSISTENT |
+                                            CTDB_DB_FLAGS_REPLICATED;
+
+                               if ((flags & nv) == 0) {
+                                       fprintf(stderr,
+                                               "seq_num for volatile db\n");
+                                       goto fail;
+                               }
+                               seq_num = (uint64_t)strtoull(tok, NULL, 0);
+                       }
 
-       ctdb->log_level = DEBUG_ERR;
-       ctdb->runstate = CTDB_RUNSTATE_RUNNING;
+                       tok = strtok(NULL, " \t");
+               }
 
-       ctdb_tunable_set_defaults(&ctdb->tun_list);
+               db = talloc_zero(db_map, struct database);
+               if (db == NULL) {
+                       goto fail;
+               }
+
+               db->id = id;
+               db->name = talloc_steal(db, name);
+               db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
+               if (db->path == NULL) {
+                       talloc_free(db);
+                       goto fail;
+               }
+               db->flags = flags;
+               db->seq_num = seq_num;
 
-       ctdb->monitoring_mode = CTDB_MONITORING_ACTIVE;
+               DLIST_ADD_END(db_map->db, db);
+       }
 
-       return ctdb;
+       DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
+       return true;
 
 fail:
-       TALLOC_FREE(ctdb);
-       return NULL;
+       DEBUG(DEBUG_INFO, ("Parsing dbmap failed\n"));
+       return false;
+
 }
 
-static bool ctdbd_verify(struct ctdbd_context *ctdb)
+static struct database *database_find(struct database_map *db_map,
+                                     uint32_t db_id)
 {
-       struct node *node;
-       int i;
+       struct database *db;
 
-       if (ctdb->node_map->num_nodes == 0) {
-               return true;
-       }
-
-       /* Make sure all the nodes are in order */
-       for (i=0; i<ctdb->node_map->num_nodes; i++) {
-               node = &ctdb->node_map->node[i];
-               if (node->pnn != i) {
-                       fprintf(stderr, "Expected node %u, found %u\n",
-                               i, node->pnn);
-                       return false;
+       for (db = db_map->db; db != NULL; db = db->next) {
+               if (db->id == db_id) {
+                       return db;
                }
        }
 
-       node = &ctdb->node_map->node[ctdb->node_map->pnn];
-       if (node->flags & NODE_FLAGS_DISCONNECTED) {
-               DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
-               exit(0);
+       return NULL;
+}
+
+static int database_count(struct database_map *db_map)
+{
+       struct database *db;
+       int count = 0;
+
+       for (db = db_map->db; db != NULL; db = db->next) {
+               count += 1;
        }
 
-       return true;
+       return count;
 }
 
-/*
- * Doing a recovery
- */
+static int database_flags(uint8_t db_flags)
+{
+       int tdb_flags = 0;
 
-struct recover_state {
-       struct tevent_context *ev;
-       struct ctdbd_context *ctdb;
-};
+       if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
+               tdb_flags = TDB_DEFAULT;
+       } else {
+               /* volatile and replicated use the same flags */
+               tdb_flags = TDB_NOSYNC |
+                           TDB_CLEAR_IF_FIRST |
+                           TDB_INCOMPATIBLE_HASH;
+       }
 
-static int recover_check(struct tevent_req *req);
-static void recover_wait_done(struct tevent_req *subreq);
-static void recover_done(struct tevent_req *subreq);
+       tdb_flags |= TDB_DISALLOW_NESTING;
 
-static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
-                                      struct tevent_context *ev,
-                                      struct ctdbd_context *ctdb)
+       return tdb_flags;
+}
+
+static struct database *database_new(struct database_map *db_map,
+                                    const char *name, uint8_t flags)
 {
-       struct tevent_req *req;
-       struct recover_state *state;
-       int ret;
+       struct database *db;
+       TDB_DATA key;
+       int tdb_flags;
 
-       req = tevent_req_create(mem_ctx, &state, struct recover_state);
-       if (req == NULL) {
+       db = talloc_zero(db_map, struct database);
+       if (db == NULL) {
                return NULL;
        }
 
-       state->ev = ev;
-       state->ctdb = ctdb;
+       db->name = talloc_strdup(db, name);
+       if (db->name == NULL) {
+               goto fail;
+       }
 
-       ret = recover_check(req);
-       if (ret != 0) {
-               tevent_req_error(req, ret);
-               return tevent_req_post(req, ev);
+       db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
+       if (db->path == NULL) {
+               goto fail;
        }
 
-       return req;
+       key.dsize = strlen(db->name) + 1;
+       key.dptr = discard_const(db->name);
+
+       db->id = tdb_jenkins_hash(&key);
+       db->flags = flags;
+
+       tdb_flags = database_flags(flags);
+
+       db->tdb = tdb_open(db->path, 8192, tdb_flags, O_CREAT|O_RDWR, 0644);
+       if (db->tdb == NULL) {
+               DBG_ERR("tdb_open\n");
+               goto fail;
+       }
+
+       DLIST_ADD_END(db_map->db, db);
+       return db;
+
+fail:
+       DBG_ERR("Memory error\n");
+       talloc_free(db);
+       return NULL;
+
 }
 
-static int recover_check(struct tevent_req *req)
+static int ltdb_store(struct database *db, TDB_DATA key,
+                     struct ctdb_ltdb_header *header, TDB_DATA data)
 {
-       struct recover_state *state = tevent_req_data(
-               req, struct recover_state);
-       struct ctdbd_context *ctdb = state->ctdb;
-       struct tevent_req *subreq;
-       bool recovery_disabled;
-       int i;
+       int ret;
+       bool db_volatile = true;
+       bool keep = false;
 
-       recovery_disabled = false;
-       for (i=0; i<ctdb->node_map->num_nodes; i++) {
-               if (ctdb->node_map->node[i].recovery_disabled) {
-                       recovery_disabled = true;
-                       break;
-               }
+       if (db->tdb == NULL) {
+               return EINVAL;
        }
 
-       subreq = tevent_wakeup_send(state, state->ev,
-                                   tevent_timeval_current_ofs(1, 0));
-       if (subreq == NULL) {
-               return ENOMEM;
+       if ((db->flags & CTDB_DB_FLAGS_PERSISTENT) ||
+           (db->flags & CTDB_DB_FLAGS_REPLICATED)) {
+               db_volatile = false;
        }
 
-       if (recovery_disabled) {
-               tevent_req_set_callback(subreq, recover_wait_done, req);
+       if (data.dsize > 0) {
+               keep = true;
        } else {
-               ctdb->recovery_start_time = tevent_timeval_current();
-               tevent_req_set_callback(subreq, recover_done, req);
+               if (db_volatile && header->rsn == 0) {
+                       keep = true;
+               }
        }
 
-       return 0;
+       if (keep) {
+               TDB_DATA rec[2];
+
+               rec[0].dsize = ctdb_ltdb_header_len(header);
+               rec[0].dptr = (uint8_t *)header;
+
+               rec[1].dsize = data.dsize;
+               rec[1].dptr = data.dptr;
+
+               ret = tdb_storev(db->tdb, key, rec, 2, TDB_REPLACE);
+       } else {
+               if (header->rsn > 0) {
+                       ret = tdb_delete(db->tdb, key);
+               } else {
+                       ret = 0;
+               }
+       }
+
+       return ret;
 }
 
-static void recover_wait_done(struct tevent_req *subreq)
+static int ltdb_fetch(struct database *db, TDB_DATA key,
+                     struct ctdb_ltdb_header *header,
+                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
 {
-       struct tevent_req *req = tevent_req_callback_data(
-               subreq, struct tevent_req);
+       TDB_DATA rec;
+       size_t np;
        int ret;
-       bool status;
 
-       status = tevent_wakeup_recv(subreq);
-       TALLOC_FREE(subreq);
-       if (! status) {
-               tevent_req_error(req, EIO);
-               return;
+       if (db->tdb == NULL) {
+               return EINVAL;
        }
 
-       ret = recover_check(req);
+       rec = tdb_fetch(db->tdb, key);
+       ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
        if (ret != 0) {
-               tevent_req_error(req, ret);
-       }
-}
+               if (rec.dptr != NULL) {
+                       free(rec.dptr);
+               }
 
-static void recover_done(struct tevent_req *subreq)
-{
+               *header = (struct ctdb_ltdb_header) {
+                       .rsn = 0,
+                       .dmaster = 0,
+                       .flags = 0,
+               };
+
+               ret = ltdb_store(db, key, header, tdb_null);
+               if (ret != 0) {
+                       return ret;
+               }
+
+               *data = tdb_null;
+               return 0;
+       }
+
+       data->dsize = rec.dsize - ctdb_ltdb_header_len(header);
+       data->dptr = talloc_memdup(mem_ctx,
+                                  rec.dptr + ctdb_ltdb_header_len(header),
+                                  data->dsize);
+
+       free(rec.dptr);
+
+       if (data->dptr == NULL) {
+               return ENOMEM;
+       }
+
+       return 0;
+}
+
+static int database_seqnum(struct database *db, uint64_t *seqnum)
+{
+       const char *keyname = CTDB_DB_SEQNUM_KEY;
+       TDB_DATA key, data;
+       struct ctdb_ltdb_header header;
+       size_t np;
+       int ret;
+
+       if (db->tdb == NULL) {
+               *seqnum = db->seq_num;
+               return 0;
+       }
+
+       key.dptr = discard_const(keyname);
+       key.dsize = strlen(keyname) + 1;
+
+       ret = ltdb_fetch(db, key, &header, db, &data);
+       if (ret != 0) {
+               return ret;
+       }
+
+       if (data.dsize == 0) {
+               *seqnum = 0;
+               return 0;
+       }
+
+       ret = ctdb_uint64_pull(data.dptr, data.dsize, seqnum, &np);
+       talloc_free(data.dptr);
+       if (ret != 0) {
+               *seqnum = 0;
+       }
+
+       return ret;
+}
+
+static int ltdb_transaction_update(uint32_t reqid,
+                                  struct ctdb_ltdb_header *no_header,
+                                  TDB_DATA key, TDB_DATA data,
+                                  void *private_data)
+{
+       struct database *db = (struct database *)private_data;
+       TALLOC_CTX *tmp_ctx = talloc_new(db);
+       struct ctdb_ltdb_header header = { 0 }, oldheader;
+       TDB_DATA olddata;
+       int ret;
+
+       if (db->tdb == NULL) {
+               return EINVAL;
+       }
+
+       ret = ctdb_ltdb_header_extract(&data, &header);
+       if (ret != 0) {
+               return ret;
+       }
+
+       ret = ltdb_fetch(db, key, &oldheader, tmp_ctx, &olddata);
+       if (ret != 0) {
+               return ret;
+       }
+
+       if (olddata.dsize > 0) {
+               if (oldheader.rsn > header.rsn ||
+                   (oldheader.rsn == header.rsn &&
+                    olddata.dsize != data.dsize)) {
+                       return -1;
+               }
+       }
+
+       talloc_free(tmp_ctx);
+
+       ret = ltdb_store(db, key, &header, data);
+       return ret;
+}
+
+static int ltdb_transaction(struct database *db,
+                           struct ctdb_rec_buffer *recbuf)
+{
+       int ret;
+
+       if (db->tdb == NULL) {
+               return EINVAL;
+       }
+
+       ret = tdb_transaction_start(db->tdb);
+       if (ret == -1) {
+               return ret;
+       }
+
+       ret = ctdb_rec_buffer_traverse(recbuf, ltdb_transaction_update, db);
+       if (ret != 0) {
+               tdb_transaction_cancel(db->tdb);
+       }
+
+       ret = tdb_transaction_commit(db->tdb);
+       return ret;
+}
+
+static bool public_ips_parse(struct ctdbd_context *ctdb,
+                            uint32_t numnodes)
+{
+       bool status;
+
+       if (numnodes == 0) {
+               D_ERR("Must initialise nodemap before public IPs\n");
+               return false;
+       }
+
+       ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
+
+       status = (ctdb->known_ips != NULL);
+
+       if (status) {
+               D_INFO("Parsing public IPs done\n");
+       } else {
+               D_INFO("Parsing public IPs failed\n");
+       }
+
+       return status;
+}
+
+/* Read information about controls to fail.  Format is:
+ *   <opcode> <pnn> {ERROR|TIMEOUT} <comment>
+ */
+static bool control_failures_parse(struct ctdbd_context *ctdb)
+{
+       char line[1024];
+
+       while ((fgets(line, sizeof(line), stdin) != NULL)) {
+               char *tok, *t;
+               enum ctdb_controls opcode;
+               uint32_t pnn;
+               const char *error;
+               const char *comment;
+               struct fake_control_failure *failure = NULL;
+
+               if (line[0] == '\n') {
+                       break;
+               }
+
+               /* Get rid of pesky newline */
+               if ((t = strchr(line, '\n')) != NULL) {
+                       *t = '\0';
+               }
+
+               /* Get opcode */
+               tok = strtok(line, " \t");
+               if (tok == NULL) {
+                       D_ERR("bad line (%s) - missing opcode\n", line);
+                       continue;
+               }
+               opcode = (enum ctdb_controls)strtoul(tok, NULL, 0);
+
+               /* Get PNN */
+               tok = strtok(NULL, " \t");
+               if (tok == NULL) {
+                       D_ERR("bad line (%s) - missing PNN\n", line);
+                       continue;
+               }
+               pnn = (uint32_t)strtoul(tok, NULL, 0);
+
+               /* Get error */
+               tok = strtok(NULL, " \t");
+               if (tok == NULL) {
+                       D_ERR("bad line (%s) - missing errno\n", line);
+                       continue;
+               }
+               error = talloc_strdup(ctdb, tok);
+               if (error == NULL) {
+                       goto fail;
+               }
+               if (strcmp(error, "ERROR") != 0 &&
+                   strcmp(error, "TIMEOUT") != 0) {
+                       D_ERR("bad line (%s) "
+                             "- error must be \"ERROR\" or \"TIMEOUT\"\n",
+                             line);
+                       goto fail;
+               }
+
+               /* Get comment */
+               tok = strtok(NULL, "\n"); /* rest of line */
+               if (tok == NULL) {
+                       D_ERR("bad line (%s) - missing comment\n", line);
+                       continue;
+               }
+               comment = talloc_strdup(ctdb, tok);
+               if (comment == NULL) {
+                       goto fail;
+               }
+
+               failure = talloc_zero(ctdb, struct fake_control_failure);
+               if (failure == NULL) {
+                       goto fail;
+               }
+
+               failure->opcode = opcode;
+               failure->pnn = pnn;
+               failure->error = error;
+               failure->comment = comment;
+
+               DLIST_ADD(ctdb->control_failures, failure);
+       }
+
+       D_INFO("Parsing fake control failures done\n");
+       return true;
+
+fail:
+       D_INFO("Parsing fake control failures failed\n");
+       return false;
+}
+
+/*
+ * Manage clients
+ */
+
+static int ctdb_client_destructor(struct ctdb_client *client)
+{
+       DLIST_REMOVE(client->ctdb->client_list, client);
+       return 0;
+}
+
+static int client_add(struct ctdbd_context *ctdb, pid_t client_pid,
+                     void *client_state)
+{
+       struct ctdb_client *client;
+
+       client = talloc_zero(client_state, struct ctdb_client);
+       if (client == NULL) {
+               return ENOMEM;
+       }
+
+       client->ctdb = ctdb;
+       client->pid = client_pid;
+       client->state = client_state;
+
+       DLIST_ADD(ctdb->client_list, client);
+       talloc_set_destructor(client, ctdb_client_destructor);
+       return 0;
+}
+
+static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid)
+{
+       struct ctdb_client *client;
+
+       for (client=ctdb->client_list; client != NULL; client=client->next) {
+               if (client->pid == client_pid) {
+                       return client->state;
+               }
+       }
+
+       return NULL;
+}
+
+/*
+ * CTDB context setup
+ */
+
+static uint32_t new_generation(uint32_t old_generation)
+{
+       uint32_t generation;
+
+       while (1) {
+               generation = random();
+               if (generation != INVALID_GENERATION &&
+                   generation != old_generation) {
+                       break;
+               }
+       }
+
+       return generation;
+}
+
+static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx,
+                                        const char *dbdir)
+{
+       struct ctdbd_context *ctdb;
+       char line[1024];
+       bool status;
+       int ret;
+
+       ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
+       if (ctdb == NULL) {
+               return NULL;
+       }
+
+       ctdb->node_map = nodemap_init(ctdb);
+       if (ctdb->node_map == NULL) {
+               goto fail;
+       }
+
+       ctdb->iface_map = interfaces_init(ctdb);
+       if (ctdb->iface_map == NULL) {
+               goto fail;
+       }
+
+       ctdb->vnn_map = vnnmap_init(ctdb);
+       if (ctdb->vnn_map == NULL) {
+               goto fail;
+       }
+
+       ctdb->db_map = dbmap_init(ctdb, dbdir);
+       if (ctdb->db_map == NULL) {
+               goto fail;
+       }
+
+       ret = srvid_init(ctdb, &ctdb->srv);
+       if (ret != 0) {
+               goto fail;
+       }
+
+       while (fgets(line, sizeof(line), stdin) != NULL) {
+               char *t;
+
+               if ((t = strchr(line, '\n')) != NULL) {
+                       *t = '\0';
+               }
+
+               if (strcmp(line, "NODEMAP") == 0) {
+                       status = nodemap_parse(ctdb->node_map);
+               } else if (strcmp(line, "IFACES") == 0) {
+                       status = interfaces_parse(ctdb->iface_map);
+               } else if (strcmp(line, "VNNMAP") == 0) {
+                       status = vnnmap_parse(ctdb->vnn_map);
+               } else if (strcmp(line, "DBMAP") == 0) {
+                       status = dbmap_parse(ctdb->db_map);
+               } else if (strcmp(line, "PUBLICIPS") == 0) {
+                       status = public_ips_parse(ctdb,
+                                                 ctdb->node_map->num_nodes);
+               } else if (strcmp(line, "RECLOCK") == 0) {
+                       status = reclock_parse(ctdb);
+               } else if (strcmp(line, "CONTROLFAILS") == 0) {
+                       status = control_failures_parse(ctdb);
+               } else {
+                       fprintf(stderr, "Unknown line %s\n", line);
+                       status = false;
+               }
+
+               if (! status) {
+                       goto fail;
+               }
+       }
+
+       ctdb->start_time = tevent_timeval_current();
+       ctdb->recovery_start_time = tevent_timeval_current();
+       ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
+       if (ctdb->vnn_map->generation == INVALID_GENERATION) {
+               ctdb->vnn_map->generation =
+                       new_generation(ctdb->vnn_map->generation);
+       }
+       ctdb->recovery_end_time = tevent_timeval_current();
+
+       ctdb->log_level = DEBUG_ERR;
+       ctdb->runstate = CTDB_RUNSTATE_RUNNING;
+
+       ctdb_tunable_set_defaults(&ctdb->tun_list);
+
+       return ctdb;
+
+fail:
+       TALLOC_FREE(ctdb);
+       return NULL;
+}
+
+static bool ctdbd_verify(struct ctdbd_context *ctdb)
+{
+       struct node *node;
+       unsigned int i;
+
+       if (ctdb->node_map->num_nodes == 0) {
+               return true;
+       }
+
+       /* Make sure all the nodes are in order */
+       for (i=0; i<ctdb->node_map->num_nodes; i++) {
+               node = &ctdb->node_map->node[i];
+               if (node->pnn != i) {
+                       fprintf(stderr, "Expected node %u, found %u\n",
+                               i, node->pnn);
+                       return false;
+               }
+       }
+
+       node = &ctdb->node_map->node[ctdb->node_map->pnn];
+       if (node->flags & NODE_FLAGS_DISCONNECTED) {
+               DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
+               exit(0);
+       }
+
+       return true;
+}
+
+/*
+ * Doing a recovery
+ */
+
+struct recover_state {
+       struct tevent_context *ev;
+       struct ctdbd_context *ctdb;
+};
+
+static int recover_check(struct tevent_req *req);
+static void recover_wait_done(struct tevent_req *subreq);
+static void recover_done(struct tevent_req *subreq);
+
+static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
+                                      struct tevent_context *ev,
+                                      struct ctdbd_context *ctdb)
+{
+       struct tevent_req *req;
+       struct recover_state *state;
+       int ret;
+
+       req = tevent_req_create(mem_ctx, &state, struct recover_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->ev = ev;
+       state->ctdb = ctdb;
+
+       ret = recover_check(req);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return tevent_req_post(req, ev);
+       }
+
+       return req;
+}
+
+static int recover_check(struct tevent_req *req)
+{
+       struct recover_state *state = tevent_req_data(
+               req, struct recover_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct tevent_req *subreq;
+       bool recovery_disabled;
+       unsigned int i;
+
+       recovery_disabled = false;
+       for (i=0; i<ctdb->node_map->num_nodes; i++) {
+               if (ctdb->node_map->node[i].recovery_disabled) {
+                       recovery_disabled = true;
+                       break;
+               }
+       }
+
+       subreq = tevent_wakeup_send(state, state->ev,
+                                   tevent_timeval_current_ofs(1, 0));
+       if (subreq == NULL) {
+               return ENOMEM;
+       }
+
+       if (recovery_disabled) {
+               tevent_req_set_callback(subreq, recover_wait_done, req);
+       } else {
+               ctdb->recovery_start_time = tevent_timeval_current();
+               tevent_req_set_callback(subreq, recover_done, req);
+       }
+
+       return 0;
+}
+
+static void recover_wait_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       int ret;
+       bool status;
+
+       status = tevent_wakeup_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               tevent_req_error(req, EIO);
+               return;
+       }
+
+       ret = recover_check(req);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+       }
+}
+
+static void recover_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct recover_state *state = tevent_req_data(
+               req, struct recover_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       bool status;
+
+       status = tevent_wakeup_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               tevent_req_error(req, EIO);
+               return;
+       }
+
+       ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
+       ctdb->recovery_end_time = tevent_timeval_current();
+       ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
+
+       tevent_req_done(req);
+}
+
+static bool recover_recv(struct tevent_req *req, int *perr)
+{
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               if (perr != NULL) {
+                       *perr = err;
+               }
+               return false;
+       }
+
+       return true;
+}
+
+/*
+ * Routines for ctdb_req_header
+ */
+
+static void header_fix_pnn(struct ctdb_req_header *header,
+                          struct ctdbd_context *ctdb)
+{
+       if (header->srcnode == CTDB_CURRENT_NODE) {
+               header->srcnode = ctdb->node_map->pnn;
+       }
+
+       if (header->destnode == CTDB_CURRENT_NODE) {
+               header->destnode = ctdb->node_map->pnn;
+       }
+}
+
+static struct ctdb_req_header header_reply_call(
+                                       struct ctdb_req_header *header,
+                                       struct ctdbd_context *ctdb)
+{
+       struct ctdb_req_header reply_header;
+
+       reply_header = (struct ctdb_req_header) {
+               .ctdb_magic = CTDB_MAGIC,
+               .ctdb_version = CTDB_PROTOCOL,
+               .generation = ctdb->vnn_map->generation,
+               .operation = CTDB_REPLY_CALL,
+               .destnode = header->srcnode,
+               .srcnode = header->destnode,
+               .reqid = header->reqid,
+       };
+
+       return reply_header;
+}
+
+static struct ctdb_req_header header_reply_control(
+                                       struct ctdb_req_header *header,
+                                       struct ctdbd_context *ctdb)
+{
+       struct ctdb_req_header reply_header;
+
+       reply_header = (struct ctdb_req_header) {
+               .ctdb_magic = CTDB_MAGIC,
+               .ctdb_version = CTDB_PROTOCOL,
+               .generation = ctdb->vnn_map->generation,
+               .operation = CTDB_REPLY_CONTROL,
+               .destnode = header->srcnode,
+               .srcnode = header->destnode,
+               .reqid = header->reqid,
+       };
+
+       return reply_header;
+}
+
+static struct ctdb_req_header header_reply_message(
+                                       struct ctdb_req_header *header,
+                                       struct ctdbd_context *ctdb)
+{
+       struct ctdb_req_header reply_header;
+
+       reply_header = (struct ctdb_req_header) {
+               .ctdb_magic = CTDB_MAGIC,
+               .ctdb_version = CTDB_PROTOCOL,
+               .generation = ctdb->vnn_map->generation,
+               .operation = CTDB_REQ_MESSAGE,
+               .destnode = header->srcnode,
+               .srcnode = header->destnode,
+               .reqid = 0,
+       };
+
+       return reply_header;
+}
+
+/*
+ * Client state
+ */
+
+struct client_state {
+       struct tevent_context *ev;
+       int fd;
+       struct ctdbd_context *ctdb;
+       int pnn;
+       pid_t pid;
+       struct comm_context *comm;
+       struct srvid_register_state *rstate;
+       int status;
+};
+
+/*
+ * Send replies to call, controls and messages
+ */
+
+static void client_reply_done(struct tevent_req *subreq);
+
+static void client_send_call(struct tevent_req *req,
+                            struct ctdb_req_header *header,
+                            struct ctdb_reply_call *reply)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct tevent_req *subreq;
+       struct ctdb_req_header reply_header;
+       uint8_t *buf;
+       size_t datalen, buflen;
+       int ret;
+
+       reply_header = header_reply_call(header, ctdb);
+
+       datalen = ctdb_reply_call_len(&reply_header, reply);
+       ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       ret = ctdb_reply_call_push(&reply_header, reply, buf, &buflen);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, client_reply_done, req);
+
+       talloc_steal(subreq, buf);
+}
+
+static void client_send_message(struct tevent_req *req,
+                               struct ctdb_req_header *header,
+                               struct ctdb_req_message_data *message)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct tevent_req *subreq;
+       struct ctdb_req_header reply_header;
+       uint8_t *buf;
+       size_t datalen, buflen;
+       int ret;
+
+       reply_header = header_reply_message(header, ctdb);
+
+       datalen = ctdb_req_message_data_len(&reply_header, message);
+       ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       ret = ctdb_req_message_data_push(&reply_header, message,
+                                        buf, &buflen);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
+
+       subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, client_reply_done, req);
+
+       talloc_steal(subreq, buf);
+}
+
+static void client_send_control(struct tevent_req *req,
+                               struct ctdb_req_header *header,
+                               struct ctdb_reply_control *reply)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct tevent_req *subreq;
+       struct ctdb_req_header reply_header;
+       uint8_t *buf;
+       size_t datalen, buflen;
+       int ret;
+
+       reply_header = header_reply_control(header, ctdb);
+
+       datalen = ctdb_reply_control_len(&reply_header, reply);
+       ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
+
+       subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, client_reply_done, req);
+
+       talloc_steal(subreq, buf);
+}
+
+static void client_reply_done(struct tevent_req *subreq)
+{
        struct tevent_req *req = tevent_req_callback_data(
                subreq, struct tevent_req);
-       struct recover_state *state = tevent_req_data(
-               req, struct recover_state);
-       struct ctdbd_context *ctdb = state->ctdb;
+       int ret;
        bool status;
 
-       status = tevent_wakeup_recv(subreq);
+       status = comm_write_recv(subreq, &ret);
        TALLOC_FREE(subreq);
        if (! status) {
-               tevent_req_error(req, EIO);
-               return;
+               tevent_req_error(req, ret);
        }
+}
 
-       ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
-       ctdb->recovery_end_time = tevent_timeval_current();
-       ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
+/*
+ * Handling protocol - controls
+ */
 
-       tevent_req_done(req);
+static void control_process_exists(TALLOC_CTX *mem_ctx,
+                                  struct tevent_req *req,
+                                  struct ctdb_req_header *header,
+                                  struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct client_state *cstate;
+       struct ctdb_reply_control reply;
+
+       reply.rdata.opcode = request->opcode;
+
+       cstate = client_find(ctdb, request->rdata.data.pid);
+       if (cstate == NULL) {
+               reply.status = -1;
+               reply.errmsg = "No client for PID";
+       } else {
+               reply.status = kill(request->rdata.data.pid, 0);
+               reply.errmsg = NULL;
+       }
+
+       client_send_control(req, header, &reply);
 }
 
-static bool recover_recv(struct tevent_req *req, int *perr)
+static void control_ping(TALLOC_CTX *mem_ctx,
+                        struct tevent_req *req,
+                        struct ctdb_req_header *header,
+                        struct ctdb_req_control *request)
 {
-       int err;
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
 
-       if (tevent_req_is_unix_error(req, &err)) {
-               if (perr != NULL) {
-                       *perr = err;
+       reply.rdata.opcode = request->opcode;
+       reply.status = ctdb->num_clients;
+       reply.errmsg = NULL;
+
+       client_send_control(req, header, &reply);
+}
+
+static void control_getdbpath(TALLOC_CTX *mem_ctx,
+                             struct tevent_req *req,
+                             struct ctdb_req_header *header,
+                             struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct database *db;
+
+       reply.rdata.opcode = request->opcode;
+
+       db = database_find(ctdb->db_map, request->rdata.data.db_id);
+       if (db == NULL) {
+               reply.status = ENOENT;
+               reply.errmsg = "Database not found";
+       } else {
+               reply.rdata.data.db_path =
+                       talloc_strdup(mem_ctx, db->path);
+               if (reply.rdata.data.db_path == NULL) {
+                       reply.status = ENOMEM;
+                       reply.errmsg = "Memory error";
+               } else {
+                       reply.status = 0;
+                       reply.errmsg = NULL;
                }
-               return false;
        }
 
-       return true;
+       client_send_control(req, header, &reply);
 }
 
-/*
- * Routines for ctdb_req_header
- */
-
-static void header_fix_pnn(struct ctdb_req_header *header,
-                          struct ctdbd_context *ctdb)
+static void control_getvnnmap(TALLOC_CTX *mem_ctx,
+                             struct tevent_req *req,
+                             struct ctdb_req_header *header,
+                             struct ctdb_req_control *request)
 {
-       if (header->srcnode == CTDB_CURRENT_NODE) {
-               header->srcnode = ctdb->node_map->pnn;
-       }
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct ctdb_vnn_map *vnnmap;
 
-       if (header->destnode == CTDB_CURRENT_NODE) {
-               header->destnode = ctdb->node_map->pnn;
+       reply.rdata.opcode = request->opcode;
+
+       vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
+       if (vnnmap == NULL) {
+               reply.status = ENOMEM;
+               reply.errmsg = "Memory error";
+       } else {
+               vnnmap->generation = ctdb->vnn_map->generation;
+               vnnmap->size = ctdb->vnn_map->size;
+               vnnmap->map = ctdb->vnn_map->map;
+
+               reply.rdata.data.vnnmap = vnnmap;
+               reply.status = 0;
+               reply.errmsg = NULL;
        }
+
+       client_send_control(req, header, &reply);
 }
 
-static struct ctdb_req_header header_reply_control(
-                                       struct ctdb_req_header *header,
-                                       struct ctdbd_context *ctdb)
+static void control_get_debug(TALLOC_CTX *mem_ctx,
+                             struct tevent_req *req,
+                             struct ctdb_req_header *header,
+                             struct ctdb_req_control *request)
 {
-       struct ctdb_req_header reply_header;
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
 
-       reply_header = (struct ctdb_req_header) {
-               .ctdb_magic = CTDB_MAGIC,
-               .ctdb_version = CTDB_PROTOCOL,
-               .generation = ctdb->vnn_map->generation,
-               .operation = CTDB_REPLY_CONTROL,
-               .destnode = header->srcnode,
-               .srcnode = header->destnode,
-               .reqid = header->reqid,
-       };
+       reply.rdata.opcode = request->opcode;
+       reply.rdata.data.loglevel = (uint32_t)ctdb->log_level;
+       reply.status = 0;
+       reply.errmsg = NULL;
 
-       return reply_header;
+       client_send_control(req, header, &reply);
 }
 
-static struct ctdb_req_header header_reply_message(
-                                       struct ctdb_req_header *header,
-                                       struct ctdbd_context *ctdb)
+static void control_set_debug(TALLOC_CTX *mem_ctx,
+                             struct tevent_req *req,
+                             struct ctdb_req_header *header,
+                             struct ctdb_req_control *request)
 {
-       struct ctdb_req_header reply_header;
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
 
-       reply_header = (struct ctdb_req_header) {
-               .ctdb_magic = CTDB_MAGIC,
-               .ctdb_version = CTDB_PROTOCOL,
-               .generation = ctdb->vnn_map->generation,
-               .operation = CTDB_REQ_MESSAGE,
-               .destnode = header->srcnode,
-               .srcnode = header->destnode,
-               .reqid = 0,
-       };
+       ctdb->log_level = (int)request->rdata.data.loglevel;
 
-       return reply_header;
+       reply.rdata.opcode = request->opcode;
+       reply.status = 0;
+       reply.errmsg = NULL;
+
+       client_send_control(req, header, &reply);
 }
 
-/*
- * Client state
- */
+static void control_get_dbmap(TALLOC_CTX *mem_ctx,
+                             struct tevent_req *req,
+                              struct ctdb_req_header *header,
+                             struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct ctdb_dbid_map *dbmap;
+       struct database *db;
+       unsigned int i;
 
-struct client_state {
-       struct tevent_context *ev;
-       int fd;
+       reply.rdata.opcode = request->opcode;
+
+       dbmap = talloc_zero(mem_ctx, struct ctdb_dbid_map);
+       if (dbmap == NULL) {
+               goto fail;
+       }
+
+       dbmap->num = database_count(ctdb->db_map);
+       dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
+       if (dbmap->dbs == NULL) {
+               goto fail;
+       }
+
+       db = ctdb->db_map->db;
+       for (i = 0; i < dbmap->num; i++) {
+               dbmap->dbs[i] = (struct ctdb_dbid) {
+                       .db_id = db->id,
+                       .flags = db->flags,
+               };
+
+               db = db->next;
+       }
+
+       reply.rdata.data.dbmap = dbmap;
+       reply.status = 0;
+       reply.errmsg = NULL;
+       client_send_control(req, header, &reply);
+       return;
+
+fail:
+       reply.status = -1;
+       reply.errmsg = "Memory error";
+       client_send_control(req, header, &reply);
+}
+
+static void control_get_recmode(TALLOC_CTX *mem_ctx,
+                               struct tevent_req *req,
+                               struct ctdb_req_header *header,
+                               struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+
+       reply.rdata.opcode = request->opcode;
+       reply.status = ctdb->vnn_map->recmode;
+       reply.errmsg = NULL;
+
+       client_send_control(req, header, &reply);
+}
+
+struct set_recmode_state {
+       struct tevent_req *req;
        struct ctdbd_context *ctdb;
-       int pnn;
-       struct comm_context *comm;
-       struct srvid_register_state *rstate;
-       int status;
+       struct ctdb_req_header header;
+       struct ctdb_reply_control reply;
 };
 
-/*
- * Send replies to controls and messages
- */
+static void set_recmode_callback(struct tevent_req *subreq)
+{
+       struct set_recmode_state *substate = tevent_req_callback_data(
+               subreq, struct set_recmode_state);
+       bool status;
+       int ret;
 
-static void client_reply_done(struct tevent_req *subreq);
+       status = recover_recv(subreq, &ret);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               substate->reply.status = ret;
+               substate->reply.errmsg = "recovery failed";
+       } else {
+               substate->reply.status = 0;
+               substate->reply.errmsg = NULL;
+       }
 
-static void client_send_message(struct tevent_req *req,
+       client_send_control(substate->req, &substate->header, &substate->reply);
+       talloc_free(substate);
+}
+
+static void control_set_recmode(TALLOC_CTX *mem_ctx,
+                               struct tevent_req *req,
                                struct ctdb_req_header *header,
-                               struct ctdb_req_message_data *message)
+                               struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
-       struct ctdbd_context *ctdb = state->ctdb;
        struct tevent_req *subreq;
-       struct ctdb_req_header reply_header;
-       uint8_t *buf;
-       size_t datalen, buflen;
-       int ret;
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct set_recmode_state *substate;
+       struct ctdb_reply_control reply;
 
-       reply_header = header_reply_message(header, ctdb);
+       reply.rdata.opcode = request->opcode;
 
-       datalen = ctdb_req_message_data_len(&reply_header, message);
-       ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
-       if (ret != 0) {
-               tevent_req_error(req, ret);
-               return;
+       if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
+               reply.status = -1;
+               reply.errmsg = "Client cannot set recmode to NORMAL";
+               goto fail;
        }
 
-       ret = ctdb_req_message_data_push(&reply_header, message,
-                                        buf, &buflen);
-       if (ret != 0) {
-               tevent_req_error(req, ret);
-               return;
+       substate = talloc_zero(ctdb, struct set_recmode_state);
+       if (substate == NULL) {
+               reply.status = -1;
+               reply.errmsg = "Memory error";
+               goto fail;
        }
 
-       DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
+       substate->req = req;
+       substate->ctdb = ctdb;
+       substate->header = *header;
+       substate->reply.rdata.opcode = request->opcode;
 
-       subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
-       if (tevent_req_nomem(subreq, req)) {
-               return;
+       subreq = recover_send(substate, state->ev, state->ctdb);
+       if (subreq == NULL) {
+               talloc_free(substate);
+               goto fail;
        }
-       tevent_req_set_callback(subreq, client_reply_done, req);
+       tevent_req_set_callback(subreq, set_recmode_callback, substate);
+
+       ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
+       return;
+
+fail:
+       client_send_control(req, header, &reply);
 
-       talloc_steal(subreq, buf);
 }
 
-static void client_send_control(struct tevent_req *req,
-                               struct ctdb_req_header *header,
-                               struct ctdb_reply_control *reply)
+static void control_db_attach(TALLOC_CTX *mem_ctx,
+                             struct tevent_req *req,
+                             struct ctdb_req_header *header,
+                             struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
-       struct tevent_req *subreq;
-       struct ctdb_req_header reply_header;
-       uint8_t *buf;
-       size_t datalen, buflen;
-       int ret;
-
-       reply_header = header_reply_control(header, ctdb);
+       struct ctdb_reply_control reply;
+       struct database *db;
 
-       datalen = ctdb_reply_control_len(&reply_header, reply);
-       ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
-       if (ret != 0) {
-               tevent_req_error(req, ret);
-               return;
-       }
+       reply.rdata.opcode = request->opcode;
 
-       ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
-       if (ret != 0) {
-               tevent_req_error(req, ret);
-               return;
+       for (db = ctdb->db_map->db; db != NULL; db = db->next) {
+               if (strcmp(db->name, request->rdata.data.db_name) == 0) {
+                       goto done;
+               }
        }
 
-       DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
-
-       subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
-       if (tevent_req_nomem(subreq, req)) {
+       db = database_new(ctdb->db_map, request->rdata.data.db_name, 0);
+       if (db == NULL) {
+               reply.status = -1;
+               reply.errmsg = "Failed to attach database";
+               client_send_control(req, header, &reply);
                return;
        }
-       tevent_req_set_callback(subreq, client_reply_done, req);
 
-       talloc_steal(subreq, buf);
+done:
+       reply.rdata.data.db_id = db->id;
+       reply.status = 0;
+       reply.errmsg = NULL;
+       client_send_control(req, header, &reply);
 }
 
-static void client_reply_done(struct tevent_req *subreq)
+static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data)
 {
-       struct tevent_req *req = tevent_req_callback_data(
-               subreq, struct tevent_req);
-       int ret;
-       bool status;
-
-       status = comm_write_recv(subreq, &ret);
-       TALLOC_FREE(subreq);
-       if (! status) {
-               tevent_req_error(req, ret);
-       }
+       printf("Received a message for SRVID 0x%"PRIx64"\n", srvid);
 }
 
-/*
- * Handling protocol - controls
- */
-
-static void control_process_exists(TALLOC_CTX *mem_ctx,
+static void control_register_srvid(TALLOC_CTX *mem_ctx,
                                   struct tevent_req *req,
                                   struct ctdb_req_header *header,
                                   struct ctdb_req_control *request)
 {
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
+       int ret;
 
        reply.rdata.opcode = request->opcode;
-       reply.status = kill(request->rdata.data.pid, 0);
+
+       ret = srvid_register(ctdb->srv, state, request->srvid,
+                            srvid_handler, state);
+       if (ret != 0) {
+               reply.status = -1;
+               reply.errmsg = "Memory error";
+               goto fail;
+       }
+
+       DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid));
+
+       reply.status = 0;
        reply.errmsg = NULL;
 
+fail:
        client_send_control(req, header, &reply);
 }
 
-static void control_ping(TALLOC_CTX *mem_ctx,
-                        struct tevent_req *req,
-                        struct ctdb_req_header *header,
-                        struct ctdb_req_control *request)
+static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
+                                    struct tevent_req *req,
+                                    struct ctdb_req_header *header,
+                                    struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
+       int ret;
 
        reply.rdata.opcode = request->opcode;
-       reply.status = ctdb->num_clients;
+
+       ret = srvid_deregister(ctdb->srv, request->srvid, state);
+       if (ret != 0) {
+               reply.status = -1;
+               reply.errmsg = "srvid not registered";
+               goto fail;
+       }
+
+       DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid));
+
+       reply.status = 0;
        reply.errmsg = NULL;
 
+fail:
        client_send_control(req, header, &reply);
 }
 
-static void control_getvnnmap(TALLOC_CTX *mem_ctx,
-                             struct tevent_req *req,
-                             struct ctdb_req_header *header,
-                             struct ctdb_req_control *request)
+static void control_get_dbname(TALLOC_CTX *mem_ctx,
+                              struct tevent_req *req,
+                              struct ctdb_req_header *header,
+                              struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
-       struct ctdb_vnn_map *vnnmap;
+       struct database *db;
 
        reply.rdata.opcode = request->opcode;
 
-       vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
-       if (vnnmap == NULL) {
-               reply.status = ENOMEM;
-               reply.errmsg = "Memory error";
+       db = database_find(ctdb->db_map, request->rdata.data.db_id);
+       if (db == NULL) {
+               reply.status = ENOENT;
+               reply.errmsg = "Database not found";
        } else {
-               vnnmap->generation = ctdb->vnn_map->generation;
-               vnnmap->size = ctdb->vnn_map->size;
-               vnnmap->map = ctdb->vnn_map->map;
-
-               reply.rdata.data.vnnmap = vnnmap;
-               reply.status = 0;
-               reply.errmsg = NULL;
+               reply.rdata.data.db_name = talloc_strdup(mem_ctx, db->name);
+               if (reply.rdata.data.db_name == NULL) {
+                       reply.status = ENOMEM;
+                       reply.errmsg = "Memory error";
+               } else {
+                       reply.status = 0;
+                       reply.errmsg = NULL;
+               }
        }
 
        client_send_control(req, header, &reply);
 }
 
-static void control_get_debug(TALLOC_CTX *mem_ctx,
-                             struct tevent_req *req,
-                             struct ctdb_req_header *header,
-                             struct ctdb_req_control *request)
+static void control_get_pid(TALLOC_CTX *mem_ctx,
+                           struct tevent_req *req,
+                           struct ctdb_req_header *header,
+                           struct ctdb_req_control *request)
+{
+       struct ctdb_reply_control reply;
+
+       reply.rdata.opcode = request->opcode;
+       reply.status = getpid();
+       reply.errmsg = NULL;
+
+       client_send_control(req, header, &reply);
+}
+
+static void control_get_recmaster(TALLOC_CTX *mem_ctx,
+                                 struct tevent_req *req,
+                                 struct ctdb_req_header *header,
+                                 struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
@@ -1020,33 +2118,67 @@ static void control_get_debug(TALLOC_CTX *mem_ctx,
        struct ctdb_reply_control reply;
 
        reply.rdata.opcode = request->opcode;
-       reply.rdata.data.loglevel = debug_level_to_int(ctdb->log_level);
-       reply.status = 0;
+       reply.status = ctdb->node_map->recmaster;
        reply.errmsg = NULL;
 
        client_send_control(req, header, &reply);
 }
 
-static void control_set_debug(TALLOC_CTX *mem_ctx,
-                             struct tevent_req *req,
-                             struct ctdb_req_header *header,
-                             struct ctdb_req_control *request)
+static void control_get_pnn(TALLOC_CTX *mem_ctx,
+                           struct tevent_req *req,
+                           struct ctdb_req_header *header,
+                           struct ctdb_req_control *request)
+{
+       struct ctdb_reply_control reply;
+
+       reply.rdata.opcode = request->opcode;
+       reply.status = header->destnode;
+       reply.errmsg = NULL;
+
+       client_send_control(req, header, &reply);
+}
+
+static void control_shutdown(TALLOC_CTX *mem_ctx,
+                            struct tevent_req *req,
+                            struct ctdb_req_header *hdr,
+                            struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+
+       state->status = 99;
+}
+
+static void control_set_tunable(TALLOC_CTX *mem_ctx,
+                               struct tevent_req *req,
+                               struct ctdb_req_header *header,
+                               struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
-
-       ctdb->log_level = debug_level_from_int(request->rdata.data.loglevel);
+       bool ret, obsolete;
 
        reply.rdata.opcode = request->opcode;
-       reply.status = 0;
        reply.errmsg = NULL;
 
+       ret = ctdb_tunable_set_value(&ctdb->tun_list,
+                                    request->rdata.data.tunable->name,
+                                    request->rdata.data.tunable->value,
+                                    &obsolete);
+       if (! ret) {
+               reply.status = -1;
+       } else if (obsolete) {
+               reply.status = 1;
+       } else {
+               reply.status = 0;
+       }
+
        client_send_control(req, header, &reply);
 }
 
-static void control_get_recmode(TALLOC_CTX *mem_ctx,
+static void control_get_tunable(TALLOC_CTX *mem_ctx,
                                struct tevent_req *req,
                                struct ctdb_req_header *header,
                                struct ctdb_req_control *request)
@@ -1055,272 +2187,369 @@ static void control_get_recmode(TALLOC_CTX *mem_ctx,
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
+       uint32_t value;
+       bool ret;
 
        reply.rdata.opcode = request->opcode;
-       reply.status = ctdb->vnn_map->recmode;
        reply.errmsg = NULL;
 
+       ret = ctdb_tunable_get_value(&ctdb->tun_list,
+                                    request->rdata.data.tun_var, &value);
+       if (! ret) {
+               reply.status = -1;
+       } else {
+               reply.rdata.data.tun_value = value;
+               reply.status = 0;
+       }
+
        client_send_control(req, header, &reply);
 }
 
-struct set_recmode_state {
-       struct tevent_req *req;
-       struct ctdbd_context *ctdb;
-       struct ctdb_req_header header;
+static void control_list_tunables(TALLOC_CTX *mem_ctx,
+                                 struct tevent_req *req,
+                                 struct ctdb_req_header *header,
+                                 struct ctdb_req_control *request)
+{
        struct ctdb_reply_control reply;
-};
+       struct ctdb_var_list *var_list;
 
-static void set_recmode_callback(struct tevent_req *subreq)
-{
-       struct set_recmode_state *substate = tevent_req_callback_data(
-               subreq, struct set_recmode_state);
-       bool status;
-       int ret;
+       reply.rdata.opcode = request->opcode;
+       reply.errmsg = NULL;
 
-       status = recover_recv(subreq, &ret);
-       TALLOC_FREE(subreq);
-       if (! status) {
-               substate->reply.status = ret;
-               substate->reply.errmsg = "recovery failed";
+       var_list = ctdb_tunable_names(mem_ctx);
+       if (var_list == NULL) {
+               reply.status = -1;
        } else {
-               substate->reply.status = 0;
-               substate->reply.errmsg = NULL;
+               reply.rdata.data.tun_var_list = var_list;
+               reply.status = 0;
        }
 
-       client_send_control(substate->req, &substate->header, &substate->reply);
-       talloc_free(substate);
+       client_send_control(req, header, &reply);
 }
 
-static void control_set_recmode(TALLOC_CTX *mem_ctx,
-                               struct tevent_req *req,
-                               struct ctdb_req_header *header,
-                               struct ctdb_req_control *request)
+static void control_modify_flags(TALLOC_CTX *mem_ctx,
+                                struct tevent_req *req,
+                                struct ctdb_req_header *header,
+                                struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
-       struct tevent_req *subreq;
        struct ctdbd_context *ctdb = state->ctdb;
-       struct set_recmode_state *substate;
+       struct ctdb_node_flag_change *change = request->rdata.data.flag_change;
        struct ctdb_reply_control reply;
+       struct node *node;
 
        reply.rdata.opcode = request->opcode;
 
-       if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
-               reply.status = -1;
-               reply.errmsg = "Client cannot set recmode to NORMAL";
-               goto fail;
+       if ((change->old_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) ||
+           (change->new_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
+               DEBUG(DEBUG_INFO,
+                     ("MODIFY_FLAGS control not for PERMANENTLY_DISABLED\n"));
+               reply.status = EINVAL;
+               reply.errmsg = "Failed to MODIFY_FLAGS";
+               client_send_control(req, header, &reply);
+               return;
        }
 
-       substate = talloc_zero(ctdb, struct set_recmode_state);
-       if (substate == NULL) {
-               reply.status = -1;
-               reply.errmsg = "Memory error";
-               goto fail;
-       }
+       /* There's all sorts of broadcast weirdness here.  Only change
+        * the specified node, not the destination node of the
+        * control. */
+       node = &ctdb->node_map->node[change->pnn];
 
-       substate->req = req;
-       substate->ctdb = ctdb;
-       substate->header = *header;
-       substate->reply.rdata.opcode = request->opcode;
+       if ((node->flags &
+            change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0 &&
+           (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
+               DEBUG(DEBUG_INFO,("Disabling node %d\n", header->destnode));
+               node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
+               goto done;
+       }
 
-       subreq = recover_send(substate, state->ev, state->ctdb);
-       if (subreq == NULL) {
-               talloc_free(substate);
-               goto fail;
+       if ((node->flags &
+            change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0 &&
+           (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0) {
+               DEBUG(DEBUG_INFO,("Enabling node %d\n", header->destnode));
+               node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
+               goto done;
        }
-       tevent_req_set_callback(subreq, set_recmode_callback, substate);
 
-       ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
-       return;
+       DEBUG(DEBUG_INFO, ("Flags unchanged for node %d\n", header->destnode));
 
-fail:
+done:
+       reply.status = 0;
+       reply.errmsg = NULL;
        client_send_control(req, header, &reply);
-
-}
-
-static int srvid_register_state_destructor(struct srvid_register_state *rstate)
-{
-       DLIST_REMOVE(rstate->ctdb->rstate, rstate);
-       return 0;
 }
 
-static void control_register_srvid(TALLOC_CTX *mem_ctx,
-                                  struct tevent_req *req,
-                                  struct ctdb_req_header *header,
-                                  struct ctdb_req_control *request)
+static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
+                                    struct tevent_req *req,
+                                    struct ctdb_req_header *header,
+                                    struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
-       struct srvid_register_state *rstate;
 
        reply.rdata.opcode = request->opcode;
-
-       rstate = talloc_zero(ctdb, struct srvid_register_state);
-       if (rstate == NULL) {
-               reply.status = -1;
-               reply.errmsg = "Memory error";
-               goto fail;
-       }
-       rstate->ctdb = ctdb;
-       rstate->srvid = request->srvid;
-
-       talloc_set_destructor(rstate, srvid_register_state_destructor);
-
-       DLIST_ADD_END(ctdb->rstate, rstate);
-
-       DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", rstate->srvid));
-
+       reply.rdata.data.tun_list = &ctdb->tun_list;
        reply.status = 0;
        reply.errmsg = NULL;
 
-fail:
        client_send_control(req, header, &reply);
 }
 
-static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
-                                    struct tevent_req *req,
-                                    struct ctdb_req_header *header,
-                                    struct ctdb_req_control *request)
+static void control_db_attach_persistent(TALLOC_CTX *mem_ctx,
+                                        struct tevent_req *req,
+                                        struct ctdb_req_header *header,
+                                        struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
-       struct srvid_register_state *rstate = NULL;
+       struct database *db;
 
        reply.rdata.opcode = request->opcode;
 
-       for (rstate = ctdb->rstate; rstate != NULL; rstate = rstate->next) {
-               if (rstate->srvid == request->srvid) {
-                       break;
+       for (db = ctdb->db_map->db; db != NULL; db = db->next) {
+               if (strcmp(db->name, request->rdata.data.db_name) == 0) {
+                       goto done;
                }
        }
 
-       if (rstate == NULL) {
+       db = database_new(ctdb->db_map, request->rdata.data.db_name,
+                         CTDB_DB_FLAGS_PERSISTENT);
+       if (db == NULL) {
                reply.status = -1;
-               reply.errmsg = "srvid not registered";
-               goto fail;
+               reply.errmsg = "Failed to attach database";
+               client_send_control(req, header, &reply);
+               return;
        }
 
-       DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", rstate->srvid));
-       talloc_free(rstate);
-
+done:
+       reply.rdata.data.db_id = db->id;
        reply.status = 0;
        reply.errmsg = NULL;
-
-       client_send_control(req, header, &reply);
-       return;
-
-fail:
-       TALLOC_FREE(rstate);
        client_send_control(req, header, &reply);
 }
 
-static void control_get_pid(TALLOC_CTX *mem_ctx,
-                           struct tevent_req *req,
-                           struct ctdb_req_header *header,
-                           struct ctdb_req_control *request)
+static void control_uptime(TALLOC_CTX *mem_ctx,
+                          struct tevent_req *req,
+                          struct ctdb_req_header *header,
+                          struct ctdb_req_control *request)
 {
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
+       struct ctdb_uptime *uptime;;
 
        reply.rdata.opcode = request->opcode;
-       reply.status = getpid();
+
+       uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
+       if (uptime == NULL) {
+               goto fail;
+       }
+
+       uptime->current_time = tevent_timeval_current();
+       uptime->ctdbd_start_time = ctdb->start_time;
+       uptime->last_recovery_started = ctdb->recovery_start_time;
+       uptime->last_recovery_finished = ctdb->recovery_end_time;
+
+       reply.rdata.data.uptime = uptime;
+       reply.status = 0;
        reply.errmsg = NULL;
+       client_send_control(req, header, &reply);
+       return;
 
+fail:
+       reply.status = -1;
+       reply.errmsg = "Memory error";
        client_send_control(req, header, &reply);
 }
 
-static void control_get_recmaster(TALLOC_CTX *mem_ctx,
-                                 struct tevent_req *req,
-                                 struct ctdb_req_header *header,
-                                 struct ctdb_req_control *request)
+static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
+                                     struct tevent_req *req,
+                                     struct ctdb_req_header *header,
+                                     struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
+       struct ctdb_node_map *nodemap;
+       struct node_map *node_map = ctdb->node_map;
+       unsigned int i;
 
        reply.rdata.opcode = request->opcode;
-       reply.status = ctdb->node_map->recmaster;
-       reply.errmsg = NULL;
 
-       client_send_control(req, header, &reply);
-}
+       nodemap = read_nodes_file(mem_ctx, header->destnode);
+       if (nodemap == NULL) {
+               goto fail;
+       }
 
-static void control_get_pnn(TALLOC_CTX *mem_ctx,
-                           struct tevent_req *req,
-                           struct ctdb_req_header *header,
-                           struct ctdb_req_control *request)
-{
-       struct ctdb_reply_control reply;
+       for (i=0; i<nodemap->num; i++) {
+               struct node *node;
 
-       reply.rdata.opcode = request->opcode;
-       reply.status = header->destnode;
-       reply.errmsg = NULL;
+               if (i < node_map->num_nodes &&
+                   ctdb_sock_addr_same(&nodemap->node[i].addr,
+                                       &node_map->node[i].addr)) {
+                       continue;
+               }
 
-       client_send_control(req, header, &reply);
-}
+               if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
+                       int ret;
 
-static void control_shutdown(TALLOC_CTX *mem_ctx,
-                            struct tevent_req *req,
-                            struct ctdb_req_header *hdr,
-                            struct ctdb_req_control *request)
-{
-       struct client_state *state = tevent_req_data(
-               req, struct client_state);
+                       node = &node_map->node[i];
+
+                       node->flags |= NODE_FLAGS_DELETED;
+                       ret = ctdb_sock_addr_from_string("0.0.0.0", &node->addr,
+                                                        false);
+                       if (ret != 0) {
+                               /* Can't happen, but Coverity... */
+                               goto fail;
+                       }
+
+                       continue;
+               }
+
+               if (i < node_map->num_nodes &&
+                   node_map->node[i].flags & NODE_FLAGS_DELETED) {
+                       node = &node_map->node[i];
+
+                       node->flags &= ~NODE_FLAGS_DELETED;
+                       node->addr = nodemap->node[i].addr;
+
+                       continue;
+               }
+
+               node_map->node = talloc_realloc(node_map, node_map->node,
+                                               struct node,
+                                               node_map->num_nodes+1);
+               if (node_map->node == NULL) {
+                       goto fail;
+               }
+               node = &node_map->node[node_map->num_nodes];
+
+               node->addr = nodemap->node[i].addr;
+               node->pnn = nodemap->node[i].pnn;
+               node->flags = 0;
+               node->capabilities = CTDB_CAP_DEFAULT;
+               node->recovery_disabled = false;
+               node->recovery_substate = NULL;
+
+               node_map->num_nodes += 1;
+       }
+
+       talloc_free(nodemap);
+
+       reply.status = 0;
+       reply.errmsg = NULL;
+       client_send_control(req, header, &reply);
+       return;
 
-       state->status = 99;
+fail:
+       reply.status = -1;
+       reply.errmsg = "Memory error";
+       client_send_control(req, header, &reply);
 }
 
-static void control_get_monmode(TALLOC_CTX *mem_ctx,
-                               struct tevent_req *req,
-                               struct ctdb_req_header *header,
-                               struct ctdb_req_control *request)
+static void control_get_capabilities(TALLOC_CTX *mem_ctx,
+                                    struct tevent_req *req,
+                                    struct ctdb_req_header *header,
+                                    struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
+       struct node *node;
+       uint32_t caps = 0;
 
        reply.rdata.opcode = request->opcode;
-       reply.status = ctdb->monitoring_mode;
+
+       node = &ctdb->node_map->node[header->destnode];
+       caps = node->capabilities;
+
+       if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
+               /* Don't send reply */
+               return;
+       }
+
+       reply.rdata.data.caps = caps;
+       reply.status = 0;
        reply.errmsg = NULL;
 
        client_send_control(req, header, &reply);
 }
 
-static void control_set_tunable(TALLOC_CTX *mem_ctx,
-                               struct tevent_req *req,
-                               struct ctdb_req_header *header,
-                               struct ctdb_req_control *request)
+static void control_release_ip(TALLOC_CTX *mem_ctx,
+                              struct tevent_req *req,
+                              struct ctdb_req_header *header,
+                              struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_public_ip *ip = request->rdata.data.pubip;
        struct ctdb_reply_control reply;
-       bool ret, obsolete;
+       struct ctdb_public_ip_list *ips = NULL;
+       struct ctdb_public_ip *t = NULL;
+       unsigned int i;
 
        reply.rdata.opcode = request->opcode;
-       reply.errmsg = NULL;
 
-       ret = ctdb_tunable_set_value(&ctdb->tun_list,
-                                    request->rdata.data.tunable->name,
-                                    request->rdata.data.tunable->value,
-                                    &obsolete);
-       if (! ret) {
-               reply.status = -1;
-       } else if (obsolete) {
-               reply.status = 1;
+       if (ctdb->known_ips == NULL) {
+               D_INFO("RELEASE_IP %s - not a public IP\n",
+                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
+               goto done;
+       }
+
+       ips = &ctdb->known_ips[header->destnode];
+
+       t = NULL;
+       for (i = 0; i < ips->num; i++) {
+               if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
+                       t = &ips->ip[i];
+                       break;
+               }
+       }
+       if (t == NULL) {
+               D_INFO("RELEASE_IP %s - not a public IP\n",
+                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
+               goto done;
+       }
+
+       if (t->pnn != header->destnode) {
+               if (header->destnode == ip->pnn) {
+                       D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n",
+                             ctdb_sock_addr_to_string(mem_ctx,
+                                                      &ip->addr, false),
+                             ip->pnn);
+                       reply.status = -1;
+                       reply.errmsg = "RELEASE_IP to TAKE_IP node";
+                       client_send_control(req, header, &reply);
+                       return;
+               }
+
+               D_INFO("RELEASE_IP %s - to node %d - redundant\n",
+                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
+                      ip->pnn);
+               t->pnn = ip->pnn;
        } else {
-               reply.status = 0;
+               D_NOTICE("RELEASE_IP %s - to node %d\n",
+                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
+                         ip->pnn);
+               t->pnn = ip->pnn;
        }
 
+done:
+       reply.status = 0;
+       reply.errmsg = NULL;
        client_send_control(req, header, &reply);
 }
 
-static void control_get_tunable(TALLOC_CTX *mem_ctx,
+static void control_takeover_ip(TALLOC_CTX *mem_ctx,
                                struct tevent_req *req,
                                struct ctdb_req_header *header,
                                struct ctdb_req_control *request)
@@ -1328,89 +2557,140 @@ static void control_get_tunable(TALLOC_CTX *mem_ctx,
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_public_ip *ip = request->rdata.data.pubip;
        struct ctdb_reply_control reply;
-       uint32_t value;
-       bool ret;
+       struct ctdb_public_ip_list *ips = NULL;
+       struct ctdb_public_ip *t = NULL;
+       unsigned int i;
 
        reply.rdata.opcode = request->opcode;
-       reply.errmsg = NULL;
 
-       ret = ctdb_tunable_get_value(&ctdb->tun_list,
-                                    request->rdata.data.tun_var, &value);
-       if (! ret) {
-               reply.status = -1;
-       } else {
-               reply.rdata.data.tun_value = value;
-               reply.status = 0;
+       if (ctdb->known_ips == NULL) {
+               D_INFO("TAKEOVER_IP %s - not a public IP\n",
+                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
+               goto done;
        }
 
-       client_send_control(req, header, &reply);
-}
-
-static void control_list_tunables(TALLOC_CTX *mem_ctx,
-                                 struct tevent_req *req,
-                                 struct ctdb_req_header *header,
-                                 struct ctdb_req_control *request)
-{
-       struct ctdb_reply_control reply;
-       struct ctdb_var_list *var_list;
+       ips = &ctdb->known_ips[header->destnode];
 
-       reply.rdata.opcode = request->opcode;
-       reply.errmsg = NULL;
+       t = NULL;
+       for (i = 0; i < ips->num; i++) {
+               if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
+                       t = &ips->ip[i];
+                       break;
+               }
+       }
+       if (t == NULL) {
+               D_INFO("TAKEOVER_IP %s - not a public IP\n",
+                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
+               goto done;
+       }
 
-       var_list = ctdb_tunable_names(mem_ctx);
-       if (var_list == NULL) {
-               reply.status = -1;
+       if (t->pnn == header->destnode) {
+               D_INFO("TAKEOVER_IP %s - redundant\n",
+                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
        } else {
-               reply.rdata.data.tun_var_list = var_list;
-               reply.status = 0;
+               D_NOTICE("TAKEOVER_IP %s\n",
+                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
+               t->pnn = ip->pnn;
        }
 
+done:
+       reply.status = 0;
+       reply.errmsg = NULL;
        client_send_control(req, header, &reply);
 }
 
-static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
-                                    struct tevent_req *req,
-                                    struct ctdb_req_header *header,
-                                    struct ctdb_req_control *request)
+static void control_get_public_ips(TALLOC_CTX *mem_ctx,
+                                  struct tevent_req *req,
+                                  struct ctdb_req_header *header,
+                                  struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
+       struct ctdb_public_ip_list *ips = NULL;
 
        reply.rdata.opcode = request->opcode;
-       reply.rdata.data.tun_list = &ctdb->tun_list;
+
+       if (ctdb->known_ips == NULL) {
+               /* No IPs defined so create a dummy empty struct and ship it */
+               ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
+               if (ips == NULL) {
+                       reply.status = ENOMEM;
+                       reply.errmsg = "Memory error";
+                       goto done;
+               }
+               goto ok;
+       }
+
+       ips = &ctdb->known_ips[header->destnode];
+
+       if (request->flags & CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE) {
+               /* If runstate is not RUNNING or a node is then return
+                * no available IPs.  Don't worry about interface
+                * states here - we're not faking down to that level.
+                */
+               uint32_t flags = ctdb->node_map->node[header->destnode].flags;
+               if (ctdb->runstate != CTDB_RUNSTATE_RUNNING ||
+                   ((flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED)) != 0)) {
+                       /* No available IPs: return dummy empty struct */
+                       ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
+                       if (ips == NULL) {
+                               reply.status = ENOMEM;
+                               reply.errmsg = "Memory error";
+                               goto done;
+                       }
+               }
+       }
+
+ok:
+       reply.rdata.data.pubip_list = ips;
        reply.status = 0;
        reply.errmsg = NULL;
 
+done:
        client_send_control(req, header, &reply);
 }
 
-static void control_uptime(TALLOC_CTX *mem_ctx,
-                          struct tevent_req *req,
-                          struct ctdb_req_header *header,
-                          struct ctdb_req_control *request)
+static void control_get_nodemap(TALLOC_CTX *mem_ctx,
+                               struct tevent_req *req,
+                               struct ctdb_req_header *header,
+                               struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
-       struct ctdb_uptime *uptime;;
+       struct ctdb_node_map *nodemap;
+       struct node *node;
+       unsigned int i;
 
        reply.rdata.opcode = request->opcode;
 
-       uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
-       if (uptime == NULL) {
+       nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
+       if (nodemap == NULL) {
                goto fail;
        }
 
-       uptime->current_time = tevent_timeval_current();
-       uptime->ctdbd_start_time = ctdb->start_time;
-       uptime->last_recovery_started = ctdb->recovery_start_time;
-       uptime->last_recovery_finished = ctdb->recovery_end_time;
+       nodemap->num = ctdb->node_map->num_nodes;
+       nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
+                                    nodemap->num);
+       if (nodemap->node == NULL) {
+               goto fail;
+       }
 
-       reply.rdata.data.uptime = uptime;
+       for (i=0; i<nodemap->num; i++) {
+               node = &ctdb->node_map->node[i];
+               nodemap->node[i] = (struct ctdb_node_and_flags) {
+                       .pnn = node->pnn,
+                       .flags = node->flags,
+                       .addr = node->addr,
+               };
+       }
+
+       reply.rdata.data.nodemap = nodemap;
        reply.status = 0;
        reply.errmsg = NULL;
        client_send_control(req, header, &reply);
@@ -1422,223 +2702,273 @@ fail:
        client_send_control(req, header, &reply);
 }
 
-static void control_enable_monitor(TALLOC_CTX *mem_ctx,
-                                  struct tevent_req *req,
-                                  struct ctdb_req_header *header,
-                                  struct ctdb_req_control *request)
+static void control_get_reclock_file(TALLOC_CTX *mem_ctx,
+                                    struct tevent_req *req,
+                                    struct ctdb_req_header *header,
+                                    struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
 
-       ctdb->monitoring_mode = CTDB_MONITORING_ACTIVE;
-
        reply.rdata.opcode = request->opcode;
+
+       if (ctdb->reclock != NULL) {
+               reply.rdata.data.reclock_file =
+                       talloc_strdup(mem_ctx, ctdb->reclock);
+               if (reply.rdata.data.reclock_file == NULL) {
+                       reply.status = ENOMEM;
+                       reply.errmsg = "Memory error";
+                       goto done;
+               }
+       } else {
+               reply.rdata.data.reclock_file = NULL;
+       }
+
        reply.status = 0;
        reply.errmsg = NULL;
+
+done:
        client_send_control(req, header, &reply);
 }
 
-static void control_disable_monitor(TALLOC_CTX *mem_ctx,
-                                   struct tevent_req *req,
-                                   struct ctdb_req_header *header,
-                                   struct ctdb_req_control *request)
+static void control_stop_node(TALLOC_CTX *mem_ctx,
+                             struct tevent_req *req,
+                             struct ctdb_req_header *header,
+                             struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
 
-       ctdb->monitoring_mode = CTDB_MONITORING_DISABLED;
-
        reply.rdata.opcode = request->opcode;
+
+       DEBUG(DEBUG_INFO, ("Stopping node\n"));
+       ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
+
        reply.status = 0;
        reply.errmsg = NULL;
+
        client_send_control(req, header, &reply);
+       return;
 }
 
-static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
-                                     struct tevent_req *req,
-                                     struct ctdb_req_header *header,
-                                     struct ctdb_req_control *request)
+static void control_continue_node(TALLOC_CTX *mem_ctx,
+                                 struct tevent_req *req,
+                                 struct ctdb_req_header *header,
+                                 struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
-       struct ctdb_node_map *nodemap;
-       struct node_map *node_map = ctdb->node_map;
-       int i;
 
        reply.rdata.opcode = request->opcode;
 
-       nodemap = read_nodes_file(mem_ctx, header->destnode);
-       if (nodemap == NULL) {
-               goto fail;
-       }
+       DEBUG(DEBUG_INFO, ("Continue node\n"));
+       ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED;
 
-       for (i=0; i<nodemap->num; i++) {
-               struct node *node;
+       reply.status = 0;
+       reply.errmsg = NULL;
 
-               if (i < node_map->num_nodes &&
-                   ctdb_sock_addr_same(&nodemap->node[i].addr,
-                                       &node_map->node[i].addr)) {
-                       continue;
-               }
+       client_send_control(req, header, &reply);
+       return;
+}
 
-               if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
-                       node = &node_map->node[i];
+static void set_ban_state_callback(struct tevent_req *subreq)
+{
+       struct node *node = tevent_req_callback_data(
+               subreq, struct node);
+       bool status;
 
-                       node->flags |= NODE_FLAGS_DELETED;
-                       parse_ip("0.0.0.0", NULL, 0, &node->addr);
+       status = tevent_wakeup_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
+       }
 
-                       continue;
-               }
+       node->flags &= ~NODE_FLAGS_BANNED;
+}
 
-               if (i < node_map->num_nodes &&
-                   node_map->node[i].flags & NODE_FLAGS_DELETED) {
-                       node = &node_map->node[i];
+static void control_set_ban_state(TALLOC_CTX *mem_ctx,
+                                 struct tevent_req *req,
+                                 struct ctdb_req_header *header,
+                                 struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct tevent_req *subreq;
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_ban_state *ban = request->rdata.data.ban_state;
+       struct ctdb_reply_control reply;
+       struct node *node;
 
-                       node->flags &= ~NODE_FLAGS_DELETED;
-                       node->addr = nodemap->node[i].addr;
+       reply.rdata.opcode = request->opcode;
 
-                       continue;
-               }
+       if (ban->pnn != header->destnode) {
+               DEBUG(DEBUG_INFO,
+                     ("SET_BAN_STATE control for PNN %d rejected\n",
+                      ban->pnn));
+               reply.status = EINVAL;
+               goto fail;
+       }
 
-               node_map->node = talloc_realloc(node_map, node_map->node,
-                                               struct node,
-                                               node_map->num_nodes+1);
-               if (node_map->node == NULL) {
-                       goto fail;
-               }
-               node = &node_map->node[node_map->num_nodes];
+       node = &ctdb->node_map->node[header->destnode];
 
-               node->addr = nodemap->node[i].addr;
-               node->pnn = nodemap->node[i].pnn;
-               node->flags = 0;
-               node->capabilities = CTDB_CAP_DEFAULT;
-               node->recovery_disabled = false;
-               node->recovery_substate = NULL;
+       if (ban->time == 0) {
+               DEBUG(DEBUG_INFO,("Unbanning this node\n"));
+               node->flags &= ~NODE_FLAGS_BANNED;
+               goto done;
+       }
 
-               node_map->num_nodes += 1;
+       subreq = tevent_wakeup_send(ctdb->node_map, state->ev,
+                                   tevent_timeval_current_ofs(
+                                           ban->time, 0));
+       if (subreq == NULL) {
+               reply.status = ENOMEM;
+               goto fail;
        }
+       tevent_req_set_callback(subreq, set_ban_state_callback, node);
 
-       talloc_free(nodemap);
+       DEBUG(DEBUG_INFO, ("Banning this node for %d seconds\n", ban->time));
+       node->flags |= NODE_FLAGS_BANNED;
+       ctdb->vnn_map->generation = INVALID_GENERATION;
 
+done:
        reply.status = 0;
        reply.errmsg = NULL;
+
        client_send_control(req, header, &reply);
        return;
 
 fail:
-       reply.status = -1;
-       reply.errmsg = "Memory error";
-       client_send_control(req, header, &reply);
+       reply.errmsg = "Failed to ban node";
 }
 
-static void control_get_capabilities(TALLOC_CTX *mem_ctx,
-                                    struct tevent_req *req,
-                                    struct ctdb_req_header *header,
-                                    struct ctdb_req_control *request)
+static void control_trans3_commit(TALLOC_CTX *mem_ctx,
+                                 struct tevent_req *req,
+                                 struct ctdb_req_header *header,
+                                 struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
-       struct node *node;
-       uint32_t caps = 0;
+       struct database *db;
+       int ret;
 
        reply.rdata.opcode = request->opcode;
 
-       node = &ctdb->node_map->node[header->destnode];
-       caps = node->capabilities;
+       db = database_find(ctdb->db_map, request->rdata.data.recbuf->db_id);
+       if (db == NULL) {
+               reply.status = -1;
+               reply.errmsg = "Unknown database";
+               client_send_control(req, header, &reply);
+               return;
+       }
 
-       if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
-               /* Don't send reply */
+       if (! (db->flags &
+              (CTDB_DB_FLAGS_PERSISTENT|CTDB_DB_FLAGS_REPLICATED))) {
+               reply.status = -1;
+               reply.errmsg = "Transactions on volatile database";
+               client_send_control(req, header, &reply);
+               return;
+       }
+
+       ret = ltdb_transaction(db, request->rdata.data.recbuf);
+       if (ret != 0) {
+               reply.status = -1;
+               reply.errmsg = "Transaction failed";
+               client_send_control(req, header, &reply);
                return;
        }
 
-       reply.rdata.data.caps = caps;
        reply.status = 0;
        reply.errmsg = NULL;
-
        client_send_control(req, header, &reply);
 }
 
-static void control_get_nodemap(TALLOC_CTX *mem_ctx,
-                               struct tevent_req *req,
-                               struct ctdb_req_header *header,
-                               struct ctdb_req_control *request)
+static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
+                              struct tevent_req *req,
+                              struct ctdb_req_header *header,
+                              struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
-       struct ctdb_node_map *nodemap;
-       struct node *node;
-       int i;
+       struct database *db;
+       int ret;
 
        reply.rdata.opcode = request->opcode;
 
-       nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
-       if (nodemap == NULL) {
-               goto fail;
-       }
-
-       nodemap->num = ctdb->node_map->num_nodes;
-       nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
-                                    nodemap->num);
-       if (nodemap->node == NULL) {
-               goto fail;
-       }
+       db = database_find(ctdb->db_map, request->rdata.data.db_id);
+       if (db == NULL) {
+               reply.status = ENOENT;
+               reply.errmsg = "Database not found";
+       } else {
+               uint64_t seqnum;
 
-       for (i=0; i<nodemap->num; i++) {
-               node = &ctdb->node_map->node[i];
-               nodemap->node[i] = (struct ctdb_node_and_flags) {
-                       .pnn = node->pnn,
-                       .flags = node->flags,
-                       .addr = node->addr,
-               };
+               ret = database_seqnum(db, &seqnum);
+               if (ret == 0) {
+                       reply.rdata.data.seqnum = seqnum;
+                       reply.status = 0;
+                       reply.errmsg = NULL;
+               } else {
+                       reply.status = ret;
+                       reply.errmsg = "Failed to get seqnum";
+               }
        }
 
-       reply.rdata.data.nodemap = nodemap;
-       reply.status = 0;
-       reply.errmsg = NULL;
-       client_send_control(req, header, &reply);
-       return;
-
-fail:
-       reply.status = -1;
-       reply.errmsg = "Memory error";
        client_send_control(req, header, &reply);
 }
 
-static void control_get_ifaces(TALLOC_CTX *mem_ctx,
-                              struct tevent_req *req,
-                              struct ctdb_req_header *header,
-                              struct ctdb_req_control *request)
+static void control_db_get_health(TALLOC_CTX *mem_ctx,
+                                 struct tevent_req *req,
+                                 struct ctdb_req_header *header,
+                                 struct ctdb_req_control *request)
 {
        struct client_state *state = tevent_req_data(
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
-       struct ctdb_iface_list *iface_list;
-       struct interface *iface;
-       int i;
+       struct database *db;
 
        reply.rdata.opcode = request->opcode;
 
+       db = database_find(ctdb->db_map, request->rdata.data.db_id);
+       if (db == NULL) {
+               reply.status = ENOENT;
+               reply.errmsg = "Database not found";
+       } else {
+               reply.rdata.data.reason = NULL;
+               reply.status = 0;
+               reply.errmsg = NULL;
+       }
+
+       client_send_control(req, header, &reply);
+}
+
+static struct ctdb_iface_list *get_ctdb_iface_list(TALLOC_CTX *mem_ctx,
+                                                  struct ctdbd_context *ctdb)
+{
+       struct ctdb_iface_list *iface_list;
+       struct interface *iface;
+       unsigned int i;
+
        iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
        if (iface_list == NULL) {
-               goto fail;
+               goto done;
        }
 
        iface_list->num = ctdb->iface_map->num;
        iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
                                         iface_list->num);
        if (iface_list->iface == NULL) {
-               goto fail;
+               TALLOC_FREE(iface_list);
+               goto done;
        }
 
        for (i=0; i<iface_list->num; i++) {
@@ -1647,8 +2977,107 @@ static void control_get_ifaces(TALLOC_CTX *mem_ctx,
                        .link_state = iface->link_up,
                        .references = iface->references,
                };
-               strncpy(iface_list->iface[i].name, iface->name,
-                       CTDB_IFACE_SIZE+2);
+               strlcpy(iface_list->iface[i].name, iface->name,
+                       sizeof(iface_list->iface[i].name));
+       }
+
+done:
+       return iface_list;
+}
+
+static void control_get_public_ip_info(TALLOC_CTX *mem_ctx,
+                                      struct tevent_req *req,
+                                      struct ctdb_req_header *header,
+                                      struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       ctdb_sock_addr *addr = request->rdata.data.addr;
+       struct ctdb_public_ip_list *known = NULL;
+       struct ctdb_public_ip_info *info = NULL;
+       unsigned i;
+
+       reply.rdata.opcode = request->opcode;
+
+       info = talloc_zero(mem_ctx, struct ctdb_public_ip_info);
+       if (info == NULL) {
+               reply.status = ENOMEM;
+               reply.errmsg = "Memory error";
+               goto done;
+       }
+
+       reply.rdata.data.ipinfo = info;
+
+       if (ctdb->known_ips != NULL) {
+               known = &ctdb->known_ips[header->destnode];
+       } else {
+               /* No IPs defined so create a dummy empty struct and
+                * fall through.  The given IP won't be matched
+                * below...
+                */
+               known = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
+               if (known == NULL) {
+                       reply.status = ENOMEM;
+                       reply.errmsg = "Memory error";
+                       goto done;
+               }
+       }
+
+       for (i = 0; i < known->num; i++) {
+               if (ctdb_sock_addr_same_ip(&known->ip[i].addr,
+                                          addr)) {
+                       break;
+               }
+       }
+
+       if (i == known->num) {
+               D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n",
+                     ctdb_sock_addr_to_string(mem_ctx, addr, false));
+               reply.status = -1;
+               reply.errmsg = "Unknown address";
+               goto done;
+       }
+
+       info->ip = known->ip[i];
+
+       /* The fake PUBLICIPS stanza and resulting known_ips data
+        * don't know anything about interfaces, so completely fake
+        * this.
+        */
+       info->active_idx = 0;
+
+       info->ifaces = get_ctdb_iface_list(mem_ctx, ctdb);
+       if (info->ifaces == NULL) {
+               reply.status = ENOMEM;
+               reply.errmsg = "Memory error";
+               goto done;
+       }
+
+       reply.status = 0;
+       reply.errmsg = NULL;
+
+done:
+       client_send_control(req, header, &reply);
+}
+
+static void control_get_ifaces(TALLOC_CTX *mem_ctx,
+                              struct tevent_req *req,
+                              struct ctdb_req_header *header,
+                              struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct ctdb_iface_list *iface_list;
+
+       reply.rdata.opcode = request->opcode;
+
+       iface_list = get_ctdb_iface_list(mem_ctx, ctdb);
+       if (iface_list == NULL) {
+               goto fail;
        }
 
        reply.rdata.data.iface_list = iface_list;
@@ -1718,15 +3147,212 @@ static void control_set_iface_link_state(TALLOC_CTX *mem_ctx,
                goto fail;
        }
 
-       iface->link_up = link_up;
+       iface->link_up = link_up;
+
+       reply.status = 0;
+       reply.errmsg = NULL;
+       client_send_control(req, header, &reply);
+       return;
+
+fail:
+       reply.status = -1;
+       client_send_control(req, header, &reply);
+}
+
+static void control_set_db_readonly(TALLOC_CTX *mem_ctx,
+                                   struct tevent_req *req,
+                                   struct ctdb_req_header *header,
+                                   struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct database *db;
+
+       reply.rdata.opcode = request->opcode;
+
+       db = database_find(ctdb->db_map, request->rdata.data.db_id);
+       if (db == NULL) {
+               reply.status = ENOENT;
+               reply.errmsg = "Database not found";
+               goto done;
+       }
+
+       if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
+               reply.status = EINVAL;
+               reply.errmsg = "Can not set READONLY on persistent db";
+               goto done;
+       }
+
+       db->flags |= CTDB_DB_FLAGS_READONLY;
+       reply.status = 0;
+       reply.errmsg = NULL;
+
+done:
+       client_send_control(req, header, &reply);
+}
+
+struct traverse_start_ext_state {
+       struct tevent_req *req;
+       struct ctdb_req_header *header;
+       uint32_t reqid;
+       uint64_t srvid;
+       bool withemptyrecords;
+       int status;
+};
+
+static int traverse_start_ext_handler(struct tdb_context *tdb,
+                                     TDB_DATA key, TDB_DATA data,
+                                     void *private_data)
+{
+       struct traverse_start_ext_state *state =
+               (struct traverse_start_ext_state *)private_data;
+       struct ctdb_rec_data rec;
+       struct ctdb_req_message_data message;
+       size_t np;
+
+       if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+               return 0;
+       }
+
+       if ((data.dsize == sizeof(struct ctdb_ltdb_header)) &&
+           (!state->withemptyrecords)) {
+               return 0;
+       }
+
+       rec = (struct ctdb_rec_data) {
+               .reqid = state->reqid,
+               .header = NULL,
+               .key = key,
+               .data = data,
+       };
+
+       message.srvid = state->srvid;
+       message.data.dsize = ctdb_rec_data_len(&rec);
+       message.data.dptr = talloc_size(state->req, message.data.dsize);
+       if (message.data.dptr == NULL) {
+               state->status = ENOMEM;
+               return 1;
+       }
+
+       ctdb_rec_data_push(&rec, message.data.dptr, &np);
+       client_send_message(state->req, state->header, &message);
+
+       talloc_free(message.data.dptr);
+
+       return 0;
+}
+
+static void control_traverse_start_ext(TALLOC_CTX *mem_ctx,
+                                      struct tevent_req *req,
+                                      struct ctdb_req_header *header,
+                                      struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct database *db;
+       struct ctdb_traverse_start_ext *ext;
+       struct traverse_start_ext_state t_state;
+       struct ctdb_rec_data rec;
+       struct ctdb_req_message_data message;
+       uint8_t buffer[32];
+       size_t np;
+       int ret;
+
+       reply.rdata.opcode = request->opcode;
+
+       ext = request->rdata.data.traverse_start_ext;
+
+       db = database_find(ctdb->db_map, ext->db_id);
+       if (db == NULL) {
+               reply.status = -1;
+               reply.errmsg = "Unknown database";
+               client_send_control(req, header, &reply);
+               return;
+       }
+
+       t_state = (struct traverse_start_ext_state) {
+               .req = req,
+               .header = header,
+               .reqid = ext->reqid,
+               .srvid = ext->srvid,
+               .withemptyrecords = ext->withemptyrecords,
+       };
+
+       ret = tdb_traverse_read(db->tdb, traverse_start_ext_handler, &t_state);
+       DEBUG(DEBUG_INFO, ("traversed %d records\n", ret));
+       if (t_state.status != 0) {
+               reply.status = -1;
+               reply.errmsg = "Memory error";
+               client_send_control(req, header, &reply);
+       }
+
+       reply.status = 0;
+       client_send_control(req, header, &reply);
+
+       rec = (struct ctdb_rec_data) {
+               .reqid = ext->reqid,
+               .header = NULL,
+               .key = tdb_null,
+               .data = tdb_null,
+       };
+
+       message.srvid = ext->srvid;
+       message.data.dsize = ctdb_rec_data_len(&rec);
+       ctdb_rec_data_push(&rec, buffer, &np);
+       message.data.dptr = buffer;
+       client_send_message(req, header, &message);
+}
+
+static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
+                                   struct tevent_req *req,
+                                   struct ctdb_req_header *header,
+                                   struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct database *db;
+
+       reply.rdata.opcode = request->opcode;
+
+       db = database_find(ctdb->db_map, request->rdata.data.db_id);
+       if (db == NULL) {
+               reply.status = ENOENT;
+               reply.errmsg = "Database not found";
+               goto done;
+       }
+
+       if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
+               reply.status = EINVAL;
+               reply.errmsg = "Can not set STICKY on persistent db";
+               goto done;
+       }
 
+       db->flags |= CTDB_DB_FLAGS_STICKY;
        reply.status = 0;
        reply.errmsg = NULL;
+
+done:
        client_send_control(req, header, &reply);
-       return;
+}
+
+static void control_ipreallocated(TALLOC_CTX *mem_ctx,
+                                 struct tevent_req *req,
+                                 struct ctdb_req_header *header,
+                                 struct ctdb_req_control *request)
+{
+       struct ctdb_reply_control reply;
+
+       /* Always succeed */
+       reply.rdata.opcode = request->opcode;
+       reply.status = 0;
+       reply.errmsg = NULL;
 
-fail:
-       reply.status = -1;
        client_send_control(req, header, &reply);
 }
 
@@ -1775,6 +3401,156 @@ fail:
        client_send_control(req, header, &reply);
 }
 
+static void control_db_open_flags(TALLOC_CTX *mem_ctx,
+                                 struct tevent_req *req,
+                                 struct ctdb_req_header *header,
+                                 struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct database *db;
+
+       reply.rdata.opcode = request->opcode;
+
+       db = database_find(ctdb->db_map, request->rdata.data.db_id);
+       if (db == NULL) {
+               reply.status = ENOENT;
+               reply.errmsg = "Database not found";
+       } else {
+               reply.rdata.data.tdb_flags = database_flags(db->flags);
+               reply.status = 0;
+               reply.errmsg = NULL;
+       }
+
+       client_send_control(req, header, &reply);
+}
+
+static void control_db_attach_replicated(TALLOC_CTX *mem_ctx,
+                                        struct tevent_req *req,
+                                        struct ctdb_req_header *header,
+                                        struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct database *db;
+
+       reply.rdata.opcode = request->opcode;
+
+       for (db = ctdb->db_map->db; db != NULL; db = db->next) {
+               if (strcmp(db->name, request->rdata.data.db_name) == 0) {
+                       goto done;
+               }
+       }
+
+       db = database_new(ctdb->db_map, request->rdata.data.db_name,
+                         CTDB_DB_FLAGS_REPLICATED);
+       if (db == NULL) {
+               reply.status = -1;
+               reply.errmsg = "Failed to attach database";
+               client_send_control(req, header, &reply);
+               return;
+       }
+
+done:
+       reply.rdata.data.db_id = db->id;
+       reply.status = 0;
+       reply.errmsg = NULL;
+       client_send_control(req, header, &reply);
+}
+
+static void control_check_pid_srvid(TALLOC_CTX *mem_ctx,
+                                   struct tevent_req *req,
+                                   struct ctdb_req_header *header,
+                                   struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_client *client;
+       struct client_state *cstate;
+       struct ctdb_reply_control reply;
+       bool pid_found, srvid_found;
+       int ret;
+
+       reply.rdata.opcode = request->opcode;
+
+       pid_found = false;
+       srvid_found = false;
+
+       for (client=ctdb->client_list; client != NULL; client=client->next) {
+               if (client->pid == request->rdata.data.pid_srvid->pid) {
+                       pid_found = true;
+                       cstate = (struct client_state *)client->state;
+                       ret = srvid_exists(ctdb->srv,
+                                          request->rdata.data.pid_srvid->srvid,
+                                          cstate);
+                       if (ret == 0) {
+                               srvid_found = true;
+                               ret = kill(cstate->pid, 0);
+                               if (ret != 0) {
+                                       reply.status = ret;
+                                       reply.errmsg = strerror(errno);
+                               } else {
+                                       reply.status = 0;
+                                       reply.errmsg = NULL;
+                               }
+                       }
+               }
+       }
+
+       if (! pid_found) {
+               reply.status = -1;
+               reply.errmsg = "No client for PID";
+       } else if (! srvid_found) {
+               reply.status = -1;
+               reply.errmsg = "No client for PID and SRVID";
+       }
+
+       client_send_control(req, header, &reply);
+}
+
+static bool fake_control_failure(TALLOC_CTX *mem_ctx,
+                                struct tevent_req *req,
+                                struct ctdb_req_header *header,
+                                struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct fake_control_failure *f = NULL;
+
+       D_DEBUG("Checking fake control failure for control %u on node %u\n",
+               request->opcode, header->destnode);
+       for (f = ctdb->control_failures; f != NULL; f = f->next) {
+               if (f->opcode == request->opcode &&
+                   (f->pnn == header->destnode ||
+                    f->pnn == CTDB_UNKNOWN_PNN)) {
+
+                       reply.rdata.opcode = request->opcode;
+                       if (strcmp(f->error, "TIMEOUT") == 0) {
+                               /* Causes no reply */
+                               D_ERR("Control %u fake timeout on node %u\n",
+                                     request->opcode, header->destnode);
+                               return true;
+                       } else if (strcmp(f->error, "ERROR") == 0) {
+                               D_ERR("Control %u fake error on node %u\n",
+                                     request->opcode, header->destnode);
+                               reply.status = -1;
+                               reply.errmsg = f->comment;
+                               client_send_control(req, header, &reply);
+                               return true;
+                       }
+               }
+       }
+
+       return false;
+}
+
 static void control_error(TALLOC_CTX *mem_ctx,
                          struct tevent_req *req,
                          struct ctdb_req_header *header,
@@ -1782,6 +3558,8 @@ static void control_error(TALLOC_CTX *mem_ctx,
 {
        struct ctdb_reply_control reply;
 
+       D_DEBUG("Control %u not implemented\n", request->opcode);
+
        reply.rdata.opcode = request->opcode;
        reply.status = -1;
        reply.errmsg = "Not implemented";
@@ -1873,6 +3651,36 @@ fail:
        client_send_message(req, header, &reply);
 }
 
+static void message_takeover_run(TALLOC_CTX *mem_ctx,
+                                struct tevent_req *req,
+                                struct ctdb_req_header *header,
+                                struct ctdb_req_message *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_srvid_message *srvid = request->data.msg;
+       struct ctdb_req_message_data reply;
+       int ret = -1;
+       TDB_DATA data;
+
+       if (header->destnode != ctdb->node_map->recmaster) {
+               /* No reply! Only recmaster replies... */
+               return;
+       }
+
+       DEBUG(DEBUG_INFO, ("IP takover run on node %u\n",
+                          header->destnode));
+       ret = header->destnode;
+
+       reply.srvid = srvid->srvid;
+       data.dptr = (uint8_t *)&ret;
+       data.dsize = sizeof(int);
+       reply.data = data;
+
+       client_send_message(req, header, &reply);
+}
+
 /*
  * Handle a single client
  */
@@ -1882,6 +3690,8 @@ static void client_read_handler(uint8_t *buf, size_t buflen,
 static void client_dead_handler(void *private_data);
 static void client_process_packet(struct tevent_req *req,
                                  uint8_t *buf, size_t buflen);
+static void client_process_call(struct tevent_req *req,
+                               uint8_t *buf, size_t buflen);
 static void client_process_message(struct tevent_req *req,
                                   uint8_t *buf, size_t buflen);
 static void client_process_control(struct tevent_req *req,
@@ -1907,6 +3717,8 @@ static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
        state->ctdb = ctdb;
        state->pnn = pnn;
 
+       (void) ctdb_get_peer_pid(fd, &state->pid);
+
        ret = comm_setup(state, ev, fd, client_read_handler, req,
                         client_dead_handler, req, &state->comm);
        if (ret != 0) {
@@ -1914,6 +3726,12 @@ static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
                return tevent_req_post(req, ev);
        }
 
+       ret = client_add(ctdb, state->pid, state);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return tevent_req_post(req, ev);
+       }
+
        DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
 
        return req;
@@ -1928,9 +3746,11 @@ static void client_read_handler(uint8_t *buf, size_t buflen,
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_req_header header;
-       int ret, i;
+       size_t np;
+       unsigned int i;
+       int ret;
 
-       ret = ctdb_req_header_pull(buf, buflen, &header);
+       ret = ctdb_req_header_pull(buf, buflen, &header, &np);
        if (ret != 0) {
                return;
        }
@@ -1950,7 +3770,7 @@ static void client_read_handler(uint8_t *buf, size_t buflen,
                for (i=0; i<ctdb->node_map->num_nodes; i++) {
                        header.destnode = i;
 
-                       ctdb_req_header_push(&header, buf);
+                       ctdb_req_header_push(&header, buf, &np);
                        client_process_packet(req, buf, buflen);
                }
                return;
@@ -1965,7 +3785,7 @@ static void client_read_handler(uint8_t *buf, size_t buflen,
 
                        header.destnode = i;
 
-                       ctdb_req_header_push(&header, buf);
+                       ctdb_req_header_push(&header, buf, &np);
                        client_process_packet(req, buf, buflen);
                }
                return;
@@ -1984,7 +3804,7 @@ static void client_read_handler(uint8_t *buf, size_t buflen,
                return;
        }
 
-       ctdb_req_header_push(&header, buf);
+       ctdb_req_header_push(&header, buf, &np);
        client_process_packet(req, buf, buflen);
 }
 
@@ -2000,14 +3820,19 @@ static void client_process_packet(struct tevent_req *req,
                                  uint8_t *buf, size_t buflen)
 {
        struct ctdb_req_header header;
+       size_t np;
        int ret;
 
-       ret = ctdb_req_header_pull(buf, buflen, &header);
+       ret = ctdb_req_header_pull(buf, buflen, &header, &np);
        if (ret != 0) {
                return;
        }
 
        switch (header.operation) {
+       case CTDB_REQ_CALL:
+               client_process_call(req, buf, buflen);
+               break;
+
        case CTDB_REQ_MESSAGE:
                client_process_message(req, buf, buflen);
                break;
@@ -2021,6 +3846,77 @@ static void client_process_packet(struct tevent_req *req,
        }
 }
 
+static void client_process_call(struct tevent_req *req,
+                               uint8_t *buf, size_t buflen)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       TALLOC_CTX *mem_ctx;
+       struct ctdb_req_header header;
+       struct ctdb_req_call request;
+       struct ctdb_reply_call reply;
+       struct database *db;
+       struct ctdb_ltdb_header hdr;
+       TDB_DATA data;
+       int ret;
+
+       mem_ctx = talloc_new(state);
+       if (tevent_req_nomem(mem_ctx, req)) {
+               return;
+       }
+
+       ret = ctdb_req_call_pull(buf, buflen, &header, mem_ctx, &request);
+       if (ret != 0) {
+               talloc_free(mem_ctx);
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       header_fix_pnn(&header, ctdb);
+
+       if (header.destnode >= ctdb->node_map->num_nodes) {
+               goto fail;
+       }
+
+       DEBUG(DEBUG_INFO, ("call db_id = %u\n", request.db_id));
+
+       db = database_find(ctdb->db_map, request.db_id);
+       if (db == NULL) {
+               goto fail;
+       }
+
+       ret = ltdb_fetch(db, request.key, &hdr, mem_ctx, &data);
+       if (ret != 0) {
+               goto fail;
+       }
+
+       /* Fake migration */
+       if (hdr.dmaster != ctdb->node_map->pnn) {
+               hdr.dmaster = ctdb->node_map->pnn;
+
+               ret = ltdb_store(db, request.key, &hdr, data);
+               if (ret != 0) {
+                       goto fail;
+               }
+       }
+
+       talloc_free(mem_ctx);
+
+       reply.status = 0;
+       reply.data = tdb_null;
+
+       client_send_call(req, &header, &reply);
+       return;
+
+fail:
+       talloc_free(mem_ctx);
+       reply.status = -1;
+       reply.data = tdb_null;
+
+       client_send_call(req, &header, &reply);
+}
+
 static void client_process_message(struct tevent_req *req,
                                   uint8_t *buf, size_t buflen)
 {
@@ -2060,6 +3956,10 @@ static void client_process_message(struct tevent_req *req,
 
        if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) {
                message_disable_recoveries(mem_ctx, req, &header, &request);
+       } else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
+               message_takeover_run(mem_ctx, req, &header, &request);
+       } else {
+               D_DEBUG("Message id 0x%"PRIx64" not implemented\n", srvid);
        }
 
        /* check srvid */
@@ -2104,6 +4004,10 @@ static void client_process_control(struct tevent_req *req,
        DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
                           request.opcode, header.reqid));
 
+       if (fake_control_failure(mem_ctx, req, &header, &request)) {
+               goto done;
+       }
+
        switch (request.opcode) {
        case CTDB_CONTROL_PROCESS_EXISTS:
                control_process_exists(mem_ctx, req, &header, &request);
@@ -2113,6 +4017,10 @@ static void client_process_control(struct tevent_req *req,
                control_ping(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_GETDBPATH:
+               control_getdbpath(mem_ctx, req, &header, &request);
+               break;
+
        case CTDB_CONTROL_GETVNNMAP:
                control_getvnnmap(mem_ctx, req, &header, &request);
                break;
@@ -2125,6 +4033,10 @@ static void client_process_control(struct tevent_req *req,
                control_set_debug(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_GET_DBMAP:
+               control_get_dbmap(mem_ctx, req, &header, &request);
+               break;
+
        case CTDB_CONTROL_GET_RECMODE:
                control_get_recmode(mem_ctx, req, &header, &request);
                break;
@@ -2133,6 +4045,10 @@ static void client_process_control(struct tevent_req *req,
                control_set_recmode(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_DB_ATTACH:
+               control_db_attach(mem_ctx, req, &header, &request);
+               break;
+
        case CTDB_CONTROL_REGISTER_SRVID:
                control_register_srvid(mem_ctx, req, &header, &request);
                break;
@@ -2141,6 +4057,10 @@ static void client_process_control(struct tevent_req *req,
                control_deregister_srvid(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_GET_DBNAME:
+               control_get_dbname(mem_ctx, req, &header, &request);
+               break;
+
        case CTDB_CONTROL_GET_PID:
                control_get_pid(mem_ctx, req, &header, &request);
                break;
@@ -2157,10 +4077,6 @@ static void client_process_control(struct tevent_req *req,
                control_shutdown(mem_ctx, req, &header, &request);
                break;
 
-       case CTDB_CONTROL_GET_MONMODE:
-               control_get_monmode(mem_ctx, req, &header, &request);
-               break;
-
        case CTDB_CONTROL_SET_TUNABLE:
                control_set_tunable(mem_ctx, req, &header, &request);
                break;
@@ -2173,20 +4089,20 @@ static void client_process_control(struct tevent_req *req,
                control_list_tunables(mem_ctx, req, &header, &request);
                break;
 
-       case CTDB_CONTROL_GET_ALL_TUNABLES:
-               control_get_all_tunables(mem_ctx, req, &header, &request);
+       case CTDB_CONTROL_MODIFY_FLAGS:
+               control_modify_flags(mem_ctx, req, &header, &request);
                break;
 
-       case CTDB_CONTROL_UPTIME:
-               control_uptime(mem_ctx, req, &header, &request);
+       case CTDB_CONTROL_GET_ALL_TUNABLES:
+               control_get_all_tunables(mem_ctx, req, &header, &request);
                break;
 
-       case CTDB_CONTROL_ENABLE_MONITOR:
-               control_enable_monitor(mem_ctx, req, &header, &request);
+       case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
+               control_db_attach_persistent(mem_ctx, req, &header, &request);
                break;
 
-       case CTDB_CONTROL_DISABLE_MONITOR:
-               control_disable_monitor(mem_ctx, req, &header, &request);
+       case CTDB_CONTROL_UPTIME:
+               control_uptime(mem_ctx, req, &header, &request);
                break;
 
        case CTDB_CONTROL_RELOAD_NODES_FILE:
@@ -2197,10 +4113,54 @@ static void client_process_control(struct tevent_req *req,
                control_get_capabilities(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_RELEASE_IP:
+               control_release_ip(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_TAKEOVER_IP:
+               control_takeover_ip(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_GET_PUBLIC_IPS:
+               control_get_public_ips(mem_ctx, req, &header, &request);
+               break;
+
        case CTDB_CONTROL_GET_NODEMAP:
                control_get_nodemap(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_GET_RECLOCK_FILE:
+               control_get_reclock_file(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_STOP_NODE:
+               control_stop_node(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_CONTINUE_NODE:
+               control_continue_node(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_SET_BAN_STATE:
+               control_set_ban_state(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_TRANS3_COMMIT:
+               control_trans3_commit(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_GET_DB_SEQNUM:
+               control_get_db_seqnum(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_DB_GET_HEALTH:
+               control_db_get_health(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_GET_PUBLIC_IP_INFO:
+               control_get_public_ip_info(mem_ctx, req, &header, &request);
+               break;
+
        case CTDB_CONTROL_GET_IFACES:
                control_get_ifaces(mem_ctx, req, &header, &request);
                break;
@@ -2209,6 +4169,22 @@ static void client_process_control(struct tevent_req *req,
                control_set_iface_link_state(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_SET_DB_READONLY:
+               control_set_db_readonly(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_TRAVERSE_START_EXT:
+               control_traverse_start_ext(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_SET_DB_STICKY:
+               control_set_db_sticky(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_IPREALLOCATED:
+               control_ipreallocated(mem_ctx, req, &header, &request);
+               break;
+
        case CTDB_CONTROL_GET_RUNSTATE:
                control_get_runstate(mem_ctx, req, &header, &request);
                break;
@@ -2217,6 +4193,18 @@ static void client_process_control(struct tevent_req *req,
                control_get_nodes_file(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_DB_OPEN_FLAGS:
+               control_db_open_flags(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_DB_ATTACH_REPLICATED:
+               control_db_attach_replicated(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_CHECK_PID_SRVID:
+               control_check_pid_srvid(mem_ctx, req, &header, &request);
+               break;
+
        default:
                if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
                        control_error(mem_ctx, req, &header, &request);
@@ -2224,6 +4212,7 @@ static void client_process_control(struct tevent_req *req,
                break;
        }
 
+done:
        talloc_free(mem_ctx);
 }
 
@@ -2406,18 +4395,23 @@ fail:
 }
 
 static struct options {
+       const char *dbdir;
        const char *sockpath;
        const char *pidfile;
        const char *debuglevel;
 } options;
 
 static struct poptOption cmdline_options[] = {
+       POPT_AUTOHELP
+       { "dbdir", 'D', POPT_ARG_STRING, &options.dbdir, 0,
+               "Database directory", "directory" },
        { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
                "Unix domain socket path", "filename" },
        { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
                "pid file", "filename" } ,
        { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
                "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
+       POPT_TABLEEND
 };
 
 static void cleanup(void)
@@ -2468,7 +4462,6 @@ int main(int argc, const char *argv[])
        TALLOC_CTX *mem_ctx;
        struct ctdbd_context *ctdb;
        struct tevent_context *ev;
-       enum debug_level debug_level;
        poptContext pc;
        int opt, fd, ret, pfd[2];
        ssize_t len;
@@ -2482,6 +4475,12 @@ int main(int argc, const char *argv[])
                exit(1);
        }
 
+       if (options.dbdir == NULL) {
+               fprintf(stderr, "Please specify database directory\n");
+               poptPrintHelp(pc, stdout, 0);
+               exit(1);
+       }
+
        if (options.sockpath == NULL) {
                fprintf(stderr, "Please specify socket path\n");
                poptPrintHelp(pc, stdout, 0);
@@ -2494,25 +4493,20 @@ int main(int argc, const char *argv[])
                exit(1);
        }
 
-       if (options.debuglevel == NULL) {
-               DEBUGLEVEL = debug_level_to_int(DEBUG_ERR);
-       } else {
-               if (debug_level_parse(options.debuglevel, &debug_level)) {
-                       DEBUGLEVEL = debug_level_to_int(debug_level);
-               } else {
-                       fprintf(stderr, "Invalid debug level\n");
-                       poptPrintHelp(pc, stdout, 0);
-                       exit(1);
-               }
-       }
-
        mem_ctx = talloc_new(NULL);
        if (mem_ctx == NULL) {
                fprintf(stderr, "Memory error\n");
                exit(1);
        }
 
-       ctdb = ctdbd_setup(mem_ctx);
+       ret = logging_init(mem_ctx, "file:", options.debuglevel, "fake-ctdbd");
+       if (ret != 0) {
+               fprintf(stderr, "Invalid debug level\n");
+               poptPrintHelp(pc, stdout, 0);
+               exit(1);
+       }
+
+       ctdb = ctdbd_setup(mem_ctx, options.dbdir);
        if (ctdb == NULL) {
                exit(1);
        }