ctdb-daemon: Move ctdb_init() to the only place it is used
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_traverse.c
index d44c27401a7495aaaa2f1d00622732ba277a3b52..04a41138a72bc8bdaacfcaf9626ecc43fc7fccd4 100644 (file)
    along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
 
-#include "includes.h"
-#include "lib/events/events.h"
+#include "replace.h"
 #include "system/filesys.h"
+#include "system/network.h"
 #include "system/wait.h"
-#include "db_wrap.h"
-#include "lib/tdb/include/tdb.h"
-#include "../include/ctdb_private.h"
+#include "system/time.h"
+
+#include <talloc.h>
+#include <tevent.h>
+
+#include "lib/tdb_wrap/tdb_wrap.h"
+#include "lib/util/dlinklist.h"
+#include "lib/util/debug.h"
+#include "lib/util/samba_util.h"
+#include "lib/util/sys_rw.h"
+#include "lib/util/util_process.h"
+
+#include "ctdb_private.h"
+#include "ctdb_client.h"
+
+#include "common/reqid.h"
+#include "common/system.h"
+#include "common/common.h"
+#include "common/logging.h"
 
 typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
 
@@ -32,40 +48,52 @@ typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA da
   terminate the traverse
  */
 struct ctdb_traverse_local_handle {
+       struct ctdb_traverse_local_handle *next, *prev;
        struct ctdb_db_context *ctdb_db;
        int fd[2];
        pid_t child;
+       uint64_t srvid;
+       uint32_t client_reqid;
+       uint32_t reqid;
+       int srcnode;
        void *private_data;
        ctdb_traverse_fn_t callback;
-       struct timeval start_time;
-       struct ctdb_queue *queue;
+       bool withemptyrecords;
+       struct tevent_fd *fde;
+       int records_failed;
+       int records_sent;
 };
 
 /*
-  called when data is available from the child
+ * called when traverse is completed by child or on error
  */
-static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
+static void ctdb_traverse_child_handler(struct tevent_context *ev, struct tevent_fd *fde,
+                                       uint16_t flags, void *private_data)
 {
-       struct ctdb_traverse_local_handle *h = talloc_get_type(private_data, 
-                                                              struct ctdb_traverse_local_handle);
-       TDB_DATA key, data;
+       struct ctdb_traverse_local_handle *h = talloc_get_type(private_data,
+                                                       struct ctdb_traverse_local_handle);
        ctdb_traverse_fn_t callback = h->callback;
        void *p = h->private_data;
-       struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata;
-
-       if (rawdata == NULL || length < 4 || length != tdata->length) {
-               /* end of traverse */
-               talloc_free(h);
-               callback(p, tdb_null, tdb_null);
-               return;
+       int res;
+       ssize_t n;
+
+       /* Read the number of records sent by traverse child */
+       n = sys_read(h->fd[0], &res, sizeof(res));
+       if (n < 0 || n != sizeof(res)) {
+               /* Traverse child failed */
+               DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d\n",
+                                 h->ctdb_db->db_name, h->reqid));
+       } else if (res < 0) {
+               /* Traverse failed */
+               res = -res;
+               DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d records:%d\n",
+                                 h->ctdb_db->db_name, h->reqid, res));
+       } else {
+               DEBUG(DEBUG_INFO, ("Local traverse end db:%s reqid:%d records:%d\n",
+                                  h->ctdb_db->db_name, h->reqid, res));
        }
 
-       key.dsize = tdata->keylen;
-       key.dptr  = &tdata->data[0];
-       data.dsize = tdata->datalen;
-       data.dptr = &tdata->data[tdata->keylen];
-
-       callback(p, key, data); 
+       callback(p, tdb_null, tdb_null);
 }
 
 /*
@@ -73,8 +101,8 @@ static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *p
  */
 static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
 {
-       kill(h->child, SIGKILL);
-       waitpid(h->child, NULL, 0);
+       DLIST_REMOVE(h->ctdb_db->traverse, h);
+       ctdb_kill(h->ctdb_db->ctdb, h->child, SIGKILL);
        return 0;
 }
 
@@ -83,46 +111,75 @@ static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
  */
 static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
 {
-       struct ctdb_traverse_local_handle *h = talloc_get_type(p, 
+       struct ctdb_traverse_local_handle *h = talloc_get_type(p,
                                                               struct ctdb_traverse_local_handle);
-       struct ctdb_rec_data *d;
+       struct ctdb_rec_data_old *d;
        struct ctdb_ltdb_header *hdr;
+       int res, status;
+       TDB_DATA outdata;
 
-       /* filter out non-authoritative and zero-length records */
        hdr = (struct ctdb_ltdb_header *)data.dptr;
-       if (data.dsize <= sizeof(struct ctdb_ltdb_header) ||
-           hdr->dmaster != h->ctdb_db->ctdb->vnn) {
-               return 0;
+
+       if (ctdb_db_volatile(h->ctdb_db)) {
+               /* filter out zero-length records */
+               if (!h->withemptyrecords &&
+                   data.dsize <= sizeof(struct ctdb_ltdb_header))
+               {
+                       return 0;
+               }
+
+               /* filter out non-authoritative records */
+               if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
+                       return 0;
+               }
        }
 
-       d = ctdb_marshall_record(h, 0, key, data);
+       d = ctdb_marshall_record(h, h->reqid, key, NULL, data);
        if (d == NULL) {
                /* error handling is tricky in this child code .... */
+               h->records_failed++;
                return -1;
        }
 
-       if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) {
+       outdata.dptr = (uint8_t *)d;
+       outdata.dsize = d->length;
+
+       res = ctdb_control(h->ctdb_db->ctdb, h->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
+                          CTDB_CTRL_FLAG_NOREPLY, outdata, NULL, NULL, &status, NULL, NULL);
+       if (res != 0 || status != 0) {
+               h->records_failed++;
                return -1;
        }
+
+       h->records_sent++;
        return 0;
 }
 
+struct traverse_all_state {
+       struct ctdb_context *ctdb;
+       struct ctdb_traverse_local_handle *h;
+       uint32_t reqid;
+       uint32_t srcnode;
+       uint32_t client_reqid;
+       uint64_t srvid;
+       bool withemptyrecords;
+};
 
 /*
   setup a non-blocking traverse of a local ltdb. The callback function
   will be called on every record in the local ltdb. To stop the
-  travserse, talloc_free() the travserse_handle.
+  traverse, talloc_free() the traverse_handle.
 
   The traverse is finished when the callback is called with tdb_null for key and data
  */
 static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
                                                              ctdb_traverse_fn_t callback,
-                                                             void *private_data)
+                                                             struct traverse_all_state *all_state)
 {
        struct ctdb_traverse_local_handle *h;
        int ret;
 
-       h = talloc_zero(ctdb_db, struct ctdb_traverse_local_handle);
+       h = talloc_zero(all_state, struct ctdb_traverse_local_handle);
        if (h == NULL) {
                return NULL;
        }
@@ -134,7 +191,7 @@ static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_con
                return NULL;
        }
 
-       h->child = fork();
+       h->child = ctdb_fork(ctdb_db->ctdb);
 
        if (h->child == (pid_t)-1) {
                close(h->fd[0]);
@@ -144,30 +201,84 @@ static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_con
        }
 
        h->callback = callback;
-       h->private_data = private_data;
+       h->private_data = all_state;
        h->ctdb_db = ctdb_db;
+       h->client_reqid = all_state->client_reqid;
+       h->reqid = all_state->reqid;
+       h->srvid = all_state->srvid;
+       h->srcnode = all_state->srcnode;
+       h->withemptyrecords = all_state->withemptyrecords;
 
        if (h->child == 0) {
                /* start the traverse in the child */
+               int res, status;
+               pid_t parent = getpid();
+               struct ctdb_context *ctdb = ctdb_db->ctdb;
+               struct ctdb_rec_data_old *d;
+               TDB_DATA outdata;
+
                close(h->fd[0]);
-               tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
+
+               prctl_set_comment("ctdb_traverse");
+               if (switch_from_server_to_client(ctdb) != 0) {
+                       DEBUG(DEBUG_CRIT, ("Failed to switch traverse child into client mode\n"));
+                       _exit(0);
+               }
+
+               d = ctdb_marshall_record(h, h->reqid, tdb_null, NULL, tdb_null);
+               if (d == NULL) {
+                       res = 0;
+                       sys_write(h->fd[1], &res, sizeof(int));
+                       _exit(0);
+               }
+
+               res = tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
+               if (res == -1 || h->records_failed > 0) {
+                       /* traverse failed */
+                       res = -(h->records_sent);
+               } else {
+                       res = h->records_sent;
+               }
+
+               /* Wait till all the data is flushed from output queue */
+               while (ctdb_queue_length(ctdb->daemon.queue) > 0) {
+                       tevent_loop_once(ctdb->ev);
+               }
+
+               /* End traverse by sending empty record */
+               outdata.dptr = (uint8_t *)d;
+               outdata.dsize = d->length;
+               ret = ctdb_control(ctdb, h->srcnode, 0,
+                                  CTDB_CONTROL_TRAVERSE_DATA,
+                                  CTDB_CTRL_FLAG_NOREPLY, outdata,
+                                  NULL, NULL, &status, NULL, NULL);
+               if (ret == -1 || status == -1) {
+                       if (res > 0) {
+                               res = -res;
+                       }
+               }
+
+               sys_write(h->fd[1], &res, sizeof(res));
+
+               ctdb_wait_for_process_to_exit(parent);
                _exit(0);
        }
 
        close(h->fd[1]);
+       set_close_on_exec(h->fd[0]);
+
        talloc_set_destructor(h, traverse_local_destructor);
 
-       /*
-         setup a packet queue between the child and the parent. This
-         copes with all the async and packet boundary issues
-        */
-       h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h);
-       if (h->queue == NULL) {
+       DLIST_ADD(ctdb_db->traverse, h);
+
+       h->fde = tevent_add_fd(ctdb_db->ctdb->ev, h, h->fd[0], TEVENT_FD_READ,
+                              ctdb_traverse_child_handler, h);
+       if (h->fde == NULL) {
+               close(h->fd[0]);
                talloc_free(h);
                return NULL;
        }
-
-       h->start_time = timeval_current();
+       tevent_fd_set_auto_close(h->fde);
 
        return h;
 }
@@ -175,10 +286,12 @@ static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_con
 
 struct ctdb_traverse_all_handle {
        struct ctdb_context *ctdb;
+       struct ctdb_db_context *ctdb_db;
        uint32_t reqid;
        ctdb_traverse_fn_t callback;
        void *private_data;
        uint32_t null_count;
+       bool timedout;
 };
 
 /*
@@ -186,120 +299,206 @@ struct ctdb_traverse_all_handle {
  */
 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
 {
-       ctdb_reqid_remove(state->ctdb, state->reqid);
+       reqid_remove(state->ctdb->idr, state->reqid);
        return 0;
 }
 
-struct ctdb_traverse_all {
-       uint32_t db_id;
-       uint32_t reqid;
-       uint32_t vnn;
-};
-
 /* called when a traverse times out */
-static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te, 
+static void ctdb_traverse_all_timeout(struct tevent_context *ev,
+                                     struct tevent_timer *te,
                                      struct timeval t, void *private_data)
 {
        struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
 
-       state->ctdb->statistics.timeouts.traverse++;
+       DEBUG(DEBUG_ERR,(__location__ " Traverse all timeout on database:%s\n", state->ctdb_db->db_name));
+       CTDB_INCREMENT_STAT(state->ctdb, timeouts.traverse);
 
+       state->timedout = true;
        state->callback(state->private_data, tdb_null, tdb_null);
-       talloc_free(state);
 }
 
+
+struct traverse_start_state {
+       struct ctdb_context *ctdb;
+       struct ctdb_traverse_all_handle *h;
+       uint32_t srcnode;
+       uint32_t reqid;
+       uint32_t db_id;
+       uint64_t srvid;
+       bool withemptyrecords;
+       int num_records;
+};
+
+
 /*
   setup a cluster-wide non-blocking traverse of a ctdb. The
   callback function will be called on every record in the local
-  ltdb. To stop the travserse, talloc_free() the traverse_handle.
+  ltdb. To stop the traverse, talloc_free() the traverse_handle.
 
   The traverse is finished when the callback is called with tdb_null
   for key and data
  */
 static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
                                                                 ctdb_traverse_fn_t callback,
-                                                                void *private_data)
+                                                                struct traverse_start_state *start_state)
 {
        struct ctdb_traverse_all_handle *state;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        int ret;
        TDB_DATA data;
        struct ctdb_traverse_all r;
+       struct ctdb_traverse_all_ext r_ext;
+       uint32_t destination;
 
-       state = talloc(ctdb_db, struct ctdb_traverse_all_handle);
+       state = talloc(start_state, struct ctdb_traverse_all_handle);
        if (state == NULL) {
                return NULL;
        }
 
-       state->ctdb = ctdb;
-       state->reqid = ctdb_reqid_new(ctdb_db->ctdb, state);
-       state->callback = callback;
-       state->private_data = private_data;
-       state->null_count = 0;
+       state->ctdb         = ctdb;
+       state->ctdb_db      = ctdb_db;
+       state->reqid        = reqid_new(ctdb_db->ctdb->idr, state);
+       state->callback     = callback;
+       state->private_data = start_state;
+       state->null_count   = 0;
+       state->timedout     = false;
        
        talloc_set_destructor(state, ctdb_traverse_all_destructor);
 
-       r.db_id = ctdb_db->db_id;
-       r.reqid = state->reqid;
-       r.vnn   = ctdb->vnn;
+       if (start_state->withemptyrecords) {
+               r_ext.db_id = ctdb_db->db_id;
+               r_ext.reqid = state->reqid;
+               r_ext.pnn   = ctdb->pnn;
+               r_ext.client_reqid = start_state->reqid;
+               r_ext.srvid = start_state->srvid;
+               r_ext.withemptyrecords = start_state->withemptyrecords;
+
+               data.dptr = (uint8_t *)&r_ext;
+               data.dsize = sizeof(r_ext);
+       } else {
+               r.db_id = ctdb_db->db_id;
+               r.reqid = state->reqid;
+               r.pnn   = ctdb->pnn;
+               r.client_reqid = start_state->reqid;
+               r.srvid = start_state->srvid;
+
+               data.dptr = (uint8_t *)&r;
+               data.dsize = sizeof(r);
+       }
 
-       data.dptr = (uint8_t *)&r;
-       data.dsize = sizeof(r);
+       if (ctdb_db_volatile(ctdb_db)) {
+               /* normal database, traverse all nodes */         
+               destination = CTDB_BROADCAST_VNNMAP;
+       } else {
+               int i;
+               /* persistent database, traverse one node, preferably
+                * the local one
+                */
+               destination = ctdb->pnn;
+               /* check we are in the vnnmap */
+               for (i=0; i < ctdb->vnn_map->size; i++) {
+                       if (ctdb->vnn_map->map[i] == ctdb->pnn) {
+                               break;
+                       }
+               }
+               /* if we are not in the vnn map we just pick the first
+                * node instead
+                */
+               if (i == ctdb->vnn_map->size) {
+                       destination = ctdb->vnn_map->map[0];
+               }
+       }
 
-       /* tell all the nodes in the cluster to start sending records to this node */
-       ret = ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0, 
+       /* tell all the nodes in the cluster to start sending records to this
+        * node, or if it is a persistent database, just tell one node: the
+        * local node if it is in the vnnmap, else the first vnnmap node
+        */
+
+       if (start_state->withemptyrecords) {
+               ret = ctdb_daemon_send_control(ctdb, destination, 0,
+                                      CTDB_CONTROL_TRAVERSE_ALL_EXT,
+                                      0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+       } else {
+               ret = ctdb_daemon_send_control(ctdb, destination, 0,
                                       CTDB_CONTROL_TRAVERSE_ALL,
                                       0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+       }
+
        if (ret != 0) {
                talloc_free(state);
                return NULL;
        }
 
+       DEBUG(DEBUG_NOTICE,("Starting traverse on DB %s (id %d)\n",
+                           ctdb_db->db_name, state->reqid));
+
        /* timeout the traverse */
-       event_add_timed(ctdb->ev, state, 
-                       timeval_current_ofs(ctdb->tunable.traverse_timeout, 0), 
-                       ctdb_traverse_all_timeout, state);
+       tevent_add_timer(ctdb->ev, state,
+                        timeval_current_ofs(ctdb->tunable.traverse_timeout, 0),
+                        ctdb_traverse_all_timeout, state);
 
        return state;
 }
 
-struct traverse_all_state {
-       struct ctdb_context *ctdb;
-       struct ctdb_traverse_local_handle *h;
-       uint32_t reqid;
-       uint32_t srcnode;
-};
-
 /*
-  called for each record during a traverse all 
+  called when local traverse ends
  */
 static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
 {
        struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
-       int ret;
-       struct ctdb_rec_data *d;
-       TDB_DATA cdata;
 
-       d = ctdb_marshall_record(state, state->reqid, key, data);
-       if (d == NULL) {
-               /* darn .... */
-               DEBUG(0,("Out of memory in traverse_all_callback\n"));
-               return;
+       /* we're done */
+       talloc_free(state);
+}
+
+/*
+ * extended version to take the "withemptyrecords" parameter
+ */
+int32_t ctdb_control_traverse_all_ext(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
+{
+       struct ctdb_traverse_all_ext *c = (struct ctdb_traverse_all_ext *)data.dptr;
+       struct traverse_all_state *state;
+       struct ctdb_db_context *ctdb_db;
+
+       if (data.dsize != sizeof(struct ctdb_traverse_all_ext)) {
+               DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all_ext\n"));
+               return -1;
        }
 
-       cdata.dptr = (uint8_t *)d;
-       cdata.dsize = d->length;
+       ctdb_db = find_ctdb_db(ctdb, c->db_id);
+       if (ctdb_db == NULL) {
+               return -1;
+       }
 
-       ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
-                                      0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
-       if (ret != 0) {
-               DEBUG(0,("Failed to send traverse data\n"));
+       if (ctdb_db->unhealthy_reason) {
+               if (ctdb->tunable.allow_unhealthy_db_read == 0) {
+                       DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
+                                       ctdb_db->db_name, ctdb_db->unhealthy_reason));
+                       return -1;
+               }
+               DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
+                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
        }
 
-       if (key.dsize == 0 && data.dsize == 0) {
-               /* we're done */
+       state = talloc(ctdb_db, struct traverse_all_state);
+       if (state == NULL) {
+               return -1;
+       }
+
+       state->reqid = c->reqid;
+       state->srcnode = c->pnn;
+       state->ctdb = ctdb;
+       state->client_reqid = c->client_reqid;
+       state->srvid = c->srvid;
+       state->withemptyrecords = c->withemptyrecords;
+
+       state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
+       if (state->h == NULL) {
                talloc_free(state);
+               return -1;
        }
+
+       return 0;
 }
 
 /*
@@ -314,7 +513,7 @@ int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_
        struct ctdb_db_context *ctdb_db;
 
        if (data.dsize != sizeof(struct ctdb_traverse_all)) {
-               DEBUG(0,("Invalid size in ctdb_control_traverse_all\n"));
+               DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all\n"));
                return -1;
        }
 
@@ -323,14 +522,27 @@ int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_
                return -1;
        }
 
+       if (ctdb_db->unhealthy_reason) {
+               if (ctdb->tunable.allow_unhealthy_db_read == 0) {
+                       DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
+                                       ctdb_db->db_name, ctdb_db->unhealthy_reason));
+                       return -1;
+               }
+               DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
+                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
+       }
+
        state = talloc(ctdb_db, struct traverse_all_state);
        if (state == NULL) {
                return -1;
        }
 
        state->reqid = c->reqid;
-       state->srcnode = c->vnn;
+       state->srcnode = c->pnn;
        state->ctdb = ctdb;
+       state->client_reqid = c->client_reqid;
+       state->srvid = c->srvid;
+       state->withemptyrecords = false;
 
        state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
        if (state->h == NULL) {
@@ -348,18 +560,18 @@ int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_
  */
 int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
 {
-       struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
+       struct ctdb_rec_data_old *d = (struct ctdb_rec_data_old *)data.dptr;
        struct ctdb_traverse_all_handle *state;
        TDB_DATA key;
        ctdb_traverse_fn_t callback;
        void *private_data;
 
        if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
-               DEBUG(0,("Bad record size in ctdb_control_traverse_data\n"));
+               DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
                return -1;
        }
 
-       state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
+       state = reqid_find(ctdb->idr, d->reqid, struct ctdb_traverse_all_handle);
        if (state == NULL || d->reqid != state->reqid) {
                /* traverse might have been terminated already */
                return -1;
@@ -372,8 +584,13 @@ int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB
 
        if (key.dsize == 0 && data.dsize == 0) {
                state->null_count++;
-               if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
-                       return 0;
+               /* Persistent databases are only scanned on one node (the local
+                * node, or the first vnnmap node if we are not in the vnnmap)
+                */
+               if (ctdb_db_volatile(state->ctdb_db)) {
+                       if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
+                               return 0;
+                       }
                }
        }
 
@@ -381,21 +598,59 @@ int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB
        private_data = state->private_data;
 
        callback(private_data, key, data);
-       if (key.dsize == 0 && data.dsize == 0) {
-               /* we've received all of the null replies, so all
-                  nodes are finished */
-               talloc_free(state);
-       }
        return 0;
 }      
 
-struct traverse_start_state {
-       struct ctdb_context *ctdb;
-       struct ctdb_traverse_all_handle *h;
-       uint32_t srcnode;
-       uint32_t reqid;
-       uint64_t srvid;
-};
+/*
+  kill an in-progress traverse, used when a client disconnects
+ */
+int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA data, 
+                                  TDB_DATA *outdata, uint32_t srcnode)
+{
+       struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_traverse_local_handle *t;
+
+       ctdb_db = find_ctdb_db(ctdb, d->db_id);
+       if (ctdb_db == NULL) {
+               return -1;
+       }
+
+       for (t=ctdb_db->traverse; t; t=t->next) {
+               if (t->client_reqid == d->reqid &&
+                   t->srvid == d->srvid) {
+                       talloc_free(t);
+                       break;
+               }
+       }
+
+       return 0;
+}
+
+
+/*
+  this is called when a client disconnects during a traverse
+  we need to notify all the nodes taking part in the search that they
+  should kill their traverse children
+ */
+static int ctdb_traverse_start_destructor(struct traverse_start_state *state)
+{
+       struct ctdb_traverse_start r;
+       TDB_DATA data;
+
+       DEBUG(DEBUG_ERR,(__location__ " Traverse cancelled by client disconnect for database:0x%08x\n", state->db_id));
+       r.db_id = state->db_id;
+       r.reqid = state->reqid;
+       r.srvid = state->srvid;
+
+       data.dptr = (uint8_t *)&r;
+       data.dsize = sizeof(r);
+
+       ctdb_daemon_send_control(state->ctdb, CTDB_BROADCAST_CONNECTED, 0, 
+                                CTDB_CONTROL_TRAVERSE_KILL, 
+                                0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+       return 0;
+}
 
 /*
   callback which sends records as messages to the client
@@ -403,12 +658,12 @@ struct traverse_start_state {
 static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
 {
        struct traverse_start_state *state;
-       struct ctdb_rec_data *d;
+       struct ctdb_rec_data_old *d;
        TDB_DATA cdata;
 
        state = talloc_get_type(p, struct traverse_start_state);
 
-       d = ctdb_marshall_record(state, state->reqid, key, data);
+       d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
        if (d == NULL) {
                return;
        }
@@ -416,25 +671,48 @@ static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
        cdata.dptr = (uint8_t *)d;
        cdata.dsize = d->length;
 
-       ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
+       srvid_dispatch(state->ctdb->srv, state->srvid, 0, cdata);
        if (key.dsize == 0 && data.dsize == 0) {
-               /* end of traverse */
-               talloc_free(state);
+               DEBUG(DEBUG_NOTICE, ("Ending traverse on DB %s (id %d), records %d\n",
+                                    state->h->ctdb_db->db_name, state->h->reqid,
+                                    state->num_records));
+
+               if (state->h->timedout) {
+                       /* timed out, send TRAVERSE_KILL control */
+                       talloc_free(state);
+               } else {
+                       /* end of traverse */
+                       talloc_set_destructor(state, NULL);
+                       talloc_free(state);
+               }
+       } else {
+               state->num_records++;
        }
 }
 
-/*
-  start a traverse_all - called as a control from a client
+
+/**
+ * start a traverse_all - called as a control from a client.
+ * extended version to take the "withemptyrecords" parameter.
  */
-int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data, 
-                                   TDB_DATA *outdata, uint32_t srcnode)
+int32_t ctdb_control_traverse_start_ext(struct ctdb_context *ctdb,
+                                       TDB_DATA data,
+                                       TDB_DATA *outdata,
+                                       uint32_t srcnode,
+                                       uint32_t client_id)
 {
-       struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
+       struct ctdb_traverse_start_ext *d = (struct ctdb_traverse_start_ext *)data.dptr;
        struct traverse_start_state *state;
        struct ctdb_db_context *ctdb_db;
+       struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
+
+       if (client == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " No client found\n"));
+               return -1;              
+       }
 
        if (data.dsize != sizeof(*d)) {
-               DEBUG(0,("Bad record size in ctdb_control_traverse_start\n"));
+               DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
                return -1;
        }
 
@@ -443,7 +721,17 @@ int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data,
                return -1;
        }
 
-       state = talloc(ctdb_db, struct traverse_start_state);
+       if (ctdb_db->unhealthy_reason) {
+               if (ctdb->tunable.allow_unhealthy_db_read == 0) {
+                       DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
+                                       ctdb_db->db_name, ctdb_db->unhealthy_reason));
+                       return -1;
+               }
+               DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
+                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
+       }
+
+       state = talloc(client, struct traverse_start_state);
        if (state == NULL) {
                return -1;
        }
@@ -451,7 +739,10 @@ int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data,
        state->srcnode = srcnode;
        state->reqid = d->reqid;
        state->srvid = d->srvid;
+       state->db_id = d->db_id;
        state->ctdb = ctdb;
+       state->withemptyrecords = d->withemptyrecords;
+       state->num_records = 0;
 
        state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
        if (state->h == NULL) {
@@ -459,5 +750,32 @@ int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data,
                return -1;
        }
 
+       talloc_set_destructor(state, ctdb_traverse_start_destructor);
+
        return 0;
 }
+
+/**
+ * start a traverse_all - called as a control from a client.
+ */
+int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb,
+                                   TDB_DATA data,
+                                   TDB_DATA *outdata,
+                                   uint32_t srcnode,
+                                   uint32_t client_id)
+{
+       struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
+       struct ctdb_traverse_start_ext d2;
+       TDB_DATA data2;
+
+       ZERO_STRUCT(d2);
+       d2.db_id = d->db_id;
+       d2.reqid = d->reqid;
+       d2.srvid = d->srvid;
+       d2.withemptyrecords = false;
+
+       data2.dsize = sizeof(d2);
+       data2.dptr = (uint8_t *)&d2;
+
+       return ctdb_control_traverse_start_ext(ctdb, data2, outdata, srcnode, client_id);
+}