ctdb-daemon: Do not force full vacuum on first vacuuming run
diff --git a/ctdb/server/ctdb_vacuum.c b/ctdb/server/ctdb_vacuum.c
index 56211eb605a511fde1c64df1063da7e7889d026c..9d086917f3c47322cbd5e51c1771880a6cc63f50 100644
--- a/ctdb/server/ctdb_vacuum.c
+++ b/ctdb/server/ctdb_vacuum.c
    along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
 
-#include "includes.h"
-#include "tdb.h"
+#include "replace.h"
 #include "system/network.h"
 #include "system/filesys.h"
-#include "system/dir.h"
-#include "../include/ctdb_private.h"
-#include "db_wrap.h"
+#include "system/time.h"
+
+#include <talloc.h>
+#include <tevent.h>
+
+#include "lib/tdb_wrap/tdb_wrap.h"
 #include "lib/util/dlinklist.h"
-#include "../include/ctdb_private.h"
-#include "../common/rb_tree.h"
+#include "lib/util/debug.h"
+#include "lib/util/samba_util.h"
+#include "lib/util/sys_rw.h"
+#include "lib/util/util_process.h"
+
+#include "ctdb_private.h"
+#include "ctdb_client.h"
+
+#include "common/rb_tree.h"
+#include "common/common.h"
+#include "common/logging.h"
 
 #define TIMELIMIT() timeval_current_ofs(10, 0)
 
@@ -53,35 +64,42 @@ struct ctdb_vacuum_handle {
 
 /*  a list of records to possibly delete */
 struct vacuum_data {
-       uint32_t vacuum_limit;
-       uint32_t repack_limit;
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
        struct tdb_context *dest_db;
        trbt_tree_t *delete_list;
-       uint32_t delete_count;
        struct ctdb_marshall_buffer **vacuum_fetch_list;
        struct timeval start;
        bool traverse_error;
        bool vacuum;
-       uint32_t total;
-       uint32_t vacuumed;
-       uint32_t copied;
-       uint32_t fast_added_to_vacuum_fetch_list;
-       uint32_t fast_added_to_delete_list;
-       uint32_t fast_deleted;
-       uint32_t fast_skipped;
-       uint32_t fast_error;
-       uint32_t fast_total;
-       uint32_t full_scheduled;
-       uint32_t full_skipped;
-       uint32_t full_error;
-       uint32_t full_total;
-       uint32_t delete_left;
-       uint32_t delete_remote_error;
-       uint32_t delete_local_error;
-       uint32_t delete_deleted;
-       uint32_t delete_skipped;
+       struct {
+               struct {
+                       uint32_t added_to_vacuum_fetch_list;
+                       uint32_t added_to_delete_list;
+                       uint32_t deleted;
+                       uint32_t skipped;
+                       uint32_t error;
+                       uint32_t total;
+               } delete_queue;
+               struct {
+                       uint32_t scheduled;
+                       uint32_t skipped;
+                       uint32_t error;
+                       uint32_t total;
+               } db_traverse;
+               struct {
+                       uint32_t total;
+                       uint32_t remote_error;
+                       uint32_t local_error;
+                       uint32_t deleted;
+                       uint32_t skipped;
+                       uint32_t left;
+               } delete_list;
+               struct {
+                       uint32_t vacuumed;
+                       uint32_t copied;
+               } repack;
+       } count;
 };
 
 /* this structure contains the information for one record to be deleted */
@@ -89,6 +107,7 @@ struct delete_record_data {
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_ltdb_header hdr;
+       uint32_t remote_fail_count;
        TDB_DATA key;
        uint8_t keydata[1];
 };
@@ -131,6 +150,7 @@ static int insert_delete_record_data_into_tree(struct ctdb_context *ctdb,
        memcpy(dd->keydata, key.dptr, key.dsize);
 
        dd->hdr = *hdr;
+       dd->remote_fail_count = 0;
 
        hash = ctdb_hash(&key);
 
@@ -161,7 +181,7 @@ static int add_record_to_delete_list(struct vacuum_data *vdata, TDB_DATA key,
                return -1;
        }
 
-       vdata->delete_count++;
+       vdata->count.delete_list.total++;
 
        return 0;
 }
@@ -174,42 +194,29 @@ static int add_record_to_vacuum_fetch_list(struct vacuum_data *vdata,
                                           TDB_DATA key)
 {
        struct ctdb_context *ctdb = vdata->ctdb;
-       struct ctdb_rec_data *rec;
        uint32_t lmaster;
-       size_t old_size;
        struct ctdb_marshall_buffer *vfl;
 
        lmaster = ctdb_lmaster(ctdb, &key);
 
        vfl = vdata->vacuum_fetch_list[lmaster];
 
-       rec = ctdb_marshall_record(vfl, ctdb->pnn, key, NULL, tdb_null);
-       if (rec == NULL) {
+       vfl = ctdb_marshall_add(ctdb, vfl, vfl->db_id, ctdb->pnn,
+                               key, NULL, tdb_null);
+       if (vfl == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
                vdata->traverse_error = true;
                return -1;
        }
 
-       old_size = talloc_get_size(vfl);
-       vfl = talloc_realloc_size(NULL, vfl, old_size + rec->length);
-       if (vfl == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
-               vdata->traverse_error = true;
-               return -1;
-       }
        vdata->vacuum_fetch_list[lmaster] = vfl;
 
-       vfl->count++;
-       memcpy(old_size+(uint8_t *)vfl, rec, rec->length);
-       talloc_free(rec);
-
-       vdata->total++;
-
        return 0;
 }
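The hunk above replaces the hand-rolled realloc/memcpy marshalling with the ctdb_marshall_add() and ctdb_marshall_finish() helpers; the same idiom recurs further down for the delete list. A minimal sketch of the idiom, assuming placeholder names mem_ctx, db_id and key (they are not taken from this file):

/* Sketch of the marshalling idiom used throughout this patch
 * (mem_ctx, db_id and key are placeholders, not names from the patch). */
struct ctdb_marshall_buffer *buf;
TDB_DATA blob;

buf = (struct ctdb_marshall_buffer *)talloc_zero_size(
	mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
if (buf == NULL) {
	return -1;
}
buf->db_id = db_id;

/* ctdb_marshall_add() reallocates the buffer and appends one record */
buf = ctdb_marshall_add(mem_ctx, buf, db_id, ctdb->pnn,
			key, NULL, tdb_null);
if (buf == NULL) {
	return -1;
}

/* ctdb_marshall_finish() wraps the buffer into a TDB_DATA for sending */
blob = ctdb_marshall_finish(buf);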
 
 
-static void ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
+static void ctdb_vacuum_event(struct tevent_context *ev,
+                             struct tevent_timer *te,
                              struct timeval t, void *private_data);
 
 static int vacuum_record_parser(TDB_DATA key, TDB_DATA data, void *private_data)
@@ -240,11 +247,11 @@ static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
        struct ctdb_ltdb_header *hdr;
        int res = 0;
 
-       vdata->full_total++;
+       vdata->count.db_traverse.total++;
 
        lmaster = ctdb_lmaster(ctdb, &key);
        if (lmaster >= ctdb->num_nodes) {
-               vdata->full_error++;
+               vdata->count.db_traverse.error++;
                DEBUG(DEBUG_CRIT, (__location__
                                   " lmaster[%u] >= ctdb->num_nodes[%u] for key"
                                   " with hash[%u]!\n",
@@ -256,14 +263,14 @@ static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
 
        if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
                /* it is not a deleted record */
-               vdata->full_skipped++;
+               vdata->count.db_traverse.skipped++;
                return 0;
        }
 
        hdr = (struct ctdb_ltdb_header *)data.dptr;
 
        if (hdr->dmaster != ctdb->pnn) {
-               vdata->full_skipped++;
+               vdata->count.db_traverse.skipped++;
                return 0;
        }
 
@@ -273,9 +280,9 @@ static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
         */
        res = insert_record_into_delete_queue(ctdb_db, hdr, key);
        if (res != 0) {
-               vdata->full_error++;
+               vdata->count.db_traverse.error++;
        } else {
-               vdata->full_scheduled++;
+               vdata->count.db_traverse.scheduled++;
        }
 
        return 0;
@@ -289,161 +296,20 @@ static int delete_marshall_traverse(void *param, void *data)
 {
        struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
        struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
-       struct ctdb_rec_data *rec;
-       size_t old_size;
+       struct ctdb_marshall_buffer *m;
 
-       rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
-       if (rec == NULL) {
+       m = ctdb_marshall_add(recs, recs->records, recs->records->db_id,
+                             recs->records->db_id,
+                             dd->key, &dd->hdr, tdb_null);
+       if (m == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
-               return 0;
+               return -1;
        }
 
-       old_size = talloc_get_size(recs->records);
-       recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
-       if (recs->records == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
-               return 0;
-       }
-       recs->records->count++;
-       memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
+       recs->records = m;
        return 0;
 }
 
-/**
- * Variant of delete_marshall_traverse() that bumps the
- * RSN of each traversed record in the database.
- *
- * This is needed to ensure that when rolling out our
- * empty record copy before remote deletion, we as the
- * record's dmaster keep a higher RSN than the non-dmaster
- * nodes. This is needed to prevent old copies from
- * resurrection in recoveries.
- */
-static int delete_marshall_traverse_first(void *param, void *data)
-{
-       struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
-       struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
-       struct ctdb_db_context *ctdb_db = dd->ctdb_db;
-       struct ctdb_context *ctdb = ctdb_db->ctdb;
-       struct ctdb_ltdb_header *header;
-       TDB_DATA tdb_data, ctdb_data;
-       uint32_t lmaster;
-       uint32_t hash = ctdb_hash(&(dd->key));
-       int res;
-
-       res = tdb_chainlock(ctdb_db->ltdb->tdb, dd->key);
-       if (res != 0) {
-               DEBUG(DEBUG_ERR,
-                     (__location__ " Error getting chainlock on record with "
-                      "key hash [0x%08x] on database db[%s].\n",
-                      hash, ctdb_db->db_name));
-               recs->vdata->delete_skipped++;
-               talloc_free(dd);
-               return 0;
-       }
-
-       /*
-        * Verify that the record is still empty, its RSN has not
-        * changed and that we are still its lmaster and dmaster.
-        */
-
-       tdb_data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
-       if (tdb_data.dsize < sizeof(struct ctdb_ltdb_header)) {
-               DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
-                                  "on database db[%s] does not exist or is not"
-                                  " a ctdb-record.  skipping.\n",
-                                  hash, ctdb_db->db_name));
-               goto skip;
-       }
-
-       if (tdb_data.dsize > sizeof(struct ctdb_ltdb_header)) {
-               DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
-                                  "on database db[%s] has been recycled. "
-                                  "skipping.\n",
-                                  hash, ctdb_db->db_name));
-               goto skip;
-       }
-
-       header = (struct ctdb_ltdb_header *)tdb_data.dptr;
-
-       if (header->flags & CTDB_REC_RO_FLAGS) {
-               DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
-                                  "on database db[%s] has read-only flags. "
-                                  "skipping.\n",
-                                  hash, ctdb_db->db_name));
-               goto skip;
-       }
-
-       if (header->dmaster != ctdb->pnn) {
-               DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
-                                  "on database db[%s] has been migrated away. "
-                                  "skipping.\n",
-                                  hash, ctdb_db->db_name));
-               goto skip;
-       }
-
-       if (header->rsn != dd->hdr.rsn) {
-               DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
-                                  "on database db[%s] seems to have been "
-                                  "migrated away and back again (with empty "
-                                  "data). skipping.\n",
-                                  hash, ctdb_db->db_name));
-               goto skip;
-       }
-
-       lmaster = ctdb_lmaster(ctdb_db->ctdb, &dd->key);
-
-       if (lmaster != ctdb->pnn) {
-               DEBUG(DEBUG_INFO, (__location__ ": not lmaster for record in "
-                                  "delete list (key hash [0x%08x], db[%s]). "
-                                  "Strange! skipping.\n",
-                                  hash, ctdb_db->db_name));
-               goto skip;
-       }
-
-       /*
-        * Increment the record's RSN to ensure the dmaster (i.e. the current
-        * node) has the highest RSN of the record in the cluster.
-        * This is to prevent old record copies from resurrecting in recoveries
-        * if something should fail during the deletion process.
-        * Note that ctdb_ltdb_store_server() increments the RSN if called
-        * on the record's dmaster.
-        */
-
-       ctdb_data.dptr = tdb_data.dptr + sizeof(struct ctdb_ltdb_header);
-       ctdb_data.dsize = tdb_data.dsize - sizeof(struct ctdb_ltdb_header);
-
-       res = ctdb_ltdb_store(ctdb_db, dd->key, header, ctdb_data);
-       if (res != 0) {
-               DEBUG(DEBUG_ERR, (__location__ ": Failed to store record with "
-                                 "key hash [0x%08x] on database db[%s].\n",
-                                 hash, ctdb_db->db_name));
-               goto skip;
-       }
-
-       tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
-
-       goto done;
-
-skip:
-       tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
-
-       recs->vdata->delete_skipped++;
-       talloc_free(dd);
-       dd = NULL;
-
-done:
-       if (tdb_data.dptr != NULL) {
-               free(tdb_data.dptr);
-       }
-
-       if (dd == NULL) {
-               return 0;
-       }
-
-       return delete_marshall_traverse(param, data);
-}
-
 /**
  * traverse function for the traversal of the delete_queue,
  * the fast-path vacuuming list.
@@ -476,15 +342,11 @@ static int delete_queue_traverse(void *param, void *data)
        uint32_t lmaster;
        uint32_t hash = ctdb_hash(&(dd->key));
 
-       vdata->fast_total++;
+       vdata->count.delete_queue.total++;
 
-       res = tdb_chainlock(ctdb_db->ltdb->tdb, dd->key);
+       res = tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key);
        if (res != 0) {
-               DEBUG(DEBUG_ERR,
-                     (__location__ " Error getting chainlock on record with "
-                      "key hash [0x%08x] on database db[%s].\n",
-                      hash, ctdb_db->db_name));
-               vdata->fast_error++;
+               vdata->count.delete_queue.error++;
                return 0;
        }
 
@@ -525,9 +387,9 @@ static int delete_queue_traverse(void *param, void *data)
                        DEBUG(DEBUG_ERR,
                              (__location__ " Error adding record to list "
                               "of records to send to lmaster.\n"));
-                       vdata->fast_error++;
+                       vdata->count.delete_queue.error++;
                } else {
-                       vdata->fast_added_to_vacuum_fetch_list++;
+                       vdata->count.delete_queue.added_to_vacuum_fetch_list++;
                }
                goto done;
        }
@@ -540,9 +402,9 @@ static int delete_queue_traverse(void *param, void *data)
                        DEBUG(DEBUG_ERR,
                              (__location__ " Error adding record to list "
                               "of records for deletion on lmaster.\n"));
-                       vdata->fast_error++;
+                       vdata->count.delete_queue.error++;
                } else {
-                       vdata->fast_added_to_delete_list++;
+                       vdata->count.delete_queue.added_to_delete_list++;
                }
        } else {
                res = tdb_delete(ctdb_db->ltdb->tdb, dd->key);
@@ -552,7 +414,7 @@ static int delete_queue_traverse(void *param, void *data)
                              (__location__ " Error deleting record with key "
                               "hash [0x%08x] from local data base db[%s].\n",
                               hash, ctdb_db->db_name));
-                       vdata->fast_error++;
+                       vdata->count.delete_queue.error++;
                        goto done;
                }
 
@@ -560,13 +422,13 @@ static int delete_queue_traverse(void *param, void *data)
                      (__location__ " Deleted record with key hash "
                       "[0x%08x] from local data base db[%s].\n",
                       hash, ctdb_db->db_name));
-               vdata->fast_deleted++;
+               vdata->count.delete_queue.deleted++;
        }
 
        goto done;
 
 skipped:
-       vdata->fast_skipped++;
+       vdata->count.delete_queue.skipped++;
 
 done:
        tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
@@ -587,19 +449,25 @@ static int delete_record_traverse(void *param, void *data)
        struct ctdb_db_context *ctdb_db = dd->ctdb_db;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        int res;
-       struct ctdb_ltdb_header *header;
-       TDB_DATA tdb_data;
+       struct ctdb_ltdb_header header;
        uint32_t lmaster;
        uint32_t hash = ctdb_hash(&(dd->key));
 
+       if (dd->remote_fail_count > 0) {
+               vdata->count.delete_list.remote_error++;
+               vdata->count.delete_list.left--;
+               talloc_free(dd);
+               return 0;
+       }
+
        res = tdb_chainlock(ctdb_db->ltdb->tdb, dd->key);
        if (res != 0) {
                DEBUG(DEBUG_ERR,
                      (__location__ " Error getting chainlock on record with "
                       "key hash [0x%08x] on database db[%s].\n",
                       hash, ctdb_db->db_name));
-               vdata->delete_local_error++;
-               vdata->delete_left--;
+               vdata->count.delete_list.local_error++;
+               vdata->count.delete_list.left--;
                talloc_free(dd);
                return 0;
        }
@@ -609,26 +477,13 @@ static int delete_record_traverse(void *param, void *data)
         * changed and that we are still its lmaster and dmaster.
         */
 
-       tdb_data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
-       if (tdb_data.dsize < sizeof(struct ctdb_ltdb_header)) {
-               DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
-                                  "on database db[%s] does not exist or is not"
-                                  " a ctdb-record.  skipping.\n",
-                                  hash, ctdb_db->db_name));
-               goto skip;
-       }
-
-       if (tdb_data.dsize > sizeof(struct ctdb_ltdb_header)) {
-               DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
-                                  "on database db[%s] has been recycled. "
-                                  "skipping.\n",
-                                  hash, ctdb_db->db_name));
+       res = tdb_parse_record(ctdb_db->ltdb->tdb, dd->key,
+                              vacuum_record_parser, &header);
+       if (res != 0) {
                goto skip;
        }
 
-       header = (struct ctdb_ltdb_header *)tdb_data.dptr;
-
-       if (header->flags & CTDB_REC_RO_FLAGS) {
+       if (header.flags & CTDB_REC_RO_FLAGS) {
                DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
                                   "on database db[%s] has read-only flags. "
                                   "skipping.\n",
@@ -636,7 +491,7 @@ static int delete_record_traverse(void *param, void *data)
                goto skip;
        }
 
-       if (header->dmaster != ctdb->pnn) {
+       if (header.dmaster != ctdb->pnn) {
                DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
                                   "on database db[%s] has been migrated away. "
                                   "skipping.\n",
@@ -644,12 +499,10 @@ static int delete_record_traverse(void *param, void *data)
                goto skip;
        }
 
-       if (header->rsn != dd->hdr.rsn + 1) {
+       if (header.rsn != dd->hdr.rsn) {
                /*
                 * The record has been migrated off the node and back again.
                 * But not requeued for deletion. Skip it.
-                * (Note that the first marshall traverse has bumped the RSN
-                *  on disk.)
                 */
                DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
                                   "on database db[%s] seems to have been "
@@ -676,7 +529,7 @@ static int delete_record_traverse(void *param, void *data)
                      (__location__ " Error deleting record with key hash "
                       "[0x%08x] from local data base db[%s].\n",
                       hash, ctdb_db->db_name));
-               vdata->delete_local_error++;
+               vdata->count.delete_list.local_error++;
                goto done;
        }
 
@@ -684,49 +537,56 @@ static int delete_record_traverse(void *param, void *data)
              (__location__ " Deleted record with key hash [0x%08x] from "
               "local data base db[%s].\n", hash, ctdb_db->db_name));
 
-       vdata->delete_deleted++;
+       vdata->count.delete_list.deleted++;
        goto done;
 
 skip:
-       vdata->delete_skipped++;
+       vdata->count.delete_list.skipped++;
 
 done:
-       free(tdb_data.dptr);
-
        tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
 
        talloc_free(dd);
-       vdata->delete_left--;
+       vdata->count.delete_list.left--;
 
        return 0;
 }
 
 /**
- * Fast vacuuming run:
  * Traverse the delete_queue.
- * This fills the same lists as the database traverse.
+ * Records are either deleted directly or filled
+ * into the delete list or the vacuum fetch lists
+ * for further processing.
  */
-static void ctdb_vacuum_db_fast(struct ctdb_db_context *ctdb_db,
-                               struct vacuum_data *vdata)
+static void ctdb_process_delete_queue(struct ctdb_db_context *ctdb_db,
+                                     struct vacuum_data *vdata)
 {
        uint32_t sum;
+       int ret;
 
-       trbt_traversearray32(ctdb_db->delete_queue, 1, delete_queue_traverse, vdata);
+       ret = trbt_traversearray32(ctdb_db->delete_queue, 1,
+                                  delete_queue_traverse, vdata);
 
-       sum = vdata->fast_deleted
-           + vdata->fast_skipped
-           + vdata->fast_error
-           + vdata->fast_added_to_delete_list
-           + vdata->fast_added_to_vacuum_fetch_list;
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Error traversing "
+                     "the delete queue.\n"));
+       }
 
-       if (vdata->fast_total != sum) {
+       sum = vdata->count.delete_queue.deleted
+           + vdata->count.delete_queue.skipped
+           + vdata->count.delete_queue.error
+           + vdata->count.delete_queue.added_to_delete_list
+           + vdata->count.delete_queue.added_to_vacuum_fetch_list;
+
+       if (vdata->count.delete_queue.total != sum) {
                DEBUG(DEBUG_ERR, (__location__ " Inconsistency in fast vacuum "
                      "counts for db[%s]: total[%u] != sum[%u]\n",
-                     ctdb_db->db_name, (unsigned)vdata->fast_total,
+                     ctdb_db->db_name,
+                     (unsigned)vdata->count.delete_queue.total,
                      (unsigned)sum));
        }
 
-       if (vdata->fast_total > 0) {
+       if (vdata->count.delete_queue.total > 0) {
                DEBUG(DEBUG_INFO,
                      (__location__
                       " fast vacuuming delete_queue traverse statistics: "
@@ -738,12 +598,12 @@ static void ctdb_vacuum_db_fast(struct ctdb_db_context *ctdb_db,
                       "adl[%u] "
                       "avf[%u]\n",
                       ctdb_db->db_name,
-                      (unsigned)vdata->fast_total,
-                      (unsigned)vdata->fast_deleted,
-                      (unsigned)vdata->fast_skipped,
-                      (unsigned)vdata->fast_error,
-                      (unsigned)vdata->fast_added_to_delete_list,
-                      (unsigned)vdata->fast_added_to_vacuum_fetch_list));
+                      (unsigned)vdata->count.delete_queue.total,
+                      (unsigned)vdata->count.delete_queue.deleted,
+                      (unsigned)vdata->count.delete_queue.skipped,
+                      (unsigned)vdata->count.delete_queue.error,
+                      (unsigned)vdata->count.delete_queue.added_to_delete_list,
+                      (unsigned)vdata->count.delete_queue.added_to_vacuum_fetch_list));
        }
 
        return;
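As the rewritten comment above states, ctdb_process_delete_queue() only dispatches records; the per-record decision is made in delete_queue_traverse(), of which only fragments are visible in the earlier hunks. The following is a rough, non-verbatim sketch of how one queued record ends up in the new delete_queue counters; record-header validation and the exact branch conditions are elided, and need_remote_delete is a placeholder:

/* Rough sketch only -- not a verbatim copy of delete_queue_traverse(). */
bool need_remote_delete = true;	/* placeholder for a check elided here */

vdata->count.delete_queue.total++;

if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key) != 0) {
	/* record is busy; with this patch vacuuming no longer blocks on it */
	vdata->count.delete_queue.error++;
	return 0;
}

if (ctdb_lmaster(ctdb, &dd->key) != ctdb->pnn) {
	/* not the lmaster: hand the record to its lmaster via the
	 * vacuum fetch list (add_record_to_vacuum_fetch_list()) */
	vdata->count.delete_queue.added_to_vacuum_fetch_list++;
} else if (need_remote_delete) {
	/* we are the lmaster: queue the record for the networked
	 * delete-list run (add_record_to_delete_list()) */
	vdata->count.delete_queue.added_to_delete_list++;
} else if (tdb_delete(ctdb_db->ltdb->tdb, dd->key) == 0) {
	/* no remote copies to worry about: drop the local copy directly */
	vdata->count.delete_queue.deleted++;
} else {
	vdata->count.delete_queue.error++;
}

tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);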
@@ -756,8 +616,8 @@ static void ctdb_vacuum_db_fast(struct ctdb_db_context *ctdb_db,
  * This is not done each time but only every tunable
  * VacuumFastPathCount times.
  */
-static int ctdb_vacuum_traverse_db(struct ctdb_db_context *ctdb_db,
-                                  struct vacuum_data *vdata)
+static void ctdb_vacuum_traverse_db(struct ctdb_db_context *ctdb_db,
+                                   struct vacuum_data *vdata)
 {
        int ret;
 
@@ -765,10 +625,10 @@ static int ctdb_vacuum_traverse_db(struct ctdb_db_context *ctdb_db,
        if (ret == -1 || vdata->traverse_error) {
                DEBUG(DEBUG_ERR, (__location__ " Traverse error in vacuuming "
                                  "'%s'\n", ctdb_db->db_name));
-               return -1;
+               return;
        }
 
-       if (vdata->full_total > 0) {
+       if (vdata->count.db_traverse.total > 0) {
                DEBUG(DEBUG_INFO,
                      (__location__
                       " full vacuuming db traverse statistics: "
@@ -778,13 +638,13 @@ static int ctdb_vacuum_traverse_db(struct ctdb_db_context *ctdb_db,
                       "err[%u] "
                       "sched[%u]\n",
                       ctdb_db->db_name,
-                      (unsigned)vdata->full_total,
-                      (unsigned)vdata->full_skipped,
-                      (unsigned)vdata->full_error,
-                      (unsigned)vdata->full_scheduled));
+                      (unsigned)vdata->count.db_traverse.total,
+                      (unsigned)vdata->count.db_traverse.skipped,
+                      (unsigned)vdata->count.db_traverse.error,
+                      (unsigned)vdata->count.db_traverse.scheduled));
        }
 
-       return 0;
+       return;
 }
 
 /**
@@ -792,8 +652,8 @@ static int ctdb_vacuum_traverse_db(struct ctdb_db_context *ctdb_db,
  * For records for which we are not the lmaster, tell the lmaster to
  * fetch the record.
  */
-static int ctdb_process_vacuum_fetch_lists(struct ctdb_db_context *ctdb_db,
-                                          struct vacuum_data *vdata)
+static void ctdb_process_vacuum_fetch_lists(struct ctdb_db_context *ctdb_db,
+                                           struct vacuum_data *vdata)
 {
        int i;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
@@ -814,8 +674,7 @@ static int ctdb_process_vacuum_fetch_lists(struct ctdb_db_context *ctdb_db,
                                   vfl->count, ctdb->nodes[i]->pnn,
                                   ctdb_db->db_name));
 
-               data.dsize = talloc_get_size(vfl);
-               data.dptr  = (void *)vfl;
+               data = ctdb_marshall_finish(vfl);
                if (ctdb_client_send_message(ctdb, ctdb->nodes[i]->pnn,
                                             CTDB_SRVID_VACUUM_FETCH,
                                             data) != 0)
@@ -823,11 +682,10 @@ static int ctdb_process_vacuum_fetch_lists(struct ctdb_db_context *ctdb_db,
                        DEBUG(DEBUG_ERR, (__location__ " Failed to send vacuum "
                                          "fetch message to %u\n",
                                          ctdb->nodes[i]->pnn));
-                       return -1;
                }
        }
 
-       return 0;
+       return;
 }
 
 /**
@@ -842,40 +700,35 @@ static int ctdb_process_vacuum_fetch_lists(struct ctdb_db_context *ctdb_db,
  * at least some of these records previously from the former dmasters
  * with the vacuum fetch message.
  *
- * This last step is implemented as a 3-phase process to protect from
- * races leading to data corruption:
- *
- *  1) Send the lmaster's copy to all other active nodes with the
- *     RECEIVE_RECORDS control: The remote nodes store the lmaster's copy.
- *  2) Send the records that could successfully be stored remotely
- *     in step #1 to all active nodes with the TRY_DELETE_RECORDS
+ *  1) Send the records to all active nodes with the TRY_DELETE_RECORDS
 *     control. The remote nodes delete their local copy.
- *  3) The lmaster locally deletes its copies of all records that
+ *  2) The lmaster locally deletes its copies of all records that
 *     could successfully be deleted remotely in step #1.
  */
-static int ctdb_process_delete_list(struct ctdb_db_context *ctdb_db,
-                                   struct vacuum_data *vdata)
+static void ctdb_process_delete_list(struct ctdb_db_context *ctdb_db,
+                                    struct vacuum_data *vdata)
 {
        int ret, i;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        struct delete_records_list *recs;
        TDB_DATA indata;
-       struct ctdb_node_map *nodemap;
+       struct ctdb_node_map_old *nodemap;
        uint32_t *active_nodes;
        int num_active_nodes;
        TALLOC_CTX *tmp_ctx;
+       uint32_t sum;
 
-       if (vdata->delete_count == 0) {
-               return 0;
+       if (vdata->count.delete_list.total == 0) {
+               return;
        }
 
        tmp_ctx = talloc_new(vdata);
        if (tmp_ctx == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-               return 0;
+               return;
        }
 
-       vdata->delete_left = vdata->delete_count;
+       vdata->count.delete_list.left = vdata->count.delete_list.total;
 
        /*
         * get the list of currently active nodes
@@ -887,7 +740,6 @@ static int ctdb_process_delete_list(struct ctdb_db_context *ctdb_db,
                                   &nodemap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
-               ret = -1;
                goto done;
        }
 
@@ -898,151 +750,48 @@ static int ctdb_process_delete_list(struct ctdb_db_context *ctdb_db,
        num_active_nodes = talloc_get_size(active_nodes)/sizeof(*active_nodes);
 
        /*
-        * Now delete the records all active nodes in a three-phase process:
-        * 1) send all active remote nodes the current empty copy with this
-        *    node as DMASTER
-        * 2) if all nodes could store the new copy,
-        *    tell all the active remote nodes to delete all their copy
-        * 3) if all remote nodes deleted their record copy, delete it locally
-        */
-
-       /*
-        * Step 1:
-        * Send currently empty record copy to all active nodes for storing.
+        * Now delete the records on all active nodes in a two-phase process:
+        * 1) tell all active remote nodes to delete their copies
+        * 2) if all remote nodes deleted their record copy, delete it locally
         */
 
        recs = talloc_zero(tmp_ctx, struct delete_records_list);
        if (recs == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-               ret = -1;
-               goto done;
-       }
-       recs->records = (struct ctdb_marshall_buffer *)
-               talloc_zero_size(recs,
-                                offsetof(struct ctdb_marshall_buffer, data));
-       if (recs->records == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-               ret = -1;
                goto done;
        }
-       recs->records->db_id = ctdb_db->db_id;
-       recs->vdata = vdata;
 
        /*
-        * traverse the tree of all records we want to delete and
-        * create a blob we can send to the other nodes.
-        *
-        * We call delete_marshall_traverse_first() to bump the
-        * records' RSNs in the database, to ensure we (as dmaster)
-        * keep the highest RSN of the records in the cluster.
-        */
-       trbt_traversearray32(vdata->delete_list, 1,
-                            delete_marshall_traverse_first, recs);
-
-       indata.dsize = talloc_get_size(recs->records);
-       indata.dptr  = (void *)recs->records;
-
-       for (i = 0; i < num_active_nodes; i++) {
-               struct ctdb_marshall_buffer *records;
-               struct ctdb_rec_data *rec;
-               int32_t res;
-               TDB_DATA outdata;
-
-               ret = ctdb_control(ctdb, active_nodes[i], 0,
-                               CTDB_CONTROL_RECEIVE_RECORDS, 0,
-                               indata, recs, &outdata, &res,
-                               NULL, NULL);
-               if (ret != 0 || res != 0) {
-                       DEBUG(DEBUG_ERR, ("Error storing record copies on "
-                                         "node %u: ret[%d] res[%d]\n",
-                                         active_nodes[i], ret, res));
-                       ret = -1;
-                       goto done;
-               }
-
-               /*
-                * outdata contains the list of records coming back
-                * from the node: These are the records that the
-                * remote node could not store. We remove these from
-                * the list to process further.
-                */
-               records = (struct ctdb_marshall_buffer *)outdata.dptr;
-               rec = (struct ctdb_rec_data *)&records->data[0];
-               while (records->count-- > 1) {
-                       TDB_DATA reckey, recdata;
-                       struct ctdb_ltdb_header *rechdr;
-                       struct delete_record_data *dd;
-
-                       reckey.dptr = &rec->data[0];
-                       reckey.dsize = rec->keylen;
-                       recdata.dptr = &rec->data[reckey.dsize];
-                       recdata.dsize = rec->datalen;
-
-                       if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
-                               DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
-                               ret = -1;
-                               goto done;
-                       }
-                       rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
-                       recdata.dptr += sizeof(*rechdr);
-                       recdata.dsize -= sizeof(*rechdr);
-
-                       dd = (struct delete_record_data *)trbt_lookup32(
-                                       vdata->delete_list,
-                                       ctdb_hash(&reckey));
-                       if (dd != NULL) {
-                               /*
-                                * The other node could not store the record
-                                * copy and it is the first node that failed.
-                                * So we should remove it from the tree and
-                                * update statistics.
-                                */
-                               talloc_free(dd);
-                               vdata->delete_remote_error++;
-                               vdata->delete_left--;
-                       }
-
-                       rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
-               }
-       }
-
-       if (vdata->delete_left == 0) {
-               goto success;
-       }
-
-       /*
-        * Step 2:
-        * Send the remaining records to all active nodes for deletion.
-        *
-        * The lmaster's (i.e. our) copies of these records have been stored
-        * successfully on the other nodes.
+        * Step 1:
+        * Send all records to all active nodes for deletion.
         */
 
        /*
         * Create a marshall blob from the remaining list of records to delete.
         */
 
-       talloc_free(recs->records);
-
        recs->records = (struct ctdb_marshall_buffer *)
                talloc_zero_size(recs,
                                 offsetof(struct ctdb_marshall_buffer, data));
        if (recs->records == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-               ret = -1;
                goto done;
        }
        recs->records->db_id = ctdb_db->db_id;
 
-       trbt_traversearray32(vdata->delete_list, 1,
-                            delete_marshall_traverse, recs);
+       ret = trbt_traversearray32(vdata->delete_list, 1,
+                                  delete_marshall_traverse, recs);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Error traversing the "
+                     "delete list for second marshalling.\n"));
+               goto done;
+       }
 
-       indata.dsize = talloc_get_size(recs->records);
-       indata.dptr  = (void *)recs->records;
+       indata = ctdb_marshall_finish(recs->records);
 
        for (i = 0; i < num_active_nodes; i++) {
                struct ctdb_marshall_buffer *records;
-               struct ctdb_rec_data *rec;
+               struct ctdb_rec_data_old *rec;
                int32_t res;
                TDB_DATA outdata;
 
@@ -1054,7 +803,6 @@ static int ctdb_process_delete_list(struct ctdb_db_context *ctdb_db,
                        DEBUG(DEBUG_ERR, ("Failed to delete records on "
                                          "node %u: ret[%d] res[%d]\n",
                                          active_nodes[i], ret, res));
-                       ret = -1;
                        goto done;
                }
 
@@ -1065,7 +813,7 @@ static int ctdb_process_delete_list(struct ctdb_db_context *ctdb_db,
                 * the list to delete locally.
                 */
                records = (struct ctdb_marshall_buffer *)outdata.dptr;
-               rec = (struct ctdb_rec_data *)&records->data[0];
+               rec = (struct ctdb_rec_data_old *)&records->data[0];
                while (records->count-- > 1) {
                        TDB_DATA reckey, recdata;
                        struct ctdb_ltdb_header *rechdr;
@@ -1078,7 +826,6 @@ static int ctdb_process_delete_list(struct ctdb_db_context *ctdb_db,
 
                        if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
                                DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
-                               ret = -1;
                                goto done;
                        }
                        rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
@@ -1090,38 +837,61 @@ static int ctdb_process_delete_list(struct ctdb_db_context *ctdb_db,
                                        ctdb_hash(&reckey));
                        if (dd != NULL) {
                                /*
-                                * The other node could not delete the
-                                * record and it is the first node that
-                                * failed. So we should remove it from
-                                * the tree and update statistics.
+                                * The remote node could not delete the
+                                * record.  Since other remote nodes can
+                                * also fail, we just mark the record.
                                 */
-                               talloc_free(dd);
-                               vdata->delete_remote_error++;
-                               vdata->delete_left--;
+                               dd->remote_fail_count++;
+                       } else {
+                               DEBUG(DEBUG_ERR, (__location__ " Failed to "
+                                     "find record with hash 0x%08x coming "
+                                     "back from TRY_DELETE_RECORDS "
+                                     "control in delete list.\n",
+                                     ctdb_hash(&reckey)));
                        }
 
-                       rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
+                       rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
                }
        }
 
-       if (vdata->delete_left == 0) {
-               goto success;
-       }
-
        /*
-        * Step 3:
+        * Step 2:
         * Delete the remaining records locally.
         *
         * These records have successfully been deleted on all
         * active remote nodes.
         */
 
-       trbt_traversearray32(vdata->delete_list, 1,
-                            delete_record_traverse, vdata);
+       ret = trbt_traversearray32(vdata->delete_list, 1,
+                                  delete_record_traverse, vdata);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Error traversing the "
+                     "delete list for deletion.\n"));
+       }
+
+       if (vdata->count.delete_list.left != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Vaccum db[%s] error: "
+                     "there are %u records left for deletion after "
+                     "processing delete list\n",
+                     ctdb_db->db_name,
+                     (unsigned)vdata->count.delete_list.left));
+       }
+
+       sum = vdata->count.delete_list.deleted
+           + vdata->count.delete_list.skipped
+           + vdata->count.delete_list.remote_error
+           + vdata->count.delete_list.local_error
+           + vdata->count.delete_list.left;
 
-success:
+       if (vdata->count.delete_list.total != sum) {
+               DEBUG(DEBUG_ERR, (__location__ " Inconsistency in vacuum "
+                     "delete list counts for db[%s]: total[%u] != sum[%u]\n",
+                     ctdb_db->db_name,
+                     (unsigned)vdata->count.delete_list.total,
+                     (unsigned)sum));
+       }
 
-       if (vdata->delete_count > 0) {
+       if (vdata->count.delete_list.total > 0) {
                DEBUG(DEBUG_INFO,
                      (__location__
                       " vacuum delete list statistics: "
@@ -1133,47 +903,63 @@ success:
                       "loc.err[%u] "
                       "left[%u]\n",
                       ctdb_db->db_name,
-                      (unsigned)vdata->delete_count,
-                      (unsigned)vdata->delete_deleted,
-                      (unsigned)vdata->delete_skipped,
-                      (unsigned)vdata->delete_remote_error,
-                      (unsigned)vdata->delete_local_error,
-                      (unsigned)vdata->delete_left));
+                      (unsigned)vdata->count.delete_list.total,
+                      (unsigned)vdata->count.delete_list.deleted,
+                      (unsigned)vdata->count.delete_list.skipped,
+                      (unsigned)vdata->count.delete_list.remote_error,
+                      (unsigned)vdata->count.delete_list.local_error,
+                      (unsigned)vdata->count.delete_list.left));
        }
 
-       ret = 0;
-
 done:
        talloc_free(tmp_ctx);
 
-       return ret;
+       return;
 }
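Condensed, the two-phase scheme that this function now implements (and that its comment describes) looks roughly like the sketch below; marshalling and most error handling are elided, and all identifiers are the ones used in the hunks above:

/* Simplified, non-verbatim sketch of the two-phase delete-list run. */

/* Phase 1: ask every active node to delete its copy of the records
 * that were marshalled into indata above. */
for (i = 0; i < num_active_nodes; i++) {
	ret = ctdb_control(ctdb, active_nodes[i], 0,
			   CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
			   indata, recs, &outdata, &res, NULL, NULL);
	if (ret != 0 || res != 0) {
		goto done;
	}
	/* outdata echoes back the records this node could not delete;
	 * the loop in the hunk above walks it and bumps
	 * dd->remote_fail_count instead of dropping the record from
	 * the tree right away (which the old 3-phase code did). */
}

/* Phase 2: delete locally only the records every remote node deleted.
 * delete_record_traverse() skips records with remote_fail_count > 0
 * and accounts for them as delete_list.remote_error. */
ret = trbt_traversearray32(vdata->delete_list, 1,
			   delete_record_traverse, vdata);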
 
 /**
  * initialize the vacuum_data
  */
-static int ctdb_vacuum_init_vacuum_data(struct ctdb_db_context *ctdb_db,
-                                       struct vacuum_data *vdata)
+static struct vacuum_data *ctdb_vacuum_init_vacuum_data(
+                                       struct ctdb_db_context *ctdb_db,
+                                       TALLOC_CTX *mem_ctx)
 {
        int i;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
+       struct vacuum_data *vdata;
+
+       vdata = talloc_zero(mem_ctx, struct vacuum_data);
+       if (vdata == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
+               return NULL;
+       }
+
+       vdata->ctdb = ctdb_db->ctdb;
+       vdata->ctdb_db = ctdb_db;
+       vdata->delete_list = trbt_create(vdata, 0);
+       if (vdata->delete_list == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
+               goto fail;
+       }
+
+       vdata->start = timeval_current();
 
-       vdata->fast_added_to_delete_list = 0;
-       vdata->fast_added_to_vacuum_fetch_list = 0;
-       vdata->fast_deleted = 0;
-       vdata->fast_skipped = 0;
-       vdata->fast_error = 0;
-       vdata->fast_total = 0;
-       vdata->full_scheduled = 0;
-       vdata->full_skipped = 0;
-       vdata->full_error = 0;
-       vdata->full_total = 0;
-       vdata->delete_count = 0;
-       vdata->delete_left = 0;
-       vdata->delete_remote_error = 0;
-       vdata->delete_local_error = 0;
-       vdata->delete_skipped = 0;
-       vdata->delete_deleted = 0;
+       vdata->count.delete_queue.added_to_delete_list = 0;
+       vdata->count.delete_queue.added_to_vacuum_fetch_list = 0;
+       vdata->count.delete_queue.deleted = 0;
+       vdata->count.delete_queue.skipped = 0;
+       vdata->count.delete_queue.error = 0;
+       vdata->count.delete_queue.total = 0;
+       vdata->count.db_traverse.scheduled = 0;
+       vdata->count.db_traverse.skipped = 0;
+       vdata->count.db_traverse.error = 0;
+       vdata->count.db_traverse.total = 0;
+       vdata->count.delete_list.total = 0;
+       vdata->count.delete_list.left = 0;
+       vdata->count.delete_list.remote_error = 0;
+       vdata->count.delete_list.local_error = 0;
+       vdata->count.delete_list.skipped = 0;
+       vdata->count.delete_list.deleted = 0;
 
        /* the list needs to be of length num_nodes */
        vdata->vacuum_fetch_list = talloc_zero_array(vdata,
@@ -1181,7 +967,7 @@ static int ctdb_vacuum_init_vacuum_data(struct ctdb_db_context *ctdb_db,
                                                ctdb->num_nodes);
        if (vdata->vacuum_fetch_list == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-               return -1;
+               goto fail;
        }
        for (i = 0; i < ctdb->num_nodes; i++) {
                vdata->vacuum_fetch_list[i] = (struct ctdb_marshall_buffer *)
@@ -1189,12 +975,17 @@ static int ctdb_vacuum_init_vacuum_data(struct ctdb_db_context *ctdb_db,
                                         offsetof(struct ctdb_marshall_buffer, data));
                if (vdata->vacuum_fetch_list[i] == NULL) {
                        DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-                       return -1;
+                       talloc_free(vdata);
+                       return NULL;
                }
                vdata->vacuum_fetch_list[i]->db_id = ctdb_db->db_id;
        }
 
-       return 0;
+       return vdata;
+
+fail:
+       talloc_free(vdata);
+       return NULL;
 }
 
 /**
@@ -1230,11 +1021,12 @@ static int ctdb_vacuum_init_vacuum_data(struct ctdb_db_context *ctdb_db,
  * This executes in the child context.
  */
 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db,
-                         struct vacuum_data *vdata,
                          bool full_vacuum_run)
 {
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        int ret, pnn;
+       struct vacuum_data *vdata;
+       TALLOC_CTX *tmp_ctx;
 
        DEBUG(DEBUG_INFO, (__location__ " Entering %s vacuum run for db "
                           "%s db_id[0x%08x]\n",
@@ -1255,159 +1047,32 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db,
 
        ctdb->pnn = pnn;
 
-       ret = ctdb_vacuum_init_vacuum_data(ctdb_db, vdata);
-       if (ret != 0) {
-               return ret;
-       }
-
-       if (full_vacuum_run) {
-               ret = ctdb_vacuum_traverse_db(ctdb_db, vdata);
-               if (ret != 0) {
-                       return ret;
-               }
-       }
-
-       ctdb_vacuum_db_fast(ctdb_db, vdata);
-
-       ret = ctdb_process_vacuum_fetch_lists(ctdb_db, vdata);
-       if (ret != 0) {
-               return ret;
-       }
-
-       ret = ctdb_process_delete_list(ctdb_db, vdata);
-       if (ret != 0) {
-               return ret;
-       }
-
-       /* this ensures we run our event queue */
-       ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
-
-       return 0;
-}
-
-
-/*
- * traverse function for repacking
- */
-static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
-                          void *private_data)
-{
-       struct vacuum_data *vdata = (struct vacuum_data *)private_data;
-
-       if (vdata->vacuum) {
-               uint32_t hash = ctdb_hash(&key);
-               struct delete_record_data *kd;
-               /*
-                * check if we can ignore this record because it's in the delete_list
-                */
-               kd = (struct delete_record_data *)trbt_lookup32(vdata->delete_list, hash);
-               /*
-                * there might be hash collisions so we have to compare the keys here to be sure
-                */
-               if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
-                       struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
-                       /*
-                        * we have to check if the record hasn't changed in the meantime in order to
-                        * savely remove it from the database
-                        */
-                       if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
-                               hdr->dmaster == kd->ctdb->pnn &&
-                               ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
-                               kd->hdr.rsn == hdr->rsn) {
-                               vdata->vacuumed++;
-                               return 0;
-                       }
-               }
-       }
-       if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
-               vdata->traverse_error = true;
-               return -1;
-       }
-       vdata->copied++;
-       return 0;
-}
-
-/*
- * repack a tdb
- */
-static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct vacuum_data *vdata)
-{
-       struct tdb_context *tmp_db;
-
-       if (tdb_transaction_start(tdb) != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
-               return -1;
-       }
-
-       tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
-                         TDB_INTERNAL|TDB_DISALLOW_NESTING,
-                         O_RDWR|O_CREAT, 0);
-       if (tmp_db == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
-               tdb_transaction_cancel(tdb);
+       tmp_ctx = talloc_new(ctdb_db);
+       if (tmp_ctx == NULL) {
+               DEBUG(DEBUG_ERR, ("Out of memory!\n"));
                return -1;
        }
 
-       vdata->traverse_error = false;
-       vdata->dest_db = tmp_db;
-       vdata->vacuum = true;
-       vdata->vacuumed = 0;
-       vdata->copied = 0;
-
-       /*
-        * repack and vacuum on-the-fly by not writing the records that are
-        * no longer needed
-        */
-       if (tdb_traverse_read(tdb, repack_traverse, vdata) == -1) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
-               tdb_transaction_cancel(tdb);
-               tdb_close(tmp_db);
-               return -1;              
-       }
-
-       DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
-       
-       if (vdata->traverse_error) {
-               DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
-               tdb_transaction_cancel(tdb);
-               tdb_close(tmp_db);
+       vdata = ctdb_vacuum_init_vacuum_data(ctdb_db, tmp_ctx);
+       if (vdata == NULL) {
+               talloc_free(tmp_ctx);
                return -1;
        }
 
-       if (tdb_wipe_all(tdb) != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
-               tdb_transaction_cancel(tdb);
-               tdb_close(tmp_db);
-               return -1;
+       if (full_vacuum_run) {
+               ctdb_vacuum_traverse_db(ctdb_db, vdata);
        }
 
-       vdata->traverse_error = false;
-       vdata->dest_db = tdb;
-       vdata->vacuum = false;
-       vdata->copied = 0;
-
-       if (tdb_traverse_read(tmp_db, repack_traverse, vdata) == -1) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
-               tdb_transaction_cancel(tdb);
-               tdb_close(tmp_db);
-               return -1;              
-       }
+       ctdb_process_delete_queue(ctdb_db, vdata);
 
-       if (vdata->traverse_error) {
-               DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
-               tdb_transaction_cancel(tdb);
-               tdb_close(tmp_db);
-               return -1;
-       }
+       ctdb_process_vacuum_fetch_lists(ctdb_db, vdata);
 
-       tdb_close(tmp_db);
+       ctdb_process_delete_list(ctdb_db, vdata);
 
+       talloc_free(tmp_ctx);
 
-       if (tdb_transaction_commit(tdb) != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
-               return -1;
-       }
-       DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
+       /* this ensures we run our event queue */
+       ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
 
        return 0;
 }
@@ -1417,72 +1082,39 @@ static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct
  * called from the child context
  */
 static int ctdb_vacuum_and_repack_db(struct ctdb_db_context *ctdb_db,
-                                    TALLOC_CTX *mem_ctx,
                                     bool full_vacuum_run)
 {
        uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
-       uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
        const char *name = ctdb_db->db_name;
        int freelist_size = 0;
-       struct vacuum_data *vdata;
-
-       vdata = talloc_zero(mem_ctx, struct vacuum_data);
-       if (vdata == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-               return -1;
-       }
-
-       vdata->ctdb = ctdb_db->ctdb;
-       vdata->vacuum_limit = vacuum_limit;
-       vdata->repack_limit = repack_limit;
-       vdata->delete_list = trbt_create(vdata, 0);
-       vdata->ctdb_db = ctdb_db;
-       if (vdata->delete_list == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-               talloc_free(vdata);
-               return -1;
-       }
+       int ret;
 
-       vdata->start = timeval_current();
-       /*
-        * gather all records that can be deleted in vdata
-        */
-       if (ctdb_vacuum_db(ctdb_db, vdata, full_vacuum_run) != 0) {
+       if (ctdb_vacuum_db(ctdb_db, full_vacuum_run) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
        }
 
-       if (repack_limit != 0) {
-               freelist_size = tdb_freelist_size(ctdb_db->ltdb->tdb);
-               if (freelist_size == -1) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
-                       talloc_free(vdata);
-                       return -1;
-               }
+       freelist_size = tdb_freelist_size(ctdb_db->ltdb->tdb);
+       if (freelist_size == -1) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
+               return -1;
        }
 
        /*
         * decide if a repack is necessary
         */
-       if ((repack_limit == 0 || (uint32_t)freelist_size < repack_limit) &&
-           vdata->delete_left < vacuum_limit)
+       if ((repack_limit == 0 || (uint32_t)freelist_size < repack_limit))
        {
-               talloc_free(vdata);
                return 0;
        }
 
-       DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n", 
-                       name, freelist_size, vdata->delete_left));
+       DEBUG(DEBUG_INFO, ("Repacking %s with %u freelist entries\n",
+                          name, freelist_size));
 
-       /*
-        * repack and implicitely get rid of the records we can delete
-        */
-       if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx, vdata) != 0) {
+       ret = tdb_repack(ctdb_db->ltdb->tdb);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
-               talloc_free(vdata);
                return -1;
        }
-       talloc_free(vdata);
 
        return 0;
 }
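After this simplification the repack decision depends only on the tdb freelist size and the repack_limit tunable, and the repack itself is delegated to tdb_repack(). A minimal sketch of that decision, factored into a helper purely for illustration (no such helper exists in the patch):

/* Illustration only: restates the inline repack logic of
 * ctdb_vacuum_and_repack_db() as a helper. */
static int maybe_repack_sketch(struct tdb_context *tdb, uint32_t repack_limit)
{
	int freelist_size = tdb_freelist_size(tdb);

	if (freelist_size == -1) {
		return -1;	/* could not determine the freelist size */
	}

	/* repack_limit == 0 disables repacking */
	if (repack_limit == 0 || (uint32_t)freelist_size < repack_limit) {
		return 0;	/* not fragmented enough to bother */
	}

	return tdb_repack(tdb);
}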
@@ -1500,6 +1132,7 @@ static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
        struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
 
+       CTDB_UPDATE_DB_LATENCY(ctdb_db, "vacuum", vacuum.latency, l);
        DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
 
        if (child_ctx->child_pid != -1) {
@@ -1511,9 +1144,9 @@ static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
 
        DLIST_REMOVE(ctdb->vacuumers, child_ctx);
 
-       event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
-                       timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
-                       ctdb_vacuum_event, child_ctx->vacuum_handle);
+       tevent_add_timer(ctdb->ev, child_ctx->vacuum_handle,
+                        timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
+                        ctdb_vacuum_event, child_ctx->vacuum_handle);
 
        return 0;
 }
@@ -1521,8 +1154,9 @@ static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
 /*
  * this event is generated when a vacuum child process times out
  */
-static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
-                                        struct timeval t, void *private_data)
+static void vacuum_child_timeout(struct tevent_context *ev,
+                                struct tevent_timer *te,
+                                struct timeval t, void *private_data)
 {
        struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
 
@@ -1537,8 +1171,9 @@ static void vacuum_child_timeout(struct event_context *ev, struct timed_event *t
 /*
  * this event is generated when a vacuum child process has completed
  */
-static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
-                            uint16_t flags, void *private_data)
+static void vacuum_child_handler(struct tevent_context *ev,
+                                struct tevent_fd *fde,
+                                uint16_t flags, void *private_data)
 {
        struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
        char c = 0;
@@ -1547,7 +1182,7 @@ static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
        DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
        child_ctx->child_pid = -1;
 
-       ret = read(child_ctx->fd[0], &c, 1);
+       ret = sys_read(child_ctx->fd[0], &c, 1);
        if (ret != 1 || c != 0) {
                child_ctx->status = VACUUM_ERROR;
                DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
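
sys_read() replaces the bare read() here; it is Samba's wrapper from lib/util/sys_rw.h, which (as I understand it) simply retries the call when it is interrupted by a signal. A hypothetical stand-in showing that pattern:

#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

/* EINTR-retrying read, as a stand-in for the sys_read() wrapper used above. */
static ssize_t read_retry_eintr(int fd, void *buf, size_t count)
{
        ssize_t ret;

        do {
                ret = read(fd, buf, count);
        } while (ret == -1 && errno == EINTR);

        return ret;
}
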
@@ -1561,28 +1196,38 @@ static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
 /*
  * this event is called every time we need to start a new vacuum process
  */
-static void
-ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
-                              struct timeval t, void *private_data)
+static void ctdb_vacuum_event(struct tevent_context *ev,
+                             struct tevent_timer *te,
+                             struct timeval t, void *private_data)
 {
        struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
        struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        struct ctdb_vacuum_child_context *child_ctx;
        struct tevent_fd *fde;
+       bool full_vacuum_run = false;
        int ret;
 
-       /* we dont vacuum if we are in recovery mode, or db frozen */
+       /* we don't vacuum if we are in recovery mode, or db frozen */
        if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
-           ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_NONE) {
+           ctdb_db_frozen(ctdb_db)) {
                DEBUG(DEBUG_INFO, ("Not vacuuming %s (%s)\n", ctdb_db->db_name,
-                                  ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ? "in recovery"
-                                  : ctdb->freeze_mode[ctdb_db->priority] == CTDB_FREEZE_PENDING
-                                  ? "freeze pending"
-                                  : "frozen"));
-               event_add_timed(ctdb->ev, vacuum_handle,
-                       timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
-                       ctdb_vacuum_event, vacuum_handle);
+                                  ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ?
+                                       "in recovery" : "frozen"));
+               tevent_add_timer(ctdb->ev, vacuum_handle,
+                                timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
+                                ctdb_vacuum_event, vacuum_handle);
+               return;
+       }
+
+       /* Do not allow multiple vacuuming child processes to be active at the
+        * same time.  If there is a vacuuming child process active, delay
+        * the new vacuuming event to stagger vacuuming events.
+        */
+       if (ctdb->vacuumers != NULL) {
+               tevent_add_timer(ctdb->ev, vacuum_handle,
+                                timeval_current_ofs(0, 500*1000),
+                                ctdb_vacuum_event, vacuum_handle);
                return;
        }
 
@@ -1597,13 +1242,17 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
        if (ret != 0) {
                talloc_free(child_ctx);
                DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
-               event_add_timed(ctdb->ev, vacuum_handle,
-                       timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
-                       ctdb_vacuum_event, vacuum_handle);
+               tevent_add_timer(ctdb->ev, vacuum_handle,
+                                timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
+                                ctdb_vacuum_event, vacuum_handle);
                return;
        }
 
-       if (vacuum_handle->fast_path_count > ctdb->tunable.vacuum_fast_path_count) {
+       if (vacuum_handle->fast_path_count >=
+           ctdb->tunable.vacuum_fast_path_count) {
+               if (ctdb->tunable.vacuum_fast_path_count > 0) {
+                       full_vacuum_run = true;
+               }
                vacuum_handle->fast_path_count = 0;
        }
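
The decision whether to do a full vacuum run has moved out of the child and into the parent event handler: a run is promoted to a full one only once fast_path_count has reached the vacuum_fast_path_count tunable, and only if that tunable is non-zero (fast_path_count starts at 0 in ctdb_vacuum_init, so a freshly armed handle no longer qualifies on its own). A standalone sketch of the counter logic, with hypothetical names:

#include <stdbool.h>
#include <stdint.h>

/* Every vacuum_fast_path_count-th run is promoted to a full vacuum run
 * and the counter wraps to zero; a tunable of 0 never promotes a run. */
static bool promote_to_full_run(uint32_t *fast_path_count,
                                uint32_t vacuum_fast_path_count)
{
        bool full = false;

        if (*fast_path_count >= vacuum_fast_path_count) {
                if (vacuum_fast_path_count > 0) {
                        full = true;
                }
                *fast_path_count = 0;
        }

        return full;
}
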
 
@@ -1613,37 +1262,27 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
                close(child_ctx->fd[1]);
                talloc_free(child_ctx);
                DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
-               event_add_timed(ctdb->ev, vacuum_handle,
-                       timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
-                       ctdb_vacuum_event, vacuum_handle);
+               tevent_add_timer(ctdb->ev, vacuum_handle,
+                                timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
+                                ctdb_vacuum_event, vacuum_handle);
                return;
        }
 
 
        if (child_ctx->child_pid == 0) {
                char cc = 0;
-               bool full_vacuum_run = false;
                close(child_ctx->fd[0]);
 
                DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
-               ctdb_set_process_name("ctdb_vacuum");
-               if (switch_from_server_to_client(ctdb, "vacuum-%s", ctdb_db->db_name) != 0) {
+               prctl_set_comment("ctdb_vacuum");
+               if (switch_from_server_to_client(ctdb) != 0) {
                        DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
                        _exit(1);
                }
 
-               /* 
-                * repack the db
-                */
-               if ((ctdb->tunable.vacuum_fast_path_count > 0) &&
-                   (vacuum_handle->fast_path_count == 0))
-               {
-                       full_vacuum_run = true;
-               }
-               cc = ctdb_vacuum_and_repack_db(ctdb_db, child_ctx,
-                                              full_vacuum_run);
+               cc = ctdb_vacuum_and_repack_db(ctdb_db, full_vacuum_run);
 
-               write(child_ctx->fd[1], &cc, 1);
+               sys_write(child_ctx->fd[1], &cc, 1);
                _exit(0);
        }
 
@@ -1667,14 +1306,14 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
                                 "in parent context. Shutting down\n");
        }
 
-       event_add_timed(ctdb->ev, child_ctx,
-               timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
-               vacuum_child_timeout, child_ctx);
+       tevent_add_timer(ctdb->ev, child_ctx,
+                        timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
+                        vacuum_child_timeout, child_ctx);
 
        DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
 
-       fde = event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
-                          EVENT_FD_READ, vacuum_child_handler, child_ctx);
+       fde = tevent_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
+                           TEVENT_FD_READ, vacuum_child_handler, child_ctx);
        tevent_fd_set_auto_close(fde);
 
        vacuum_handle->child_ctx = child_ctx;
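
For reference, the parent-side monitoring set up above pairs a timeout timer with a read handler on the pipe, so either the child's one-byte status or the timeout wakes the parent. A self-contained sketch of that tevent pattern (handler and helper names are illustrative, and tevent's own tevent_timeval_current_ofs() stands in for ctdb's timeval_current_ofs()):

#include <stdbool.h>
#include <stdint.h>
#include <sys/time.h>
#include <unistd.h>
#include <talloc.h>
#include <tevent.h>

/* Fires if the child has not reported back within max_run_time seconds. */
static void child_timed_out(struct tevent_context *ev,
                            struct tevent_timer *te,
                            struct timeval t, void *private_data)
{
        bool *timed_out = (bool *)private_data;
        *timed_out = true;
}

/* Fires when the child writes its one-byte status to the pipe. */
static void child_done(struct tevent_context *ev, struct tevent_fd *fde,
                       uint16_t flags, void *private_data)
{
        int fd = *(int *)private_data;
        char c = 1;

        if (read(fd, &c, 1) != 1 || c != 0) {
                /* treat as a failed run */
        }
}

static void watch_child(struct tevent_context *ev, TALLOC_CTX *mem_ctx,
                        int *pipe_read_fd, bool *timed_out,
                        uint32_t max_run_time)
{
        struct tevent_fd *fde;

        tevent_add_timer(ev, mem_ctx,
                         tevent_timeval_current_ofs(max_run_time, 0),
                         child_timed_out, timed_out);

        fde = tevent_add_fd(ev, mem_ctx, *pipe_read_fd,
                            TEVENT_FD_READ, child_done, pipe_read_fd);
        /* close the pipe fd automatically when the fd event is freed */
        tevent_fd_set_auto_close(fde);
}
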
@@ -1698,8 +1337,10 @@ void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
  */
 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
 {
-       if (ctdb_db->persistent != 0) {
-               DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
+       if (! ctdb_db_volatile(ctdb_db)) {
+               DEBUG(DEBUG_ERR,
+                     ("Vacuuming is disabled for non-volatile database %s\n",
+                      ctdb_db->db_name));
                return 0;
        }
 
@@ -1709,9 +1350,9 @@ int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
        ctdb_db->vacuum_handle->ctdb_db         = ctdb_db;
        ctdb_db->vacuum_handle->fast_path_count = 0;
 
-       event_add_timed(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle, 
-                       timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
-                       ctdb_vacuum_event, ctdb_db->vacuum_handle);
+       tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle,
+                        timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
+                        ctdb_vacuum_event, ctdb_db->vacuum_handle);
 
        return 0;
 }
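
The vacuuming schedule is a self-rescheduling timer: ctdb_vacuum_init arms the first tevent timer and, after each run, the child-context destructor arms the next one. A self-contained toy sketch of that pattern (re-arming directly in the handler for brevity; names are illustrative):

#include <stdint.h>
#include <stdio.h>
#include <sys/time.h>
#include <talloc.h>
#include <tevent.h>

/* Periodic job: each invocation re-arms the timer, which is how the
 * vacuuming event keeps recurring at the configured interval. */
static void periodic_job(struct tevent_context *ev, struct tevent_timer *te,
                         struct timeval t, void *private_data)
{
        uint32_t *interval = (uint32_t *)private_data;

        printf("job ran\n");

        tevent_add_timer(ev, ev,
                         tevent_timeval_current_ofs(*interval, 0),
                         periodic_job, interval);
}

int main(void)
{
        TALLOC_CTX *mem_ctx = talloc_new(NULL);
        struct tevent_context *ev = tevent_context_init(mem_ctx);
        uint32_t interval = 1;

        tevent_add_timer(ev, ev, tevent_timeval_current_ofs(interval, 0),
                         periodic_job, &interval);
        tevent_loop_wait(ev);

        talloc_free(mem_ctx);
        return 0;
}
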
@@ -1781,11 +1422,11 @@ static int insert_record_into_delete_queue(struct ctdb_db_context *ctdb_db,
 
        hash = (uint32_t)ctdb_hash(&key);
 
-       DEBUG(DEBUG_INFO, (__location__ " schedule for deletion: db[%s] "
-                          "db_id[0x%08x] "
-                          "key_hash[0x%08x] "
-                          "lmaster[%u] "
-                          "migrated_with_data[%s]\n",
+       DEBUG(DEBUG_DEBUG, (__location__ " schedule for deletion: db[%s] "
+                           "db_id[0x%08x] "
+                           "key_hash[0x%08x] "
+                           "lmaster[%u] "
+                           "migrated_with_data[%s]\n",
                            ctdb_db->db_name, ctdb_db->db_id,
                            hash,
                            ctdb_lmaster(ctdb_db->ctdb, &key),
@@ -1868,7 +1509,7 @@ int32_t ctdb_local_schedule_for_deletion(struct ctdb_db_context *ctdb_db,
                return ret;
        }
 
-       /* if we dont have a connection to the daemon we can not send
+       /* if we don't have a connection to the daemon we can not send
           a control. For example sometimes from update_record control child
           process.
        */