libndr: Avoid assigning duplicate versions to symbols
[amitay/samba.git] / ctdb / tests / src / fake_ctdbd.c
index 6693c7ff12a5c5a380689e52a80a9db335bbbe67..146b7da344c62fd41da11e21ba67d23a0fcc81cf 100644 (file)
@@ -20,6 +20,7 @@
 #include "replace.h"
 #include "system/network.h"
 #include "system/time.h"
+#include "system/filesys.h"
 
 #include <popt.h>
 #include <talloc.h>
 
 #include "protocol/protocol.h"
 #include "protocol/protocol_api.h"
+#include "protocol/protocol_util.h"
+#include "protocol/protocol_private.h"
 
 #include "common/comm.h"
-#include "common/system.h"
 #include "common/logging.h"
 #include "common/tunable.h"
+#include "common/srvid.h"
+#include "common/system.h"
 
 #include "ipalloc_read_known_ips.h"
 
@@ -83,21 +87,33 @@ struct vnn_map {
 };
 
 struct database {
+       struct database *prev, *next;
        const char *name;
+       const char *path;
+       struct tdb_context *tdb;
        uint32_t id;
        uint8_t flags;
        uint64_t seq_num;
 };
 
 struct database_map {
-       int num_dbs;
        struct database *db;
+       const char *dbdir;
+};
+
+struct fake_control_failure {
+       struct fake_control_failure  *prev, *next;
+       enum ctdb_controls opcode;
+       uint32_t pnn;
+       const char *error;
+       const char *comment;
 };
 
-struct srvid_register_state {
-       struct srvid_register_state *prev, *next;
+struct ctdb_client {
+       struct ctdb_client *prev, *next;
        struct ctdbd_context *ctdb;
-       uint64_t srvid;
+       pid_t pid;
+       void *state;
 };
 
 struct ctdbd_context {
@@ -105,7 +121,7 @@ struct ctdbd_context {
        struct interface_map *iface_map;
        struct vnn_map *vnn_map;
        struct database_map *db_map;
-       struct srvid_register_state *rstate;
+       struct srvid_context *srv;
        int num_clients;
        struct timeval start_time;
        struct timeval recovery_start_time;
@@ -114,9 +130,10 @@ struct ctdbd_context {
        int log_level;
        enum ctdb_runstate runstate;
        struct ctdb_tunable_list tun_list;
-       int monitoring_mode;
        char *reclock;
        struct ctdb_public_ip_list *known_ips;
+       struct fake_control_failure *control_failures;
+       struct ctdb_client *client_list;
 };
 
 /*
@@ -158,6 +175,7 @@ static bool nodemap_parse(struct node_map *node_map)
                char *ip;
                ctdb_sock_addr saddr;
                struct node *node;
+               int ret;
 
                if (line[0] == '\n') {
                        break;
@@ -182,10 +200,12 @@ static bool nodemap_parse(struct node_map *node_map)
                        fprintf(stderr, "bad line (%s) - missing IP\n", line);
                        continue;
                }
-               if (!parse_ip(tok, NULL, CTDB_PORT, &saddr)) {
+               ret = ctdb_sock_addr_from_string(tok, &saddr, false);
+               if (ret != 0) {
                        fprintf(stderr, "bad line (%s) - invalid IP\n", line);
                        continue;
                }
+               ctdb_sock_addr_set_port(&saddr, CTDB_PORT);
                ip = talloc_strdup(node_map, tok);
                if (ip == NULL) {
                        goto fail;
@@ -237,7 +257,12 @@ static bool nodemap_parse(struct node_map *node_map)
                }
                node = &node_map->node[node_map->num_nodes];
 
-               parse_ip(ip, NULL, CTDB_PORT, &node->addr);
+               ret = ctdb_sock_addr_from_string(ip, &node->addr, false);
+               if (ret != 0) {
+                       fprintf(stderr, "bad line (%s) - invalid IP\n", line);
+                       continue;
+               }
+               ctdb_sock_addr_set_port(&node->addr, CTDB_PORT);
                node->pnn = pnn;
                node->flags = flags;
                node->capabilities = capabilities;
@@ -263,11 +288,14 @@ static bool node_map_add(struct ctdb_node_map *nodemap,
        ctdb_sock_addr addr;
        uint32_t num;
        struct ctdb_node_and_flags *n;
+       int ret;
 
-       if (! parse_ip(nstr, NULL, CTDB_PORT, &addr)) {
+       ret = ctdb_sock_addr_from_string(nstr, &addr, false);
+       if (ret != 0) {
                fprintf(stderr, "Invalid IP address %s\n", nstr);
                return false;
        }
+       ctdb_sock_addr_set_port(&addr, CTDB_PORT);
 
        num = nodemap->num;
        nodemap->node = talloc_realloc(nodemap, nodemap->node,
@@ -358,28 +386,49 @@ static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
                                             uint32_t pnn)
 {
        struct ctdb_node_map *nodemap;
-       char nodepath[PATH_MAX];
-       const char *nodes_list;
-
-       /* read the nodes file */
-       sprintf(nodepath, "CTDB_NODES_%u", pnn);
-       nodes_list = getenv(nodepath);
-       if (nodes_list == NULL) {
-               nodes_list = getenv("CTDB_NODES");
-               if (nodes_list == NULL) {
-                       DEBUG(DEBUG_INFO, ("Nodes file not defined\n"));
+       char nodes_list[PATH_MAX];
+       const char *ctdb_base;
+       int num;
+
+       ctdb_base = getenv("CTDB_BASE");
+       if (ctdb_base == NULL) {
+               D_ERR("CTDB_BASE is not set\n");
+               return NULL;
+       }
+
+       /* read optional node-specific nodes file */
+       num = snprintf(nodes_list, sizeof(nodes_list),
+                      "%s/nodes.%d", ctdb_base, pnn);
+       if (num == sizeof(nodes_list)) {
+               D_ERR("nodes file path too long\n");
+               return NULL;
+       }
+       nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
+       if (nodemap != NULL) {
+               /* Fake a load failure for an empty nodemap */
+               if (nodemap->num == 0) {
+                       talloc_free(nodemap);
+
+                       D_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
                        return NULL;
                }
+
+               return nodemap;
        }
 
-       nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
-       if (nodemap == NULL) {
-               DEBUG(DEBUG_INFO, ("Failed to read nodes file \"%s\"\n",
-                                  nodes_list));
+       /* read normal nodes file */
+       num = snprintf(nodes_list, sizeof(nodes_list), "%s/nodes", ctdb_base);
+       if (num == sizeof(nodes_list)) {
+               D_ERR("nodes file path too long\n");
                return NULL;
        }
+       nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
+       if (nodemap != NULL) {
+               return nodemap;
+       }
 
-       return nodemap;
+       DBG_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
+       return NULL;
 }
 
 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
@@ -586,7 +635,8 @@ fail:
        return false;
 }
 
-static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx)
+static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx,
+                                      const char *dbdir)
 {
        struct database_map *db_map;
 
@@ -595,6 +645,12 @@ static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx)
                return NULL;
        }
 
+       db_map->dbdir = talloc_strdup(db_map, dbdir);
+       if (db_map->dbdir == NULL) {
+               talloc_free(db_map);
+               return NULL;
+       }
+
        return db_map;
 }
 
@@ -654,8 +710,13 @@ static bool dbmap_parse(struct database_map *db_map)
                                flags |= CTDB_DB_FLAGS_STICKY;
                        } else if (strcmp(tok, "READONLY") == 0) {
                                flags |= CTDB_DB_FLAGS_READONLY;
+                       } else if (strcmp(tok, "REPLICATED") == 0) {
+                               flags |= CTDB_DB_FLAGS_REPLICATED;
                        } else if (tok[0] >= '0'&& tok[0] <= '9') {
-                               if ((flags & CTDB_DB_FLAGS_PERSISTENT) == 0) {
+                               uint8_t nv = CTDB_DB_FLAGS_PERSISTENT |
+                                            CTDB_DB_FLAGS_REPLICATED;
+
+                               if ((flags & nv) == 0) {
                                        fprintf(stderr,
                                                "seq_num for volatile db\n");
                                        goto fail;
@@ -666,20 +727,22 @@ static bool dbmap_parse(struct database_map *db_map)
                        tok = strtok(NULL, " \t");
                }
 
-               db_map->db = talloc_realloc(db_map, db_map->db,
-                                           struct database,
-                                           db_map->num_dbs + 1);
-               if (db_map->db == NULL) {
+               db = talloc_zero(db_map, struct database);
+               if (db == NULL) {
                        goto fail;
                }
-               db = &db_map->db[db_map->num_dbs];
 
                db->id = id;
-               db->name = name;
+               db->name = talloc_steal(db, name);
+               db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
+               if (db->path == NULL) {
+                       talloc_free(db);
+                       goto fail;
+               }
                db->flags = flags;
                db->seq_num = seq_num;
 
-               db_map->num_dbs += 1;
+               DLIST_ADD_END(db_map->db, db);
        }
 
        DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
@@ -691,14 +754,12 @@ fail:
 
 }
 
-static struct database *database_find(struct database_map *map,
+static struct database *database_find(struct database_map *db_map,
                                      uint32_t db_id)
 {
-       int i;
-
-       for (i = 0; i < map->num_dbs; i++) {
-               struct database *db = &map->db[i];
+       struct database *db;
 
+       for (db = db_map->db; db != NULL; db = db->next) {
                if (db->id == db_id) {
                        return db;
                }
@@ -707,9 +768,277 @@ static struct database *database_find(struct database_map *map,
        return NULL;
 }
 
+static int database_count(struct database_map *db_map)
+{
+       struct database *db;
+       int count = 0;
+
+       for (db = db_map->db; db != NULL; db = db->next) {
+               count += 1;
+       }
+
+       return count;
+}
+
+static int database_flags(uint8_t db_flags)
+{
+       int tdb_flags = 0;
+
+       if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
+               tdb_flags = TDB_DEFAULT;
+       } else {
+               /* volatile and replicated use the same flags */
+               tdb_flags = TDB_NOSYNC |
+                           TDB_CLEAR_IF_FIRST |
+                           TDB_INCOMPATIBLE_HASH;
+       }
+
+       tdb_flags |= TDB_DISALLOW_NESTING;
+
+       return tdb_flags;
+}
+
+static struct database *database_new(struct database_map *db_map,
+                                    const char *name, uint8_t flags)
+{
+       struct database *db;
+       TDB_DATA key;
+       int tdb_flags;
+
+       db = talloc_zero(db_map, struct database);
+       if (db == NULL) {
+               return NULL;
+       }
+
+       db->name = talloc_strdup(db, name);
+       if (db->name == NULL) {
+               goto fail;
+       }
+
+       db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
+       if (db->path == NULL) {
+               goto fail;
+       }
+
+       key.dsize = strlen(db->name) + 1;
+       key.dptr = discard_const(db->name);
+
+       db->id = tdb_jenkins_hash(&key);
+       db->flags = flags;
+
+       tdb_flags = database_flags(flags);
+
+       db->tdb = tdb_open(db->path, 8192, tdb_flags, O_CREAT|O_RDWR, 0644);
+       if (db->tdb == NULL) {
+               DBG_ERR("tdb_open\n");
+               goto fail;
+       }
+
+       DLIST_ADD_END(db_map->db, db);
+       return db;
+
+fail:
+       DBG_ERR("Memory error\n");
+       talloc_free(db);
+       return NULL;
+
+}
+
+static int ltdb_store(struct database *db, TDB_DATA key,
+                     struct ctdb_ltdb_header *header, TDB_DATA data)
+{
+       int ret;
+       bool db_volatile = true;
+       bool keep = false;
+
+       if (db->tdb == NULL) {
+               return EINVAL;
+       }
+
+       if ((db->flags & CTDB_DB_FLAGS_PERSISTENT) ||
+           (db->flags & CTDB_DB_FLAGS_REPLICATED)) {
+               db_volatile = false;
+       }
+
+       if (data.dsize > 0) {
+               keep = true;
+       } else {
+               if (db_volatile && header->rsn == 0) {
+                       keep = true;
+               }
+       }
+
+       if (keep) {
+               TDB_DATA rec[2];
+
+               rec[0].dsize = ctdb_ltdb_header_len(header);
+               rec[0].dptr = (uint8_t *)header;
+
+               rec[1].dsize = data.dsize;
+               rec[1].dptr = data.dptr;
+
+               ret = tdb_storev(db->tdb, key, rec, 2, TDB_REPLACE);
+       } else {
+               if (header->rsn > 0) {
+                       ret = tdb_delete(db->tdb, key);
+               } else {
+                       ret = 0;
+               }
+       }
+
+       return ret;
+}
+
+static int ltdb_fetch(struct database *db, TDB_DATA key,
+                     struct ctdb_ltdb_header *header,
+                     TALLOC_CTX *mem_ctx, TDB_DATA *data)
+{
+       TDB_DATA rec;
+       size_t np;
+       int ret;
+
+       if (db->tdb == NULL) {
+               return EINVAL;
+       }
+
+       rec = tdb_fetch(db->tdb, key);
+       ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
+       if (ret != 0) {
+               if (rec.dptr != NULL) {
+                       free(rec.dptr);
+               }
+
+               *header = (struct ctdb_ltdb_header) {
+                       .rsn = 0,
+                       .dmaster = 0,
+                       .flags = 0,
+               };
+
+               ret = ltdb_store(db, key, header, tdb_null);
+               if (ret != 0) {
+                       return ret;
+               }
+
+               *data = tdb_null;
+               return 0;
+       }
+
+       data->dsize = rec.dsize - ctdb_ltdb_header_len(header);
+       data->dptr = talloc_memdup(mem_ctx,
+                                  rec.dptr + ctdb_ltdb_header_len(header),
+                                  data->dsize);
+
+       free(rec.dptr);
+
+       if (data->dptr == NULL) {
+               return ENOMEM;
+       }
+
+       return 0;
+}
+
+static int database_seqnum(struct database *db, uint64_t *seqnum)
+{
+       const char *keyname = CTDB_DB_SEQNUM_KEY;
+       TDB_DATA key, data;
+       struct ctdb_ltdb_header header;
+       size_t np;
+       int ret;
+
+       if (db->tdb == NULL) {
+               *seqnum = db->seq_num;
+               return 0;
+       }
+
+       key.dptr = discard_const(keyname);
+       key.dsize = strlen(keyname) + 1;
+
+       ret = ltdb_fetch(db, key, &header, db, &data);
+       if (ret != 0) {
+               return ret;
+       }
+
+       if (data.dsize == 0) {
+               *seqnum = 0;
+               return 0;
+       }
+
+       ret = ctdb_uint64_pull(data.dptr, data.dsize, seqnum, &np);
+       talloc_free(data.dptr);
+       if (ret != 0) {
+               *seqnum = 0;
+       }
+
+       return ret;
+}
+
+static int ltdb_transaction_update(uint32_t reqid,
+                                  struct ctdb_ltdb_header *no_header,
+                                  TDB_DATA key, TDB_DATA data,
+                                  void *private_data)
+{
+       struct database *db = (struct database *)private_data;
+       TALLOC_CTX *tmp_ctx = talloc_new(db);
+       struct ctdb_ltdb_header header = { 0 }, oldheader;
+       TDB_DATA olddata;
+       int ret;
+
+       if (db->tdb == NULL) {
+               return EINVAL;
+       }
+
+       ret = ctdb_ltdb_header_extract(&data, &header);
+       if (ret != 0) {
+               return ret;
+       }
+
+       ret = ltdb_fetch(db, key, &oldheader, tmp_ctx, &olddata);
+       if (ret != 0) {
+               return ret;
+       }
+
+       if (olddata.dsize > 0) {
+               if (oldheader.rsn > header.rsn ||
+                   (oldheader.rsn == header.rsn &&
+                    olddata.dsize != data.dsize)) {
+                       return -1;
+               }
+       }
+
+       talloc_free(tmp_ctx);
+
+       ret = ltdb_store(db, key, &header, data);
+       return ret;
+}
+
+static int ltdb_transaction(struct database *db,
+                           struct ctdb_rec_buffer *recbuf)
+{
+       int ret;
+
+       if (db->tdb == NULL) {
+               return EINVAL;
+       }
+
+       ret = tdb_transaction_start(db->tdb);
+       if (ret == -1) {
+               return ret;
+       }
+
+       ret = ctdb_rec_buffer_traverse(recbuf, ltdb_transaction_update, db);
+       if (ret != 0) {
+               tdb_transaction_cancel(db->tdb);
+       }
+
+       ret = tdb_transaction_commit(db->tdb);
+       return ret;
+}
+
 static bool public_ips_parse(struct ctdbd_context *ctdb,
                             uint32_t numnodes)
 {
+       bool status;
+
        if (numnodes == 0) {
                D_ERR("Must initialise nodemap before public IPs\n");
                return false;
@@ -717,7 +1046,147 @@ static bool public_ips_parse(struct ctdbd_context *ctdb,
 
        ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
 
-       return (ctdb->known_ips != NULL);
+       status = (ctdb->known_ips != NULL);
+
+       if (status) {
+               D_INFO("Parsing public IPs done\n");
+       } else {
+               D_INFO("Parsing public IPs failed\n");
+       }
+
+       return status;
+}
+
+/* Read information about controls to fail.  Format is:
+ *   <opcode> <pnn> {ERROR|TIMEOUT} <comment>
+ */
+static bool control_failures_parse(struct ctdbd_context *ctdb)
+{
+       char line[1024];
+
+       while ((fgets(line, sizeof(line), stdin) != NULL)) {
+               char *tok, *t;
+               enum ctdb_controls opcode;
+               uint32_t pnn;
+               const char *error;
+               const char *comment;
+               struct fake_control_failure *failure = NULL;
+
+               if (line[0] == '\n') {
+                       break;
+               }
+
+               /* Get rid of pesky newline */
+               if ((t = strchr(line, '\n')) != NULL) {
+                       *t = '\0';
+               }
+
+               /* Get opcode */
+               tok = strtok(line, " \t");
+               if (tok == NULL) {
+                       D_ERR("bad line (%s) - missing opcode\n", line);
+                       continue;
+               }
+               opcode = (enum ctdb_controls)strtoul(tok, NULL, 0);
+
+               /* Get PNN */
+               tok = strtok(NULL, " \t");
+               if (tok == NULL) {
+                       D_ERR("bad line (%s) - missing PNN\n", line);
+                       continue;
+               }
+               pnn = (uint32_t)strtoul(tok, NULL, 0);
+
+               /* Get error */
+               tok = strtok(NULL, " \t");
+               if (tok == NULL) {
+                       D_ERR("bad line (%s) - missing errno\n", line);
+                       continue;
+               }
+               error = talloc_strdup(ctdb, tok);
+               if (error == NULL) {
+                       goto fail;
+               }
+               if (strcmp(error, "ERROR") != 0 &&
+                   strcmp(error, "TIMEOUT") != 0) {
+                       D_ERR("bad line (%s) "
+                             "- error must be \"ERROR\" or \"TIMEOUT\"\n",
+                             line);
+                       goto fail;
+               }
+
+               /* Get comment */
+               tok = strtok(NULL, "\n"); /* rest of line */
+               if (tok == NULL) {
+                       D_ERR("bad line (%s) - missing comment\n", line);
+                       continue;
+               }
+               comment = talloc_strdup(ctdb, tok);
+               if (comment == NULL) {
+                       goto fail;
+               }
+
+               failure = talloc_zero(ctdb, struct fake_control_failure);
+               if (failure == NULL) {
+                       goto fail;
+               }
+
+               failure->opcode = opcode;
+               failure->pnn = pnn;
+               failure->error = error;
+               failure->comment = comment;
+
+               DLIST_ADD(ctdb->control_failures, failure);
+       }
+
+       D_INFO("Parsing fake control failures done\n");
+       return true;
+
+fail:
+       D_INFO("Parsing fake control failures failed\n");
+       return false;
+}
+
+/*
+ * Manage clients
+ */
+
+static int ctdb_client_destructor(struct ctdb_client *client)
+{
+       DLIST_REMOVE(client->ctdb->client_list, client);
+       return 0;
+}
+
+static int client_add(struct ctdbd_context *ctdb, pid_t client_pid,
+                     void *client_state)
+{
+       struct ctdb_client *client;
+
+       client = talloc_zero(client_state, struct ctdb_client);
+       if (client == NULL) {
+               return ENOMEM;
+       }
+
+       client->ctdb = ctdb;
+       client->pid = client_pid;
+       client->state = client_state;
+
+       DLIST_ADD(ctdb->client_list, client);
+       talloc_set_destructor(client, ctdb_client_destructor);
+       return 0;
+}
+
+static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid)
+{
+       struct ctdb_client *client;
+
+       for (client=ctdb->client_list; client != NULL; client=client->next) {
+               if (client->pid == client_pid) {
+                       return client->state;
+               }
+       }
+
+       return NULL;
 }
 
 /*
@@ -739,11 +1208,13 @@ static uint32_t new_generation(uint32_t old_generation)
        return generation;
 }
 
-static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx)
+static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx,
+                                        const char *dbdir)
 {
        struct ctdbd_context *ctdb;
        char line[1024];
        bool status;
+       int ret;
 
        ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
        if (ctdb == NULL) {
@@ -765,11 +1236,16 @@ static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx)
                goto fail;
        }
 
-       ctdb->db_map = dbmap_init(ctdb);
+       ctdb->db_map = dbmap_init(ctdb, dbdir);
        if (ctdb->db_map == NULL) {
                goto fail;
        }
 
+       ret = srvid_init(ctdb, &ctdb->srv);
+       if (ret != 0) {
+               goto fail;
+       }
+
        while (fgets(line, sizeof(line), stdin) != NULL) {
                char *t;
 
@@ -790,6 +1266,8 @@ static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx)
                                                  ctdb->node_map->num_nodes);
                } else if (strcmp(line, "RECLOCK") == 0) {
                        status = reclock_parse(ctdb);
+               } else if (strcmp(line, "CONTROLFAILS") == 0) {
+                       status = control_failures_parse(ctdb);
                } else {
                        fprintf(stderr, "Unknown line %s\n", line);
                        status = false;
@@ -814,8 +1292,6 @@ static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx)
 
        ctdb_tunable_set_defaults(&ctdb->tun_list);
 
-       ctdb->monitoring_mode = CTDB_MONITORING_ENABLED;
-
        return ctdb;
 
 fail:
@@ -826,7 +1302,7 @@ fail:
 static bool ctdbd_verify(struct ctdbd_context *ctdb)
 {
        struct node *node;
-       int i;
+       unsigned int i;
 
        if (ctdb->node_map->num_nodes == 0) {
                return true;
@@ -896,7 +1372,7 @@ static int recover_check(struct tevent_req *req)
        struct ctdbd_context *ctdb = state->ctdb;
        struct tevent_req *subreq;
        bool recovery_disabled;
-       int i;
+       unsigned int i;
 
        recovery_disabled = false;
        for (i=0; i<ctdb->node_map->num_nodes; i++) {
@@ -995,6 +1471,25 @@ static void header_fix_pnn(struct ctdb_req_header *header,
        }
 }
 
+static struct ctdb_req_header header_reply_call(
+                                       struct ctdb_req_header *header,
+                                       struct ctdbd_context *ctdb)
+{
+       struct ctdb_req_header reply_header;
+
+       reply_header = (struct ctdb_req_header) {
+               .ctdb_magic = CTDB_MAGIC,
+               .ctdb_version = CTDB_PROTOCOL,
+               .generation = ctdb->vnn_map->generation,
+               .operation = CTDB_REPLY_CALL,
+               .destnode = header->srcnode,
+               .srcnode = header->destnode,
+               .reqid = header->reqid,
+       };
+
+       return reply_header;
+}
+
 static struct ctdb_req_header header_reply_control(
                                        struct ctdb_req_header *header,
                                        struct ctdbd_context *ctdb)
@@ -1042,17 +1537,55 @@ struct client_state {
        int fd;
        struct ctdbd_context *ctdb;
        int pnn;
+       pid_t pid;
        struct comm_context *comm;
        struct srvid_register_state *rstate;
        int status;
 };
 
 /*
- * Send replies to controls and messages
+ * Send replies to call, controls and messages
  */
 
 static void client_reply_done(struct tevent_req *subreq);
 
+static void client_send_call(struct tevent_req *req,
+                            struct ctdb_req_header *header,
+                            struct ctdb_reply_call *reply)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct tevent_req *subreq;
+       struct ctdb_req_header reply_header;
+       uint8_t *buf;
+       size_t datalen, buflen;
+       int ret;
+
+       reply_header = header_reply_call(header, ctdb);
+
+       datalen = ctdb_reply_call_len(&reply_header, reply);
+       ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       ret = ctdb_reply_call_push(&reply_header, reply, buf, &buflen);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, client_reply_done, req);
+
+       talloc_steal(subreq, buf);
+}
+
 static void client_send_message(struct tevent_req *req,
                                struct ctdb_req_header *header,
                                struct ctdb_req_message_data *message)
@@ -1155,11 +1688,22 @@ static void control_process_exists(TALLOC_CTX *mem_ctx,
                                   struct ctdb_req_header *header,
                                   struct ctdb_req_control *request)
 {
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct client_state *cstate;
        struct ctdb_reply_control reply;
 
        reply.rdata.opcode = request->opcode;
-       reply.status = kill(request->rdata.data.pid, 0);
-       reply.errmsg = NULL;
+
+       cstate = client_find(ctdb, request->rdata.data.pid);
+       if (cstate == NULL) {
+               reply.status = -1;
+               reply.errmsg = "No client for PID";
+       } else {
+               reply.status = kill(request->rdata.data.pid, 0);
+               reply.errmsg = NULL;
+       }
 
        client_send_control(req, header, &reply);
 }
@@ -1199,15 +1743,8 @@ static void control_getdbpath(TALLOC_CTX *mem_ctx,
                reply.status = ENOENT;
                reply.errmsg = "Database not found";
        } else {
-               const char *base;
-               if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
-                       base = "/var/lib/ctdb/persistent";
-               } else {
-                       base = "/var/run/ctdb/DB_DIR";
-               }
                reply.rdata.data.db_path =
-                       talloc_asprintf(mem_ctx, "%s/%s.%u",
-                                       base, db->name, header->destnode);
+                       talloc_strdup(mem_ctx, db->path);
                if (reply.rdata.data.db_path == NULL) {
                        reply.status = ENOMEM;
                        reply.errmsg = "Memory error";
@@ -1297,7 +1834,8 @@ static void control_get_dbmap(TALLOC_CTX *mem_ctx,
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
        struct ctdb_dbid_map *dbmap;
-       int i;
+       struct database *db;
+       unsigned int i;
 
        reply.rdata.opcode = request->opcode;
 
@@ -1306,18 +1844,20 @@ static void control_get_dbmap(TALLOC_CTX *mem_ctx,
                goto fail;
        }
 
-       dbmap->num = ctdb->db_map->num_dbs;
+       dbmap->num = database_count(ctdb->db_map);
        dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
        if (dbmap->dbs == NULL) {
                goto fail;
        }
 
+       db = ctdb->db_map->db;
        for (i = 0; i < dbmap->num; i++) {
-               struct database *db = &ctdb->db_map->db[i];
                dbmap->dbs[i] = (struct ctdb_dbid) {
                        .db_id = db->id,
                        .flags = db->flags,
                };
+
+               db = db->next;
        }
 
        reply.rdata.data.dbmap = dbmap;
@@ -1424,10 +1964,43 @@ fail:
 
 }
 
-static int srvid_register_state_destructor(struct srvid_register_state *rstate)
+static void control_db_attach(TALLOC_CTX *mem_ctx,
+                             struct tevent_req *req,
+                             struct ctdb_req_header *header,
+                             struct ctdb_req_control *request)
 {
-       DLIST_REMOVE(rstate->ctdb->rstate, rstate);
-       return 0;
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct database *db;
+
+       reply.rdata.opcode = request->opcode;
+
+       for (db = ctdb->db_map->db; db != NULL; db = db->next) {
+               if (strcmp(db->name, request->rdata.data.db_name) == 0) {
+                       goto done;
+               }
+       }
+
+       db = database_new(ctdb->db_map, request->rdata.data.db_name, 0);
+       if (db == NULL) {
+               reply.status = -1;
+               reply.errmsg = "Failed to attach database";
+               client_send_control(req, header, &reply);
+               return;
+       }
+
+done:
+       reply.rdata.data.db_id = db->id;
+       reply.status = 0;
+       reply.errmsg = NULL;
+       client_send_control(req, header, &reply);
+}
+
+static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data)
+{
+       printf("Received a message for SRVID 0x%"PRIx64"\n", srvid);
 }
 
 static void control_register_srvid(TALLOC_CTX *mem_ctx,
@@ -1439,24 +2012,19 @@ static void control_register_srvid(TALLOC_CTX *mem_ctx,
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
-       struct srvid_register_state *rstate;
+       int ret;
 
        reply.rdata.opcode = request->opcode;
 
-       rstate = talloc_zero(ctdb, struct srvid_register_state);
-       if (rstate == NULL) {
+       ret = srvid_register(ctdb->srv, state, request->srvid,
+                            srvid_handler, state);
+       if (ret != 0) {
                reply.status = -1;
                reply.errmsg = "Memory error";
                goto fail;
        }
-       rstate->ctdb = ctdb;
-       rstate->srvid = request->srvid;
-
-       talloc_set_destructor(rstate, srvid_register_state_destructor);
 
-       DLIST_ADD_END(ctdb->rstate, rstate);
-
-       DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", rstate->srvid));
+       DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid));
 
        reply.status = 0;
        reply.errmsg = NULL;
@@ -1474,33 +2042,23 @@ static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
-       struct srvid_register_state *rstate = NULL;
+       int ret;
 
        reply.rdata.opcode = request->opcode;
 
-       for (rstate = ctdb->rstate; rstate != NULL; rstate = rstate->next) {
-               if (rstate->srvid == request->srvid) {
-                       break;
-               }
-       }
-
-       if (rstate == NULL) {
+       ret = srvid_deregister(ctdb->srv, request->srvid, state);
+       if (ret != 0) {
                reply.status = -1;
                reply.errmsg = "srvid not registered";
                goto fail;
        }
 
-       DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", rstate->srvid));
-       talloc_free(rstate);
+       DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid));
 
        reply.status = 0;
        reply.errmsg = NULL;
 
-       client_send_control(req, header, &reply);
-       return;
-
 fail:
-       TALLOC_FREE(rstate);
        client_send_control(req, header, &reply);
 }
 
@@ -1591,23 +2149,6 @@ static void control_shutdown(TALLOC_CTX *mem_ctx,
        state->status = 99;
 }
 
-static void control_get_monmode(TALLOC_CTX *mem_ctx,
-                               struct tevent_req *req,
-                               struct ctdb_req_header *header,
-                               struct ctdb_req_control *request)
-{
-       struct client_state *state = tevent_req_data(
-               req, struct client_state);
-       struct ctdbd_context *ctdb = state->ctdb;
-       struct ctdb_reply_control reply;
-
-       reply.rdata.opcode = request->opcode;
-       reply.status = ctdb->monitoring_mode;
-       reply.errmsg = NULL;
-
-       client_send_control(req, header, &reply);
-}
-
 static void control_set_tunable(TALLOC_CTX *mem_ctx,
                                struct tevent_req *req,
                                struct ctdb_req_header *header,
@@ -1757,6 +2298,41 @@ static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
        client_send_control(req, header, &reply);
 }
 
+static void control_db_attach_persistent(TALLOC_CTX *mem_ctx,
+                                        struct tevent_req *req,
+                                        struct ctdb_req_header *header,
+                                        struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct database *db;
+
+       reply.rdata.opcode = request->opcode;
+
+       for (db = ctdb->db_map->db; db != NULL; db = db->next) {
+               if (strcmp(db->name, request->rdata.data.db_name) == 0) {
+                       goto done;
+               }
+       }
+
+       db = database_new(ctdb->db_map, request->rdata.data.db_name,
+                         CTDB_DB_FLAGS_PERSISTENT);
+       if (db == NULL) {
+               reply.status = -1;
+               reply.errmsg = "Failed to attach database";
+               client_send_control(req, header, &reply);
+               return;
+       }
+
+done:
+       reply.rdata.data.db_id = db->id;
+       reply.status = 0;
+       reply.errmsg = NULL;
+       client_send_control(req, header, &reply);
+}
+
 static void control_uptime(TALLOC_CTX *mem_ctx,
                           struct tevent_req *req,
                           struct ctdb_req_header *header,
@@ -1792,42 +2368,6 @@ fail:
        client_send_control(req, header, &reply);
 }
 
-static void control_enable_monitor(TALLOC_CTX *mem_ctx,
-                                  struct tevent_req *req,
-                                  struct ctdb_req_header *header,
-                                  struct ctdb_req_control *request)
-{
-       struct client_state *state = tevent_req_data(
-               req, struct client_state);
-       struct ctdbd_context *ctdb = state->ctdb;
-       struct ctdb_reply_control reply;
-
-       ctdb->monitoring_mode = CTDB_MONITORING_ENABLED;
-
-       reply.rdata.opcode = request->opcode;
-       reply.status = 0;
-       reply.errmsg = NULL;
-       client_send_control(req, header, &reply);
-}
-
-static void control_disable_monitor(TALLOC_CTX *mem_ctx,
-                                   struct tevent_req *req,
-                                   struct ctdb_req_header *header,
-                                   struct ctdb_req_control *request)
-{
-       struct client_state *state = tevent_req_data(
-               req, struct client_state);
-       struct ctdbd_context *ctdb = state->ctdb;
-       struct ctdb_reply_control reply;
-
-       ctdb->monitoring_mode = CTDB_MONITORING_DISABLED;
-
-       reply.rdata.opcode = request->opcode;
-       reply.status = 0;
-       reply.errmsg = NULL;
-       client_send_control(req, header, &reply);
-}
-
 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
                                      struct tevent_req *req,
                                      struct ctdb_req_header *header,
@@ -1839,7 +2379,7 @@ static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
        struct ctdb_reply_control reply;
        struct ctdb_node_map *nodemap;
        struct node_map *node_map = ctdb->node_map;
-       int i;
+       unsigned int i;
 
        reply.rdata.opcode = request->opcode;
 
@@ -1858,10 +2398,17 @@ static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
                }
 
                if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
+                       int ret;
+
                        node = &node_map->node[i];
 
                        node->flags |= NODE_FLAGS_DELETED;
-                       parse_ip("0.0.0.0", NULL, 0, &node->addr);
+                       ret = ctdb_sock_addr_from_string("0.0.0.0", &node->addr,
+                                                        false);
+                       if (ret != 0) {
+                               /* Can't happen, but Coverity... */
+                               goto fail;
+                       }
 
                        continue;
                }
@@ -1948,13 +2495,13 @@ static void control_release_ip(TALLOC_CTX *mem_ctx,
        struct ctdb_reply_control reply;
        struct ctdb_public_ip_list *ips = NULL;
        struct ctdb_public_ip *t = NULL;
-       int i;
+       unsigned int i;
 
        reply.rdata.opcode = request->opcode;
 
        if (ctdb->known_ips == NULL) {
                D_INFO("RELEASE_IP %s - not a public IP\n",
-                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr));
+                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
                goto done;
        }
 
@@ -1969,14 +2516,15 @@ static void control_release_ip(TALLOC_CTX *mem_ctx,
        }
        if (t == NULL) {
                D_INFO("RELEASE_IP %s - not a public IP\n",
-                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr));
+                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
                goto done;
        }
 
        if (t->pnn != header->destnode) {
                if (header->destnode == ip->pnn) {
                        D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n",
-                             ctdb_sock_addr_to_string(mem_ctx, &ip->addr),
+                             ctdb_sock_addr_to_string(mem_ctx,
+                                                      &ip->addr, false),
                              ip->pnn);
                        reply.status = -1;
                        reply.errmsg = "RELEASE_IP to TAKE_IP node";
@@ -1985,12 +2533,12 @@ static void control_release_ip(TALLOC_CTX *mem_ctx,
                }
 
                D_INFO("RELEASE_IP %s - to node %d - redundant\n",
-                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr),
+                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
                       ip->pnn);
                t->pnn = ip->pnn;
        } else {
                D_NOTICE("RELEASE_IP %s - to node %d\n",
-                         ctdb_sock_addr_to_string(mem_ctx, &ip->addr),
+                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
                          ip->pnn);
                t->pnn = ip->pnn;
        }
@@ -2013,13 +2561,13 @@ static void control_takeover_ip(TALLOC_CTX *mem_ctx,
        struct ctdb_reply_control reply;
        struct ctdb_public_ip_list *ips = NULL;
        struct ctdb_public_ip *t = NULL;
-       int i;
+       unsigned int i;
 
        reply.rdata.opcode = request->opcode;
 
        if (ctdb->known_ips == NULL) {
                D_INFO("TAKEOVER_IP %s - not a public IP\n",
-                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr));
+                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
                goto done;
        }
 
@@ -2034,16 +2582,16 @@ static void control_takeover_ip(TALLOC_CTX *mem_ctx,
        }
        if (t == NULL) {
                D_INFO("TAKEOVER_IP %s - not a public IP\n",
-                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr));
+                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
                goto done;
        }
 
        if (t->pnn == header->destnode) {
                D_INFO("TAKEOVER_IP %s - redundant\n",
-                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr));
+                      ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
        } else {
                D_NOTICE("TAKEOVER_IP %s\n",
-                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr));
+                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
                t->pnn = ip->pnn;
        }
 
@@ -2084,7 +2632,9 @@ static void control_get_public_ips(TALLOC_CTX *mem_ctx,
                 * no available IPs.  Don't worry about interface
                 * states here - we're not faking down to that level.
                 */
-               if (ctdb->runstate != CTDB_RUNSTATE_RUNNING) {
+               uint32_t flags = ctdb->node_map->node[header->destnode].flags;
+               if (ctdb->runstate != CTDB_RUNSTATE_RUNNING ||
+                   ((flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED)) != 0)) {
                        /* No available IPs: return dummy empty struct */
                        ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
                        if (ips == NULL) {
@@ -2115,7 +2665,7 @@ static void control_get_nodemap(TALLOC_CTX *mem_ctx,
        struct ctdb_reply_control reply;
        struct ctdb_node_map *nodemap;
        struct node *node;
-       int i;
+       unsigned int i;
 
        reply.rdata.opcode = request->opcode;
 
@@ -2196,7 +2746,6 @@ static void control_stop_node(TALLOC_CTX *mem_ctx,
        reply.rdata.opcode = request->opcode;
 
        DEBUG(DEBUG_INFO, ("Stopping node\n"));
-       ctdb->monitoring_mode = CTDB_MONITORING_DISABLED;
        ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
 
        reply.status = 0;
@@ -2298,6 +2847,49 @@ fail:
        reply.errmsg = "Failed to ban node";
 }
 
+static void control_trans3_commit(TALLOC_CTX *mem_ctx,
+                                 struct tevent_req *req,
+                                 struct ctdb_req_header *header,
+                                 struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct database *db;
+       int ret;
+
+       reply.rdata.opcode = request->opcode;
+
+       db = database_find(ctdb->db_map, request->rdata.data.recbuf->db_id);
+       if (db == NULL) {
+               reply.status = -1;
+               reply.errmsg = "Unknown database";
+               client_send_control(req, header, &reply);
+               return;
+       }
+
+       if (! (db->flags &
+              (CTDB_DB_FLAGS_PERSISTENT|CTDB_DB_FLAGS_REPLICATED))) {
+               reply.status = -1;
+               reply.errmsg = "Transactions on volatile database";
+               client_send_control(req, header, &reply);
+               return;
+       }
+
+       ret = ltdb_transaction(db, request->rdata.data.recbuf);
+       if (ret != 0) {
+               reply.status = -1;
+               reply.errmsg = "Transaction failed";
+               client_send_control(req, header, &reply);
+               return;
+       }
+
+       reply.status = 0;
+       reply.errmsg = NULL;
+       client_send_control(req, header, &reply);
+}
+
 static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
                               struct tevent_req *req,
                               struct ctdb_req_header *header,
@@ -2308,6 +2900,7 @@ static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_reply_control reply;
        struct database *db;
+       int ret;
 
        reply.rdata.opcode = request->opcode;
 
@@ -2316,9 +2909,17 @@ static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
                reply.status = ENOENT;
                reply.errmsg = "Database not found";
        } else {
-               reply.rdata.data.seqnum = db->seq_num;
-               reply.status = 0;
-               reply.errmsg = NULL;
+               uint64_t seqnum;
+
+               ret = database_seqnum(db, &seqnum);
+               if (ret == 0) {
+                       reply.rdata.data.seqnum = seqnum;
+                       reply.status = 0;
+                       reply.errmsg = NULL;
+               } else {
+                       reply.status = ret;
+                       reply.errmsg = "Failed to get seqnum";
+               }
        }
 
        client_send_control(req, header, &reply);
@@ -2355,7 +2956,7 @@ static struct ctdb_iface_list *get_ctdb_iface_list(TALLOC_CTX *mem_ctx,
 {
        struct ctdb_iface_list *iface_list;
        struct interface *iface;
-       int i;
+       unsigned int i;
 
        iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
        if (iface_list == NULL) {
@@ -2433,7 +3034,7 @@ static void control_get_public_ip_info(TALLOC_CTX *mem_ctx,
 
        if (i == known->num) {
                D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n",
-                     ctdb_sock_addr_to_string(mem_ctx, addr));
+                     ctdb_sock_addr_to_string(mem_ctx, addr, false));
                reply.status = -1;
                reply.errmsg = "Unknown address";
                goto done;
@@ -2592,6 +3193,120 @@ done:
        client_send_control(req, header, &reply);
 }
 
+struct traverse_start_ext_state {
+       struct tevent_req *req;
+       struct ctdb_req_header *header;
+       uint32_t reqid;
+       uint64_t srvid;
+       bool withemptyrecords;
+       int status;
+};
+
+static int traverse_start_ext_handler(struct tdb_context *tdb,
+                                     TDB_DATA key, TDB_DATA data,
+                                     void *private_data)
+{
+       struct traverse_start_ext_state *state =
+               (struct traverse_start_ext_state *)private_data;
+       struct ctdb_rec_data rec;
+       struct ctdb_req_message_data message;
+       size_t np;
+
+       if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+               return 0;
+       }
+
+       if ((data.dsize == sizeof(struct ctdb_ltdb_header)) &&
+           (!state->withemptyrecords)) {
+               return 0;
+       }
+
+       rec = (struct ctdb_rec_data) {
+               .reqid = state->reqid,
+               .header = NULL,
+               .key = key,
+               .data = data,
+       };
+
+       message.srvid = state->srvid;
+       message.data.dsize = ctdb_rec_data_len(&rec);
+       message.data.dptr = talloc_size(state->req, message.data.dsize);
+       if (message.data.dptr == NULL) {
+               state->status = ENOMEM;
+               return 1;
+       }
+
+       ctdb_rec_data_push(&rec, message.data.dptr, &np);
+       client_send_message(state->req, state->header, &message);
+
+       talloc_free(message.data.dptr);
+
+       return 0;
+}
+
+static void control_traverse_start_ext(TALLOC_CTX *mem_ctx,
+                                      struct tevent_req *req,
+                                      struct ctdb_req_header *header,
+                                      struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct database *db;
+       struct ctdb_traverse_start_ext *ext;
+       struct traverse_start_ext_state t_state;
+       struct ctdb_rec_data rec;
+       struct ctdb_req_message_data message;
+       uint8_t buffer[32];
+       size_t np;
+       int ret;
+
+       reply.rdata.opcode = request->opcode;
+
+       ext = request->rdata.data.traverse_start_ext;
+
+       db = database_find(ctdb->db_map, ext->db_id);
+       if (db == NULL) {
+               reply.status = -1;
+               reply.errmsg = "Unknown database";
+               client_send_control(req, header, &reply);
+               return;
+       }
+
+       t_state = (struct traverse_start_ext_state) {
+               .req = req,
+               .header = header,
+               .reqid = ext->reqid,
+               .srvid = ext->srvid,
+               .withemptyrecords = ext->withemptyrecords,
+       };
+
+       ret = tdb_traverse_read(db->tdb, traverse_start_ext_handler, &t_state);
+       DEBUG(DEBUG_INFO, ("traversed %d records\n", ret));
+       if (t_state.status != 0) {
+               reply.status = -1;
+               reply.errmsg = "Memory error";
+               client_send_control(req, header, &reply);
+       }
+
+       reply.status = 0;
+       client_send_control(req, header, &reply);
+
+       rec = (struct ctdb_rec_data) {
+               .reqid = ext->reqid,
+               .header = NULL,
+               .key = tdb_null,
+               .data = tdb_null,
+       };
+
+       message.srvid = ext->srvid;
+       message.data.dsize = ctdb_rec_data_len(&rec);
+       ctdb_rec_data_push(&rec, buffer, &np);
+       message.data.dptr = buffer;
+       client_send_message(req, header, &message);
+}
+
 static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
                                    struct tevent_req *req,
                                    struct ctdb_req_header *header,
@@ -2626,6 +3341,21 @@ done:
        client_send_control(req, header, &reply);
 }
 
+static void control_ipreallocated(TALLOC_CTX *mem_ctx,
+                                 struct tevent_req *req,
+                                 struct ctdb_req_header *header,
+                                 struct ctdb_req_control *request)
+{
+       struct ctdb_reply_control reply;
+
+       /* Always succeed */
+       reply.rdata.opcode = request->opcode;
+       reply.status = 0;
+       reply.errmsg = NULL;
+
+       client_send_control(req, header, &reply);
+}
+
 static void control_get_runstate(TALLOC_CTX *mem_ctx,
                                 struct tevent_req *req,
                                 struct ctdb_req_header *header,
@@ -2671,6 +3401,156 @@ fail:
        client_send_control(req, header, &reply);
 }
 
+static void control_db_open_flags(TALLOC_CTX *mem_ctx,
+                                 struct tevent_req *req,
+                                 struct ctdb_req_header *header,
+                                 struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct database *db;
+
+       reply.rdata.opcode = request->opcode;
+
+       db = database_find(ctdb->db_map, request->rdata.data.db_id);
+       if (db == NULL) {
+               reply.status = ENOENT;
+               reply.errmsg = "Database not found";
+       } else {
+               reply.rdata.data.tdb_flags = database_flags(db->flags);
+               reply.status = 0;
+               reply.errmsg = NULL;
+       }
+
+       client_send_control(req, header, &reply);
+}
+
+static void control_db_attach_replicated(TALLOC_CTX *mem_ctx,
+                                        struct tevent_req *req,
+                                        struct ctdb_req_header *header,
+                                        struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct database *db;
+
+       reply.rdata.opcode = request->opcode;
+
+       for (db = ctdb->db_map->db; db != NULL; db = db->next) {
+               if (strcmp(db->name, request->rdata.data.db_name) == 0) {
+                       goto done;
+               }
+       }
+
+       db = database_new(ctdb->db_map, request->rdata.data.db_name,
+                         CTDB_DB_FLAGS_REPLICATED);
+       if (db == NULL) {
+               reply.status = -1;
+               reply.errmsg = "Failed to attach database";
+               client_send_control(req, header, &reply);
+               return;
+       }
+
+done:
+       reply.rdata.data.db_id = db->id;
+       reply.status = 0;
+       reply.errmsg = NULL;
+       client_send_control(req, header, &reply);
+}
+
+static void control_check_pid_srvid(TALLOC_CTX *mem_ctx,
+                                   struct tevent_req *req,
+                                   struct ctdb_req_header *header,
+                                   struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_client *client;
+       struct client_state *cstate;
+       struct ctdb_reply_control reply;
+       bool pid_found, srvid_found;
+       int ret;
+
+       reply.rdata.opcode = request->opcode;
+
+       pid_found = false;
+       srvid_found = false;
+
+       for (client=ctdb->client_list; client != NULL; client=client->next) {
+               if (client->pid == request->rdata.data.pid_srvid->pid) {
+                       pid_found = true;
+                       cstate = (struct client_state *)client->state;
+                       ret = srvid_exists(ctdb->srv,
+                                          request->rdata.data.pid_srvid->srvid,
+                                          cstate);
+                       if (ret == 0) {
+                               srvid_found = true;
+                               ret = kill(cstate->pid, 0);
+                               if (ret != 0) {
+                                       reply.status = ret;
+                                       reply.errmsg = strerror(errno);
+                               } else {
+                                       reply.status = 0;
+                                       reply.errmsg = NULL;
+                               }
+                       }
+               }
+       }
+
+       if (! pid_found) {
+               reply.status = -1;
+               reply.errmsg = "No client for PID";
+       } else if (! srvid_found) {
+               reply.status = -1;
+               reply.errmsg = "No client for PID and SRVID";
+       }
+
+       client_send_control(req, header, &reply);
+}
+
+static bool fake_control_failure(TALLOC_CTX *mem_ctx,
+                                struct tevent_req *req,
+                                struct ctdb_req_header *header,
+                                struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct fake_control_failure *f = NULL;
+
+       D_DEBUG("Checking fake control failure for control %u on node %u\n",
+               request->opcode, header->destnode);
+       for (f = ctdb->control_failures; f != NULL; f = f->next) {
+               if (f->opcode == request->opcode &&
+                   (f->pnn == header->destnode ||
+                    f->pnn == CTDB_UNKNOWN_PNN)) {
+
+                       reply.rdata.opcode = request->opcode;
+                       if (strcmp(f->error, "TIMEOUT") == 0) {
+                               /* Causes no reply */
+                               D_ERR("Control %u fake timeout on node %u\n",
+                                     request->opcode, header->destnode);
+                               return true;
+                       } else if (strcmp(f->error, "ERROR") == 0) {
+                               D_ERR("Control %u fake error on node %u\n",
+                                     request->opcode, header->destnode);
+                               reply.status = -1;
+                               reply.errmsg = f->comment;
+                               client_send_control(req, header, &reply);
+                               return true;
+                       }
+               }
+       }
+
+       return false;
+}
+
 static void control_error(TALLOC_CTX *mem_ctx,
                          struct tevent_req *req,
                          struct ctdb_req_header *header,
@@ -2678,6 +3558,8 @@ static void control_error(TALLOC_CTX *mem_ctx,
 {
        struct ctdb_reply_control reply;
 
+       D_DEBUG("Control %u not implemented\n", request->opcode);
+
        reply.rdata.opcode = request->opcode;
        reply.status = -1;
        reply.errmsg = "Not implemented";
@@ -2808,6 +3690,8 @@ static void client_read_handler(uint8_t *buf, size_t buflen,
 static void client_dead_handler(void *private_data);
 static void client_process_packet(struct tevent_req *req,
                                  uint8_t *buf, size_t buflen);
+static void client_process_call(struct tevent_req *req,
+                               uint8_t *buf, size_t buflen);
 static void client_process_message(struct tevent_req *req,
                                   uint8_t *buf, size_t buflen);
 static void client_process_control(struct tevent_req *req,
@@ -2833,6 +3717,8 @@ static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
        state->ctdb = ctdb;
        state->pnn = pnn;
 
+       (void) ctdb_get_peer_pid(fd, &state->pid);
+
        ret = comm_setup(state, ev, fd, client_read_handler, req,
                         client_dead_handler, req, &state->comm);
        if (ret != 0) {
@@ -2840,6 +3726,12 @@ static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
                return tevent_req_post(req, ev);
        }
 
+       ret = client_add(ctdb, state->pid, state);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return tevent_req_post(req, ev);
+       }
+
        DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
 
        return req;
@@ -2854,9 +3746,11 @@ static void client_read_handler(uint8_t *buf, size_t buflen,
                req, struct client_state);
        struct ctdbd_context *ctdb = state->ctdb;
        struct ctdb_req_header header;
-       int ret, i;
+       size_t np;
+       unsigned int i;
+       int ret;
 
-       ret = ctdb_req_header_pull(buf, buflen, &header);
+       ret = ctdb_req_header_pull(buf, buflen, &header, &np);
        if (ret != 0) {
                return;
        }
@@ -2876,7 +3770,7 @@ static void client_read_handler(uint8_t *buf, size_t buflen,
                for (i=0; i<ctdb->node_map->num_nodes; i++) {
                        header.destnode = i;
 
-                       ctdb_req_header_push(&header, buf);
+                       ctdb_req_header_push(&header, buf, &np);
                        client_process_packet(req, buf, buflen);
                }
                return;
@@ -2891,7 +3785,7 @@ static void client_read_handler(uint8_t *buf, size_t buflen,
 
                        header.destnode = i;
 
-                       ctdb_req_header_push(&header, buf);
+                       ctdb_req_header_push(&header, buf, &np);
                        client_process_packet(req, buf, buflen);
                }
                return;
@@ -2910,7 +3804,7 @@ static void client_read_handler(uint8_t *buf, size_t buflen,
                return;
        }
 
-       ctdb_req_header_push(&header, buf);
+       ctdb_req_header_push(&header, buf, &np);
        client_process_packet(req, buf, buflen);
 }
 
@@ -2926,14 +3820,19 @@ static void client_process_packet(struct tevent_req *req,
                                  uint8_t *buf, size_t buflen)
 {
        struct ctdb_req_header header;
+       size_t np;
        int ret;
 
-       ret = ctdb_req_header_pull(buf, buflen, &header);
+       ret = ctdb_req_header_pull(buf, buflen, &header, &np);
        if (ret != 0) {
                return;
        }
 
        switch (header.operation) {
+       case CTDB_REQ_CALL:
+               client_process_call(req, buf, buflen);
+               break;
+
        case CTDB_REQ_MESSAGE:
                client_process_message(req, buf, buflen);
                break;
@@ -2947,6 +3846,77 @@ static void client_process_packet(struct tevent_req *req,
        }
 }
 
+static void client_process_call(struct tevent_req *req,
+                               uint8_t *buf, size_t buflen)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       TALLOC_CTX *mem_ctx;
+       struct ctdb_req_header header;
+       struct ctdb_req_call request;
+       struct ctdb_reply_call reply;
+       struct database *db;
+       struct ctdb_ltdb_header hdr;
+       TDB_DATA data;
+       int ret;
+
+       mem_ctx = talloc_new(state);
+       if (tevent_req_nomem(mem_ctx, req)) {
+               return;
+       }
+
+       ret = ctdb_req_call_pull(buf, buflen, &header, mem_ctx, &request);
+       if (ret != 0) {
+               talloc_free(mem_ctx);
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       header_fix_pnn(&header, ctdb);
+
+       if (header.destnode >= ctdb->node_map->num_nodes) {
+               goto fail;
+       }
+
+       DEBUG(DEBUG_INFO, ("call db_id = %u\n", request.db_id));
+
+       db = database_find(ctdb->db_map, request.db_id);
+       if (db == NULL) {
+               goto fail;
+       }
+
+       ret = ltdb_fetch(db, request.key, &hdr, mem_ctx, &data);
+       if (ret != 0) {
+               goto fail;
+       }
+
+       /* Fake migration */
+       if (hdr.dmaster != ctdb->node_map->pnn) {
+               hdr.dmaster = ctdb->node_map->pnn;
+
+               ret = ltdb_store(db, request.key, &hdr, data);
+               if (ret != 0) {
+                       goto fail;
+               }
+       }
+
+       talloc_free(mem_ctx);
+
+       reply.status = 0;
+       reply.data = tdb_null;
+
+       client_send_call(req, &header, &reply);
+       return;
+
+fail:
+       talloc_free(mem_ctx);
+       reply.status = -1;
+       reply.data = tdb_null;
+
+       client_send_call(req, &header, &reply);
+}
+
 static void client_process_message(struct tevent_req *req,
                                   uint8_t *buf, size_t buflen)
 {
@@ -2988,6 +3958,8 @@ static void client_process_message(struct tevent_req *req,
                message_disable_recoveries(mem_ctx, req, &header, &request);
        } else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
                message_takeover_run(mem_ctx, req, &header, &request);
+       } else {
+               D_DEBUG("Message id 0x%"PRIx64" not implemented\n", srvid);
        }
 
        /* check srvid */
@@ -3032,6 +4004,10 @@ static void client_process_control(struct tevent_req *req,
        DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
                           request.opcode, header.reqid));
 
+       if (fake_control_failure(mem_ctx, req, &header, &request)) {
+               goto done;
+       }
+
        switch (request.opcode) {
        case CTDB_CONTROL_PROCESS_EXISTS:
                control_process_exists(mem_ctx, req, &header, &request);
@@ -3069,6 +4045,10 @@ static void client_process_control(struct tevent_req *req,
                control_set_recmode(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_DB_ATTACH:
+               control_db_attach(mem_ctx, req, &header, &request);
+               break;
+
        case CTDB_CONTROL_REGISTER_SRVID:
                control_register_srvid(mem_ctx, req, &header, &request);
                break;
@@ -3097,10 +4077,6 @@ static void client_process_control(struct tevent_req *req,
                control_shutdown(mem_ctx, req, &header, &request);
                break;
 
-       case CTDB_CONTROL_GET_MONMODE:
-               control_get_monmode(mem_ctx, req, &header, &request);
-               break;
-
        case CTDB_CONTROL_SET_TUNABLE:
                control_set_tunable(mem_ctx, req, &header, &request);
                break;
@@ -3121,16 +4097,12 @@ static void client_process_control(struct tevent_req *req,
                control_get_all_tunables(mem_ctx, req, &header, &request);
                break;
 
-       case CTDB_CONTROL_UPTIME:
-               control_uptime(mem_ctx, req, &header, &request);
-               break;
-
-       case CTDB_CONTROL_ENABLE_MONITOR:
-               control_enable_monitor(mem_ctx, req, &header, &request);
+       case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
+               control_db_attach_persistent(mem_ctx, req, &header, &request);
                break;
 
-       case CTDB_CONTROL_DISABLE_MONITOR:
-               control_disable_monitor(mem_ctx, req, &header, &request);
+       case CTDB_CONTROL_UPTIME:
+               control_uptime(mem_ctx, req, &header, &request);
                break;
 
        case CTDB_CONTROL_RELOAD_NODES_FILE:
@@ -3173,6 +4145,10 @@ static void client_process_control(struct tevent_req *req,
                control_set_ban_state(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_TRANS3_COMMIT:
+               control_trans3_commit(mem_ctx, req, &header, &request);
+               break;
+
        case CTDB_CONTROL_GET_DB_SEQNUM:
                control_get_db_seqnum(mem_ctx, req, &header, &request);
                break;
@@ -3197,10 +4173,18 @@ static void client_process_control(struct tevent_req *req,
                control_set_db_readonly(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_TRAVERSE_START_EXT:
+               control_traverse_start_ext(mem_ctx, req, &header, &request);
+               break;
+
        case CTDB_CONTROL_SET_DB_STICKY:
                control_set_db_sticky(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_IPREALLOCATED:
+               control_ipreallocated(mem_ctx, req, &header, &request);
+               break;
+
        case CTDB_CONTROL_GET_RUNSTATE:
                control_get_runstate(mem_ctx, req, &header, &request);
                break;
@@ -3209,6 +4193,18 @@ static void client_process_control(struct tevent_req *req,
                control_get_nodes_file(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_DB_OPEN_FLAGS:
+               control_db_open_flags(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_DB_ATTACH_REPLICATED:
+               control_db_attach_replicated(mem_ctx, req, &header, &request);
+               break;
+
+       case CTDB_CONTROL_CHECK_PID_SRVID:
+               control_check_pid_srvid(mem_ctx, req, &header, &request);
+               break;
+
        default:
                if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
                        control_error(mem_ctx, req, &header, &request);
@@ -3216,6 +4212,7 @@ static void client_process_control(struct tevent_req *req,
                break;
        }
 
+done:
        talloc_free(mem_ctx);
 }
 
@@ -3398,18 +4395,23 @@ fail:
 }
 
 static struct options {
+       const char *dbdir;
        const char *sockpath;
        const char *pidfile;
        const char *debuglevel;
 } options;
 
 static struct poptOption cmdline_options[] = {
+       POPT_AUTOHELP
+       { "dbdir", 'D', POPT_ARG_STRING, &options.dbdir, 0,
+               "Database directory", "directory" },
        { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
                "Unix domain socket path", "filename" },
        { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
                "pid file", "filename" } ,
        { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
                "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
+       POPT_TABLEEND
 };
 
 static void cleanup(void)
@@ -3473,6 +4475,12 @@ int main(int argc, const char *argv[])
                exit(1);
        }
 
+       if (options.dbdir == NULL) {
+               fprintf(stderr, "Please specify database directory\n");
+               poptPrintHelp(pc, stdout, 0);
+               exit(1);
+       }
+
        if (options.sockpath == NULL) {
                fprintf(stderr, "Please specify socket path\n");
                poptPrintHelp(pc, stdout, 0);
@@ -3498,7 +4506,7 @@ int main(int argc, const char *argv[])
                exit(1);
        }
 
-       ctdb = ctdbd_setup(mem_ctx);
+       ctdb = ctdbd_setup(mem_ctx, options.dbdir);
        if (ctdb == NULL) {
                exit(1);
        }