ctdb-daemon: Do not force full vacuum on first vacuuming run
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_vacuum.c
index 0ca485dcc0ed724198f473f09893b90035ef8013..9d086917f3c47322cbd5e51c1771880a6cc63f50 100644 (file)
@@ -2,7 +2,7 @@
    ctdb vacuuming events
 
    Copyright (C) Ronnie Sahlberg  2009
-   Copyright (C) Michael Adam 2010-2011
+   Copyright (C) Michael Adam 2010-2013
    Copyright (C) Stefan Metzmacher 2010-2011
 
    This program is free software; you can redistribute it and/or modify
    along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
 
-#include "includes.h"
-#include "lib/tdb/include/tdb.h"
+#include "replace.h"
 #include "system/network.h"
 #include "system/filesys.h"
-#include "system/dir.h"
-#include "../include/ctdb_private.h"
-#include "db_wrap.h"
+#include "system/time.h"
+
+#include <talloc.h>
+#include <tevent.h>
+
+#include "lib/tdb_wrap/tdb_wrap.h"
 #include "lib/util/dlinklist.h"
-#include "../include/ctdb_private.h"
-#include "../common/rb_tree.h"
+#include "lib/util/debug.h"
+#include "lib/util/samba_util.h"
+#include "lib/util/sys_rw.h"
+#include "lib/util/util_process.h"
+
+#include "ctdb_private.h"
+#include "ctdb_client.h"
+
+#include "common/rb_tree.h"
+#include "common/common.h"
+#include "common/logging.h"
 
 #define TIMELIMIT() timeval_current_ofs(10, 0)
 
@@ -53,36 +64,42 @@ struct ctdb_vacuum_handle {
 
 /*  a list of records to possibly delete */
 struct vacuum_data {
-       uint32_t vacuum_limit;
-       uint32_t repack_limit;
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
        struct tdb_context *dest_db;
        trbt_tree_t *delete_list;
-       uint32_t delete_count;
        struct ctdb_marshall_buffer **vacuum_fetch_list;
        struct timeval start;
        bool traverse_error;
        bool vacuum;
-       uint32_t total;
-       uint32_t vacuumed;
-       uint32_t copied;
-       uint32_t fast_added_to_vacuum_fetch_list;
-       uint32_t fast_added_to_delete_list;
-       uint32_t fast_deleted;
-       uint32_t fast_skipped;
-       uint32_t fast_error;
-       uint32_t fast_total;
-       uint32_t full_added_to_vacuum_fetch_list;
-       uint32_t full_added_to_delete_list;
-       uint32_t full_skipped;
-       uint32_t full_error;
-       uint32_t full_total;
-       uint32_t delete_left;
-       uint32_t delete_remote_error;
-       uint32_t delete_local_error;
-       uint32_t delete_deleted;
-       uint32_t delete_skipped;
+       struct {
+               struct {
+                       uint32_t added_to_vacuum_fetch_list;
+                       uint32_t added_to_delete_list;
+                       uint32_t deleted;
+                       uint32_t skipped;
+                       uint32_t error;
+                       uint32_t total;
+               } delete_queue;
+               struct {
+                       uint32_t scheduled;
+                       uint32_t skipped;
+                       uint32_t error;
+                       uint32_t total;
+               } db_traverse;
+               struct {
+                       uint32_t total;
+                       uint32_t remote_error;
+                       uint32_t local_error;
+                       uint32_t deleted;
+                       uint32_t skipped;
+                       uint32_t left;
+               } delete_list;
+               struct {
+                       uint32_t vacuumed;
+                       uint32_t copied;
+               } repack;
+       } count;
 };
 
 /* this structure contains the information for one record to be deleted */
@@ -90,13 +107,20 @@ struct delete_record_data {
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_ltdb_header hdr;
+       uint32_t remote_fail_count;
        TDB_DATA key;
+       uint8_t keydata[1];
 };
 
 struct delete_records_list {
        struct ctdb_marshall_buffer *records;
+       struct vacuum_data *vdata;
 };
 
+static int insert_record_into_delete_queue(struct ctdb_db_context *ctdb_db,
+                                          const struct ctdb_ltdb_header *hdr,
+                                          TDB_DATA key);
+
 /**
  * Store key and header in a tree, indexed by the key hash.
  */
@@ -108,23 +132,25 @@ static int insert_delete_record_data_into_tree(struct ctdb_context *ctdb,
 {
        struct delete_record_data *dd;
        uint32_t hash;
+       size_t len;
 
-       dd = talloc_zero(tree, struct delete_record_data);
+       len = offsetof(struct delete_record_data, keydata) + key.dsize;
+
+       dd = (struct delete_record_data *)talloc_size(tree, len);
        if (dd == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
                return -1;
        }
+       talloc_set_name_const(dd, "struct delete_record_data");
 
        dd->ctdb      = ctdb;
        dd->ctdb_db   = ctdb_db;
        dd->key.dsize = key.dsize;
-       dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
-       if (dd->key.dptr == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-               return -1;
-       }
+       dd->key.dptr  = dd->keydata;
+       memcpy(dd->keydata, key.dptr, key.dsize);
 
        dd->hdr = *hdr;
+       dd->remote_fail_count = 0;
 
        hash = ctdb_hash(&key);
 
@@ -144,7 +170,7 @@ static int add_record_to_delete_list(struct vacuum_data *vdata, TDB_DATA key,
        hash = ctdb_hash(&key);
 
        if (trbt_lookup32(vdata->delete_list, hash)) {
-               DEBUG(DEBUG_INFO, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
+               DEBUG(DEBUG_INFO, (__location__ " Hash collision when vacuuming, skipping this record.\n"));
                return 0;
        }
 
@@ -155,7 +181,7 @@ static int add_record_to_delete_list(struct vacuum_data *vdata, TDB_DATA key,
                return -1;
        }
 
-       vdata->delete_count++;
+       vdata->count.delete_list.total++;
 
        return 0;
 }
@@ -168,61 +194,64 @@ static int add_record_to_vacuum_fetch_list(struct vacuum_data *vdata,
                                           TDB_DATA key)
 {
        struct ctdb_context *ctdb = vdata->ctdb;
-       struct ctdb_rec_data *rec;
        uint32_t lmaster;
-       size_t old_size;
        struct ctdb_marshall_buffer *vfl;
 
        lmaster = ctdb_lmaster(ctdb, &key);
 
        vfl = vdata->vacuum_fetch_list[lmaster];
 
-       rec = ctdb_marshall_record(vfl, ctdb->pnn, key, NULL, tdb_null);
-       if (rec == NULL) {
+       vfl = ctdb_marshall_add(ctdb, vfl, vfl->db_id, ctdb->pnn,
+                               key, NULL, tdb_null);
+       if (vfl == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
                vdata->traverse_error = true;
                return -1;
        }
 
-       old_size = talloc_get_size(vfl);
-       vfl = talloc_realloc_size(NULL, vfl, old_size + rec->length);
-       if (vfl == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
-               vdata->traverse_error = true;
-               return -1;
-       }
        vdata->vacuum_fetch_list[lmaster] = vfl;
 
-       vfl->count++;
-       memcpy(old_size+(uint8_t *)vfl, rec, rec->length);
-       talloc_free(rec);
-
-       vdata->total++;
-
        return 0;
 }
 
 
-static void ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
+static void ctdb_vacuum_event(struct tevent_context *ev,
+                             struct tevent_timer *te,
                              struct timeval t, void *private_data);
 
+static int vacuum_record_parser(TDB_DATA key, TDB_DATA data, void *private_data)
+{
+       struct ctdb_ltdb_header *header =
+               (struct ctdb_ltdb_header *)private_data;
+
+       if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
+               return -1;
+       }
+
+       *header = *(struct ctdb_ltdb_header *)data.dptr;
+
+       return 0;
+}
 
 /*
  * traverse function for gathering the records that can be deleted
  */
-static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
+static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
+                          void *private_data)
 {
-       struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
+       struct vacuum_data *vdata = talloc_get_type(private_data,
+                                                   struct vacuum_data);
        struct ctdb_context *ctdb = vdata->ctdb;
+       struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
        uint32_t lmaster;
        struct ctdb_ltdb_header *hdr;
        int res = 0;
 
-       vdata->full_total++;
+       vdata->count.db_traverse.total++;
 
        lmaster = ctdb_lmaster(ctdb, &key);
        if (lmaster >= ctdb->num_nodes) {
-               vdata->full_error++;
+               vdata->count.db_traverse.error++;
                DEBUG(DEBUG_CRIT, (__location__
                                   " lmaster[%u] >= ctdb->num_nodes[%u] for key"
                                   " with hash[%u]!\n",
@@ -234,42 +263,29 @@ static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
 
        if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
                /* it is not a deleted record */
-               vdata->full_skipped++;
+               vdata->count.db_traverse.skipped++;
                return 0;
        }
 
        hdr = (struct ctdb_ltdb_header *)data.dptr;
 
        if (hdr->dmaster != ctdb->pnn) {
-               vdata->full_skipped++;
+               vdata->count.db_traverse.skipped++;
                return 0;
        }
 
-       if (lmaster == ctdb->pnn) {
-               /*
-                * We are both lmaster and dmaster, and the record is empty.
-                * So we should be able to delete it.
-                */
-               res = add_record_to_delete_list(vdata, key, hdr);
-               if (res != 0) {
-                       vdata->full_error++;
-               } else {
-                       vdata->full_added_to_delete_list++;
-               }
+       /*
+        * Add the record to this process's delete_queue for processing
+        * in the subsequent traverse in the fast vacuum run.
+        */
+       res = insert_record_into_delete_queue(ctdb_db, hdr, key);
+       if (res != 0) {
+               vdata->count.db_traverse.error++;
        } else {
-               /*
-                * We are not lmaster.
-                * Add the record to the blob ready to send to the nodes.
-                */
-               res = add_record_to_vacuum_fetch_list(vdata, key);
-               if (res != 0) {
-                       vdata->full_error++;
-               } else {
-                       vdata->full_added_to_vacuum_fetch_list++;
-               }
+               vdata->count.db_traverse.scheduled++;
        }
 
-       return res;
+       return 0;
 }
 
 /*
@@ -280,23 +296,17 @@ static int delete_marshall_traverse(void *param, void *data)
 {
        struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
        struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
-       struct ctdb_rec_data *rec;
-       size_t old_size;
+       struct ctdb_marshall_buffer *m;
 
-       rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
-       if (rec == NULL) {
+       m = ctdb_marshall_add(recs, recs->records, recs->records->db_id,
+                             recs->records->db_id,
+                             dd->key, &dd->hdr, tdb_null);
+       if (m == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
-               return 0;
+               return -1;
        }
 
-       old_size = talloc_get_size(recs->records);
-       recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
-       if (recs->records == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
-               return 0;
-       }
-       recs->records->count++;
-       memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
+       recs->records = m;
        return 0;
 }
 
@@ -328,42 +338,30 @@ static int delete_queue_traverse(void *param, void *data)
        struct ctdb_db_context *ctdb_db = dd->ctdb_db;
        struct ctdb_context *ctdb = ctdb_db->ctdb; /* or dd->ctdb ??? */
        int res;
-       struct ctdb_ltdb_header *header;
-       TDB_DATA tdb_data;
+       struct ctdb_ltdb_header header;
        uint32_t lmaster;
        uint32_t hash = ctdb_hash(&(dd->key));
 
-       vdata->fast_total++;
+       vdata->count.delete_queue.total++;
 
-       res = tdb_chainlock(ctdb_db->ltdb->tdb, dd->key);
+       res = tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key);
        if (res != 0) {
-               DEBUG(DEBUG_ERR,
-                     (__location__ " Error getting chainlock on record with "
-                      "key hash [0x%08x] on database db[%s].\n",
-                      hash, ctdb_db->db_name));
-               vdata->fast_error++;
+               vdata->count.delete_queue.error++;
                return 0;
        }
 
-       tdb_data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
-       if (tdb_data.dsize < sizeof(struct ctdb_ltdb_header)) {
-               /* Does not exist or not a ctdb record. Skip. */
-               goto skipped;
-       }
-
-       if (tdb_data.dsize > sizeof(struct ctdb_ltdb_header)) {
-               /* The record has been recycled (filled with data). Skip. */
+       res = tdb_parse_record(ctdb_db->ltdb->tdb, dd->key,
+                              vacuum_record_parser, &header);
+       if (res != 0) {
                goto skipped;
        }
 
-       header = (struct ctdb_ltdb_header *)tdb_data.dptr;
-
-       if (header->dmaster != ctdb->pnn) {
+       if (header.dmaster != ctdb->pnn) {
                /* The record has been migrated off the node. Skip. */
                goto skipped;
        }
 
-       if (header->rsn != dd->hdr.rsn) {
+       if (header.rsn != dd->hdr.rsn) {
                /*
                 * The record has been migrated off the node and back again.
                 * But not requeued for deletion. Skip it.
@@ -389,9 +387,9 @@ static int delete_queue_traverse(void *param, void *data)
                        DEBUG(DEBUG_ERR,
                              (__location__ " Error adding record to list "
                               "of records to send to lmaster.\n"));
-                       vdata->fast_error++;
+                       vdata->count.delete_queue.error++;
                } else {
-                       vdata->fast_added_to_vacuum_fetch_list++;
+                       vdata->count.delete_queue.added_to_vacuum_fetch_list++;
                }
                goto done;
        }
@@ -404,9 +402,9 @@ static int delete_queue_traverse(void *param, void *data)
                        DEBUG(DEBUG_ERR,
                              (__location__ " Error adding record to list "
                               "of records for deletion on lmaster.\n"));
-                       vdata->fast_error++;
+                       vdata->count.delete_queue.error++;
                } else {
-                       vdata->fast_added_to_delete_list++;
+                       vdata->count.delete_queue.added_to_delete_list++;
                }
        } else {
                res = tdb_delete(ctdb_db->ltdb->tdb, dd->key);
@@ -416,25 +414,23 @@ static int delete_queue_traverse(void *param, void *data)
                              (__location__ " Error deleting record with key "
                               "hash [0x%08x] from local data base db[%s].\n",
                               hash, ctdb_db->db_name));
-                       vdata->fast_error++;
-               } else {
-                       DEBUG(DEBUG_DEBUG,
-                             (__location__ " Deleted record with key hash "
-                              "[0x%08x] from local data base db[%s].\n",
-                              hash, ctdb_db->db_name));
-                       vdata->fast_deleted++;
+                       vdata->count.delete_queue.error++;
+                       goto done;
                }
+
+               DEBUG(DEBUG_DEBUG,
+                     (__location__ " Deleted record with key hash "
+                      "[0x%08x] from local data base db[%s].\n",
+                      hash, ctdb_db->db_name));
+               vdata->count.delete_queue.deleted++;
        }
 
        goto done;
 
 skipped:
-       vdata->fast_skipped++;
+       vdata->count.delete_queue.skipped++;
 
 done:
-       if (tdb_data.dptr != NULL) {
-               free(tdb_data.dptr);
-       }
        tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
 
        return 0;
@@ -453,19 +449,26 @@ static int delete_record_traverse(void *param, void *data)
        struct ctdb_db_context *ctdb_db = dd->ctdb_db;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        int res;
-       struct ctdb_ltdb_header *header;
-       TDB_DATA tdb_data;
+       struct ctdb_ltdb_header header;
        uint32_t lmaster;
-       bool deleted = false;
        uint32_t hash = ctdb_hash(&(dd->key));
 
+       if (dd->remote_fail_count > 0) {
+               vdata->count.delete_list.remote_error++;
+               vdata->count.delete_list.left--;
+               talloc_free(dd);
+               return 0;
+       }
+
        res = tdb_chainlock(ctdb_db->ltdb->tdb, dd->key);
        if (res != 0) {
                DEBUG(DEBUG_ERR,
                      (__location__ " Error getting chainlock on record with "
                       "key hash [0x%08x] on database db[%s].\n",
                       hash, ctdb_db->db_name));
-               vdata->delete_local_error++;
+               vdata->count.delete_list.local_error++;
+               vdata->count.delete_list.left--;
+               talloc_free(dd);
                return 0;
        }
 
@@ -474,49 +477,49 @@ static int delete_record_traverse(void *param, void *data)
         * changed and that we are still its lmaster and dmaster.
         */
 
-       tdb_data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
-       if (tdb_data.dsize < sizeof(struct ctdb_ltdb_header)) {
-               /* Does not exist or not a ctdb record. Skip. */
-               vdata->delete_skipped++;
-               goto done;
-       }
-
-       if (tdb_data.dsize > sizeof(struct ctdb_ltdb_header)) {
-               /* The record has been recycled (filled with data). Skip. */
-               vdata->delete_skipped++;
-               goto done;
+       res = tdb_parse_record(ctdb_db->ltdb->tdb, dd->key,
+                              vacuum_record_parser, &header);
+       if (res != 0) {
+               goto skip;
        }
 
-       header = (struct ctdb_ltdb_header *)tdb_data.dptr;
-
-       if (header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)) {
-         /* The record has readonly flags set. skip deleting */
-               vdata->delete_skipped++;
-               goto done;
+       if (header.flags & CTDB_REC_RO_FLAGS) {
+               DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
+                                  "on database db[%s] has read-only flags. "
+                                  "skipping.\n",
+                                  hash, ctdb_db->db_name));
+               goto skip;
        }
 
-       if (header->dmaster != ctdb->pnn) {
-               /* The record has been migrated off the node. Skip. */
-               vdata->delete_skipped++;
-               goto done;
+       if (header.dmaster != ctdb->pnn) {
+               DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
+                                  "on database db[%s] has been migrated away. "
+                                  "skipping.\n",
+                                  hash, ctdb_db->db_name));
+               goto skip;
        }
 
-
-       if (header->rsn != dd->hdr.rsn) {
+       if (header.rsn != dd->hdr.rsn) {
                /*
                 * The record has been migrated off the node and back again.
                 * But not requeued for deletion. Skip it.
                 */
-               vdata->delete_skipped++;
-               goto done;
+               DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
+                                  "on database db[%s] seems to have been "
+                                  "migrated away and back again (with empty "
+                                  "data). skipping.\n",
+                                  hash, ctdb_db->db_name));
+               goto skip;
        }
 
        lmaster = ctdb_lmaster(ctdb_db->ctdb, &dd->key);
 
        if (lmaster != ctdb->pnn) {
-               /* we are not lmaster - strange */
-               vdata->delete_skipped++;
-               goto done;
+               DEBUG(DEBUG_INFO, (__location__ ": not lmaster for record in "
+                                  "delete list (key hash [0x%08x], db[%s]). "
+                                  "Strange! skipping.\n",
+                                  hash, ctdb_db->db_name));
+               goto skip;
        }
 
        res = tdb_delete(ctdb_db->ltdb->tdb, dd->key);
@@ -526,47 +529,64 @@ static int delete_record_traverse(void *param, void *data)
                      (__location__ " Error deleting record with key hash "
                       "[0x%08x] from local data base db[%s].\n",
                       hash, ctdb_db->db_name));
-               vdata->delete_local_error++;
+               vdata->count.delete_list.local_error++;
                goto done;
        }
 
-       deleted = true;
-
        DEBUG(DEBUG_DEBUG,
              (__location__ " Deleted record with key hash [0x%08x] from "
               "local data base db[%s].\n", hash, ctdb_db->db_name));
 
-done:
-       if (tdb_data.dptr != NULL) {
-               free(tdb_data.dptr);
-       }
+       vdata->count.delete_list.deleted++;
+       goto done;
+
+skip:
+       vdata->count.delete_list.skipped++;
 
+done:
        tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
 
-       if (deleted) {
-               /*
-                * successfully deleted the record locally.
-                * remove it from the list and update statistics.
-                */
-               talloc_free(dd);
-               vdata->delete_deleted++;
-               vdata->delete_left--;
-       }
+       talloc_free(dd);
+       vdata->count.delete_list.left--;
 
        return 0;
 }
 
 /**
- * Fast vacuuming run:
  * Traverse the delete_queue.
- * This fills the same lists as the database traverse.
+ * Records are either deleted directly or filled
+ * into the delete list or the vacuum fetch lists
+ * for further processing.
  */
-static void ctdb_vacuum_db_fast(struct ctdb_db_context *ctdb_db,
-                               struct vacuum_data *vdata)
+static void ctdb_process_delete_queue(struct ctdb_db_context *ctdb_db,
+                                     struct vacuum_data *vdata)
 {
-       trbt_traversearray32(ctdb_db->delete_queue, 1, delete_queue_traverse, vdata);
+       uint32_t sum;
+       int ret;
+
+       ret = trbt_traversearray32(ctdb_db->delete_queue, 1,
+                                  delete_queue_traverse, vdata);
+
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Error traversing "
+                     "the delete queue.\n"));
+       }
 
-       if (vdata->fast_total > 0) {
+       sum = vdata->count.delete_queue.deleted
+           + vdata->count.delete_queue.skipped
+           + vdata->count.delete_queue.error
+           + vdata->count.delete_queue.added_to_delete_list
+           + vdata->count.delete_queue.added_to_vacuum_fetch_list;
+
+       if (vdata->count.delete_queue.total != sum) {
+               DEBUG(DEBUG_ERR, (__location__ " Inconsistency in fast vacuum "
+                     "counts for db[%s]: total[%u] != sum[%u]\n",
+                     ctdb_db->db_name,
+                     (unsigned)vdata->count.delete_queue.total,
+                     (unsigned)sum));
+       }
+
+       if (vdata->count.delete_queue.total > 0) {
                DEBUG(DEBUG_INFO,
                      (__location__
                       " fast vacuuming delete_queue traverse statistics: "
@@ -578,43 +598,37 @@ static void ctdb_vacuum_db_fast(struct ctdb_db_context *ctdb_db,
                       "adl[%u] "
                       "avf[%u]\n",
                       ctdb_db->db_name,
-                      (unsigned)vdata->fast_total,
-                      (unsigned)vdata->fast_deleted,
-                      (unsigned)vdata->fast_skipped,
-                      (unsigned)vdata->fast_error,
-                      (unsigned)vdata->fast_added_to_delete_list,
-                      (unsigned)vdata->fast_added_to_vacuum_fetch_list));
+                      (unsigned)vdata->count.delete_queue.total,
+                      (unsigned)vdata->count.delete_queue.deleted,
+                      (unsigned)vdata->count.delete_queue.skipped,
+                      (unsigned)vdata->count.delete_queue.error,
+                      (unsigned)vdata->count.delete_queue.added_to_delete_list,
+                      (unsigned)vdata->count.delete_queue.added_to_vacuum_fetch_list));
        }
 
        return;
 }
 
 /**
- * Full vacuum run:
  * read-only traverse of the database, looking for records that
  * might be able to be vacuumed.
  *
  * This is not done each time but only every tunable
  * VacuumFastPathCount times.
  */
-static int ctdb_vacuum_db_full(struct ctdb_db_context *ctdb_db,
-                              struct vacuum_data *vdata,
-                              bool full_vacuum_run)
+static void ctdb_vacuum_traverse_db(struct ctdb_db_context *ctdb_db,
+                                   struct vacuum_data *vdata)
 {
        int ret;
 
-       if (!full_vacuum_run) {
-               return 0;
-       }
-
        ret = tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata);
        if (ret == -1 || vdata->traverse_error) {
                DEBUG(DEBUG_ERR, (__location__ " Traverse error in vacuuming "
                                  "'%s'\n", ctdb_db->db_name));
-               return -1;
+               return;
        }
 
-       if (vdata->full_total > 0) {
+       if (vdata->count.db_traverse.total > 0) {
                DEBUG(DEBUG_INFO,
                      (__location__
                       " full vacuuming db traverse statistics: "
@@ -622,17 +636,15 @@ static int ctdb_vacuum_db_full(struct ctdb_db_context *ctdb_db,
                       "total[%u] "
                       "skp[%u] "
                       "err[%u] "
-                      "adl[%u] "
-                      "avf[%u]\n",
+                      "sched[%u]\n",
                       ctdb_db->db_name,
-                      (unsigned)vdata->full_total,
-                      (unsigned)vdata->full_skipped,
-                      (unsigned)vdata->full_error,
-                      (unsigned)vdata->full_added_to_delete_list,
-                      (unsigned)vdata->full_added_to_vacuum_fetch_list));
+                      (unsigned)vdata->count.db_traverse.total,
+                      (unsigned)vdata->count.db_traverse.skipped,
+                      (unsigned)vdata->count.db_traverse.error,
+                      (unsigned)vdata->count.db_traverse.scheduled));
        }
 
-       return 0;
+       return;
 }
 
 /**
@@ -640,8 +652,8 @@ static int ctdb_vacuum_db_full(struct ctdb_db_context *ctdb_db,
  * For records for which we are not the lmaster, tell the lmaster to
  * fetch the record.
  */
-static int ctdb_process_vacuum_fetch_lists(struct ctdb_db_context *ctdb_db,
-                                          struct vacuum_data *vdata)
+static void ctdb_process_vacuum_fetch_lists(struct ctdb_db_context *ctdb_db,
+                                           struct vacuum_data *vdata)
 {
        int i;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
@@ -662,8 +674,7 @@ static int ctdb_process_vacuum_fetch_lists(struct ctdb_db_context *ctdb_db,
                                   vfl->count, ctdb->nodes[i]->pnn,
                                   ctdb_db->db_name));
 
-               data.dsize = talloc_get_size(vfl);
-               data.dptr  = (void *)vfl;
+               data = ctdb_marshall_finish(vfl);
                if (ctdb_client_send_message(ctdb, ctdb->nodes[i]->pnn,
                                             CTDB_SRVID_VACUUM_FETCH,
                                             data) != 0)
@@ -671,212 +682,284 @@ static int ctdb_process_vacuum_fetch_lists(struct ctdb_db_context *ctdb_db,
                        DEBUG(DEBUG_ERR, (__location__ " Failed to send vacuum "
                                          "fetch message to %u\n",
                                          ctdb->nodes[i]->pnn));
-                       return -1;
                }
        }
 
-       return 0;
+       return;
 }
 
 /**
- * Proces the delete list:
- * Send the records to delete to all other nodes with the
- * try_delete_records control.
+ * Process the delete list:
+ *
+ * This is the last step of vacuuming that consistently deletes
+ * those records that have been migrated with data and can hence
+ * not be deleted when leaving a node.
+ *
+ * In this step, the lmaster does the final deletion of those empty
+ * records that it is also dmaster for. It has ususally received
+ * at least some of these records previously from the former dmasters
+ * with the vacuum fetch message.
+ *
+ *  1) Send the records to all active nodes with the TRY_DELETE_RECORDS
+ *     control. The remote notes delete their local copy.
+ *  2) The lmaster locally deletes its copies of all records that
+ *     could successfully be deleted remotely in step #2.
  */
-static int ctdb_process_delete_list(struct ctdb_db_context *ctdb_db,
-                                   struct vacuum_data *vdata)
+static void ctdb_process_delete_list(struct ctdb_db_context *ctdb_db,
+                                    struct vacuum_data *vdata)
 {
        int ret, i;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
+       struct delete_records_list *recs;
+       TDB_DATA indata;
+       struct ctdb_node_map_old *nodemap;
+       uint32_t *active_nodes;
+       int num_active_nodes;
+       TALLOC_CTX *tmp_ctx;
+       uint32_t sum;
 
-       vdata->delete_left = vdata->delete_count;
+       if (vdata->count.delete_list.total == 0) {
+               return;
+       }
 
-       if (vdata->delete_count > 0) {
-               struct delete_records_list *recs;
-               TDB_DATA indata, outdata;
-               int32_t res;
-               struct ctdb_node_map *nodemap;
-               uint32_t *active_nodes;
-               int num_active_nodes;
+       tmp_ctx = talloc_new(vdata);
+       if (tmp_ctx == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
+               return;
+       }
 
-               recs = talloc_zero(vdata, struct delete_records_list);
-               if (recs == NULL) {
-                       DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-                       return -1;
-               }
-               recs->records = (struct ctdb_marshall_buffer *)
-                       talloc_zero_size(vdata, 
-                                   offsetof(struct ctdb_marshall_buffer, data));
-               if (recs->records == NULL) {
-                       DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-                       return -1;
-               }
-               recs->records->db_id = ctdb_db->db_id;
+       vdata->count.delete_list.left = vdata->count.delete_list.total;
 
-               /* 
-                * traverse the tree of all records we want to delete and
-                * create a blob we can send to the other nodes.
-                */
-               trbt_traversearray32(vdata->delete_list, 1,
-                                    delete_marshall_traverse, recs);
+       /*
+        * get the list of currently active nodes
+        */
 
-               indata.dsize = talloc_get_size(recs->records);
-               indata.dptr  = (void *)recs->records;
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
+                                  CTDB_CURRENT_NODE,
+                                  tmp_ctx,
+                                  &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
+               goto done;
+       }
 
-               /* 
-                * now tell all the active nodes to delete all these records
-                * (if possible)
-                */
+       active_nodes = list_of_active_nodes(ctdb, nodemap,
+                                           nodemap, /* talloc context */
+                                           false /* include self */);
+       /* yuck! ;-) */
+       num_active_nodes = talloc_get_size(active_nodes)/sizeof(*active_nodes);
+
+       /*
+        * Now delete the records all active nodes in a two-phase process:
+        * 1) tell all active remote nodes to delete all their copy
+        * 2) if all remote nodes deleted their record copy, delete it locally
+        */
+
+       recs = talloc_zero(tmp_ctx, struct delete_records_list);
+       if (recs == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
+               goto done;
+       }
 
-               ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
-                                          CTDB_CURRENT_NODE,
-                                          recs, /* talloc context */
-                                          &nodemap);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
-                       return -1;
+       /*
+        * Step 1:
+        * Send all records to all active nodes for deletion.
+        */
+
+       /*
+        * Create a marshall blob from the remaining list of records to delete.
+        */
+
+       recs->records = (struct ctdb_marshall_buffer *)
+               talloc_zero_size(recs,
+                                offsetof(struct ctdb_marshall_buffer, data));
+       if (recs->records == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
+               goto done;
+       }
+       recs->records->db_id = ctdb_db->db_id;
+
+       ret = trbt_traversearray32(vdata->delete_list, 1,
+                                  delete_marshall_traverse, recs);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Error traversing the "
+                     "delete list for second marshalling.\n"));
+               goto done;
+       }
+
+       indata = ctdb_marshall_finish(recs->records);
+
+       for (i = 0; i < num_active_nodes; i++) {
+               struct ctdb_marshall_buffer *records;
+               struct ctdb_rec_data_old *rec;
+               int32_t res;
+               TDB_DATA outdata;
+
+               ret = ctdb_control(ctdb, active_nodes[i], 0,
+                               CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
+                               indata, recs, &outdata, &res,
+                               NULL, NULL);
+               if (ret != 0 || res != 0) {
+                       DEBUG(DEBUG_ERR, ("Failed to delete records on "
+                                         "node %u: ret[%d] res[%d]\n",
+                                         active_nodes[i], ret, res));
+                       goto done;
                }
 
-               active_nodes = list_of_active_nodes(ctdb, nodemap,
-                                                   nodemap, /* talloc context */
-                                                   false /* include self */);
-               /* yuck! ;-) */
-               num_active_nodes = talloc_get_size(active_nodes)/sizeof(*active_nodes);
-
-               for (i = 0; i < num_active_nodes; i++) {
-                       struct ctdb_marshall_buffer *records;
-                       struct ctdb_rec_data *rec;
-
-                       ret = ctdb_control(ctdb, active_nodes[i], 0,
-                                       CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
-                                       indata, recs, &outdata, &res,
-                                       NULL, NULL);
-                       if (ret != 0 || res != 0) {
-                               DEBUG(DEBUG_ERR, ("Failed to delete records on "
-                                                 "node %u: ret[%d] res[%d]\n",
-                                                 active_nodes[i], ret, res));
-                               return -1;
+               /*
+                * outdata contains the list of records coming back
+                * from the node: These are the records that the
+                * remote node could not delete. We remove these from
+                * the list to delete locally.
+                */
+               records = (struct ctdb_marshall_buffer *)outdata.dptr;
+               rec = (struct ctdb_rec_data_old *)&records->data[0];
+               while (records->count-- > 1) {
+                       TDB_DATA reckey, recdata;
+                       struct ctdb_ltdb_header *rechdr;
+                       struct delete_record_data *dd;
+
+                       reckey.dptr = &rec->data[0];
+                       reckey.dsize = rec->keylen;
+                       recdata.dptr = &rec->data[reckey.dsize];
+                       recdata.dsize = rec->datalen;
+
+                       if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
+                               DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
+                               goto done;
                        }
-
-                       /*
-                        * outdata contains the list of records coming back
-                        * from the node: These are the records that the
-                        * remote node could not delete.
-                        *
-                        * NOTE: There is a problem here:
-                        *
-                        * When a node failed to delete the record, but
-                        * others succeeded, we may have created gaps in the
-                        * history of the record. Hence when a node dies, an
-                        * closed file handle might be resurrected or an open
-                        * file handle might be lost, leading to blocked access
-                        * or data corruption.
-                        *
-                        * TODO: This needs to be fixed!
-                        */
-                       records = (struct ctdb_marshall_buffer *)outdata.dptr;
-                       rec = (struct ctdb_rec_data *)&records->data[0];
-                       while (records->count-- > 1) {
-                               TDB_DATA reckey, recdata;
-                               struct ctdb_ltdb_header *rechdr;
-                               struct delete_record_data *dd;
-
-                               reckey.dptr = &rec->data[0];
-                               reckey.dsize = rec->keylen;
-                               recdata.dptr = &rec->data[reckey.dsize];
-                               recdata.dsize = rec->datalen;
-
-                               if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
-                                       DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
-                                       return -1;
-                               }
-                               rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
-                               recdata.dptr += sizeof(*rechdr);
-                               recdata.dsize -= sizeof(*rechdr);
-
-                               dd = (struct delete_record_data *)trbt_lookup32(
-                                               vdata->delete_list,
-                                               ctdb_hash(&reckey));
-                               if (dd != NULL) {
-                                       /*
-                                        * The other node could not delete the
-                                        * record and it is the first node that
-                                        * failed. So we should remove it from
-                                        * the tree and update statistics.
-                                        */
-                                       talloc_free(dd);
-                                       vdata->delete_remote_error++;
-                                       vdata->delete_left--;
-                               }
-
-                               rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
+                       rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
+                       recdata.dptr += sizeof(*rechdr);
+                       recdata.dsize -= sizeof(*rechdr);
+
+                       dd = (struct delete_record_data *)trbt_lookup32(
+                                       vdata->delete_list,
+                                       ctdb_hash(&reckey));
+                       if (dd != NULL) {
+                               /*
+                                * The remote node could not delete the
+                                * record.  Since other remote nodes can
+                                * also fail, we just mark the record.
+                                */
+                               dd->remote_fail_count++;
+                       } else {
+                               DEBUG(DEBUG_ERR, (__location__ " Failed to "
+                                     "find record with hash 0x%08x coming "
+                                     "back from TRY_DELETE_RECORDS "
+                                     "control in delete list.\n",
+                                     ctdb_hash(&reckey)));
                        }
+
+                       rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
                }
+       }
+
+       /*
+        * Step 2:
+        * Delete the remaining records locally.
+        *
+        * These records have successfully been deleted on all
+        * active remote nodes.
+        */
 
-               /* free nodemap and active_nodes */
-               talloc_free(nodemap);
+       ret = trbt_traversearray32(vdata->delete_list, 1,
+                                  delete_record_traverse, vdata);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Error traversing the "
+                     "delete list for deletion.\n"));
        }
 
-       if (vdata->delete_left > 0) {
-               /*
-                * The only records remaining in the tree are those
-                * records which all other nodes could successfully
-                * delete, so we can safely delete them on the
-                * lmaster as well.
-                */
-               trbt_traversearray32(vdata->delete_list, 1,
-                                    delete_record_traverse, vdata);
+       if (vdata->count.delete_list.left != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Vaccum db[%s] error: "
+                     "there are %u records left for deletion after "
+                     "processing delete list\n",
+                     ctdb_db->db_name,
+                     (unsigned)vdata->count.delete_list.left));
        }
 
-       if (vdata->delete_count > 0) {
+       sum = vdata->count.delete_list.deleted
+           + vdata->count.delete_list.skipped
+           + vdata->count.delete_list.remote_error
+           + vdata->count.delete_list.local_error
+           + vdata->count.delete_list.left;
+
+       if (vdata->count.delete_list.total != sum) {
+               DEBUG(DEBUG_ERR, (__location__ " Inconsistency in vacuum "
+                     "delete list counts for db[%s]: total[%u] != sum[%u]\n",
+                     ctdb_db->db_name,
+                     (unsigned)vdata->count.delete_list.total,
+                     (unsigned)sum));
+       }
+
+       if (vdata->count.delete_list.total > 0) {
                DEBUG(DEBUG_INFO,
                      (__location__
                       " vacuum delete list statistics: "
                       "db[%s] "
-                      "coll[%u] "
+                      "total[%u] "
+                      "del[%u] "
+                      "skip[%u] "
                       "rem.err[%u] "
                       "loc.err[%u] "
-                      "skip[%u] "
-                      "del[%u] "
                       "left[%u]\n",
                       ctdb_db->db_name,
-                      (unsigned)vdata->delete_count,
-                      (unsigned)vdata->delete_remote_error,
-                      (unsigned)vdata->delete_local_error,
-                      (unsigned)vdata->delete_skipped,
-                      (unsigned)vdata->delete_deleted,
-                      (unsigned)vdata->delete_left));
+                      (unsigned)vdata->count.delete_list.total,
+                      (unsigned)vdata->count.delete_list.deleted,
+                      (unsigned)vdata->count.delete_list.skipped,
+                      (unsigned)vdata->count.delete_list.remote_error,
+                      (unsigned)vdata->count.delete_list.local_error,
+                      (unsigned)vdata->count.delete_list.left));
        }
 
-       return 0;
+done:
+       talloc_free(tmp_ctx);
+
+       return;
 }
 
 /**
  * initialize the vacuum_data
  */
-static int ctdb_vacuum_init_vacuum_data(struct ctdb_db_context *ctdb_db,
-                                       struct vacuum_data *vdata)
+static struct vacuum_data *ctdb_vacuum_init_vacuum_data(
+                                       struct ctdb_db_context *ctdb_db,
+                                       TALLOC_CTX *mem_ctx)
 {
        int i;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
+       struct vacuum_data *vdata;
 
-       vdata->fast_added_to_delete_list = 0;
-       vdata->fast_added_to_vacuum_fetch_list = 0;
-       vdata->fast_deleted = 0;
-       vdata->fast_skipped = 0;
-       vdata->fast_error = 0;
-       vdata->fast_total = 0;
-       vdata->full_added_to_delete_list = 0;
-       vdata->full_added_to_vacuum_fetch_list = 0;
-       vdata->full_skipped = 0;
-       vdata->full_error = 0;
-       vdata->full_total = 0;
-       vdata->delete_count = 0;
-       vdata->delete_left = 0;
-       vdata->delete_remote_error = 0;
-       vdata->delete_local_error = 0;
-       vdata->delete_skipped = 0;
-       vdata->delete_deleted = 0;
+       vdata = talloc_zero(mem_ctx, struct vacuum_data);
+       if (vdata == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
+               return NULL;
+       }
+
+       vdata->ctdb = ctdb_db->ctdb;
+       vdata->ctdb_db = ctdb_db;
+       vdata->delete_list = trbt_create(vdata, 0);
+       if (vdata->delete_list == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
+               goto fail;
+       }
+
+       vdata->start = timeval_current();
+
+       vdata->count.delete_queue.added_to_delete_list = 0;
+       vdata->count.delete_queue.added_to_vacuum_fetch_list = 0;
+       vdata->count.delete_queue.deleted = 0;
+       vdata->count.delete_queue.skipped = 0;
+       vdata->count.delete_queue.error = 0;
+       vdata->count.delete_queue.total = 0;
+       vdata->count.db_traverse.scheduled = 0;
+       vdata->count.db_traverse.skipped = 0;
+       vdata->count.db_traverse.error = 0;
+       vdata->count.db_traverse.total = 0;
+       vdata->count.delete_list.total = 0;
+       vdata->count.delete_list.left = 0;
+       vdata->count.delete_list.remote_error = 0;
+       vdata->count.delete_list.local_error = 0;
+       vdata->count.delete_list.skipped = 0;
+       vdata->count.delete_list.deleted = 0;
 
        /* the list needs to be of length num_nodes */
        vdata->vacuum_fetch_list = talloc_zero_array(vdata,
@@ -884,7 +967,7 @@ static int ctdb_vacuum_init_vacuum_data(struct ctdb_db_context *ctdb_db,
                                                ctdb->num_nodes);
        if (vdata->vacuum_fetch_list == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-               return -1;
+               goto fail;
        }
        for (i = 0; i < ctdb->num_nodes; i++) {
                vdata->vacuum_fetch_list[i] = (struct ctdb_marshall_buffer *)
@@ -892,12 +975,17 @@ static int ctdb_vacuum_init_vacuum_data(struct ctdb_db_context *ctdb_db,
                                         offsetof(struct ctdb_marshall_buffer, data));
                if (vdata->vacuum_fetch_list[i] == NULL) {
                        DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-                       return -1;
+                       talloc_free(vdata);
+                       return NULL;
                }
                vdata->vacuum_fetch_list[i]->db_id = ctdb_db->db_id;
        }
 
-       return 0;
+       return vdata;
+
+fail:
+       talloc_free(vdata);
+       return NULL;
 }
 
 /**
@@ -933,11 +1021,12 @@ static int ctdb_vacuum_init_vacuum_data(struct ctdb_db_context *ctdb_db,
  * This executes in the child context.
  */
 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db,
-                         struct vacuum_data *vdata,
                          bool full_vacuum_run)
 {
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        int ret, pnn;
+       struct vacuum_data *vdata;
+       TALLOC_CTX *tmp_ctx;
 
        DEBUG(DEBUG_INFO, (__location__ " Entering %s vacuum run for db "
                           "%s db_id[0x%08x]\n",
@@ -958,156 +1047,32 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db,
 
        ctdb->pnn = pnn;
 
-       ret = ctdb_vacuum_init_vacuum_data(ctdb_db, vdata);
-       if (ret != 0) {
-               return ret;
-       }
-
-       ctdb_vacuum_db_fast(ctdb_db, vdata);
-
-       ret = ctdb_vacuum_db_full(ctdb_db, vdata, full_vacuum_run);
-       if (ret != 0) {
-               return ret;
-       }
-
-       ret = ctdb_process_vacuum_fetch_lists(ctdb_db, vdata);
-       if (ret != 0) {
-               return ret;
-       }
-
-       ret = ctdb_process_delete_list(ctdb_db, vdata);
-       if (ret != 0) {
-               return ret;
-       }
-
-       /* this ensures we run our event queue */
-       ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
-
-       return 0;
-}
-
-
-/*
- * traverse function for repacking
- */
-static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
-{
-       struct vacuum_data *vdata = (struct vacuum_data *)private;
-
-       if (vdata->vacuum) {
-               uint32_t hash = ctdb_hash(&key);
-               struct delete_record_data *kd;
-               /*
-                * check if we can ignore this record because it's in the delete_list
-                */
-               kd = (struct delete_record_data *)trbt_lookup32(vdata->delete_list, hash);
-               /*
-                * there might be hash collisions so we have to compare the keys here to be sure
-                */
-               if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
-                       struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
-                       /*
-                        * we have to check if the record hasn't changed in the meantime in order to
-                        * savely remove it from the database
-                        */
-                       if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
-                               hdr->dmaster == kd->ctdb->pnn &&
-                               ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
-                               kd->hdr.rsn == hdr->rsn) {
-                               vdata->vacuumed++;
-                               return 0;
-                       }
-               }
-       }
-       if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
-               vdata->traverse_error = true;
-               return -1;
-       }
-       vdata->copied++;
-       return 0;
-}
-
-/*
- * repack a tdb
- */
-static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct vacuum_data *vdata)
-{
-       struct tdb_context *tmp_db;
-
-       if (tdb_transaction_start(tdb) != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
-               return -1;
-       }
-
-       tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
-                         TDB_INTERNAL|TDB_DISALLOW_NESTING,
-                         O_RDWR|O_CREAT, 0);
-       if (tmp_db == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
-               tdb_transaction_cancel(tdb);
+       tmp_ctx = talloc_new(ctdb_db);
+       if (tmp_ctx == NULL) {
+               DEBUG(DEBUG_ERR, ("Out of memory!\n"));
                return -1;
        }
 
-       vdata->traverse_error = false;
-       vdata->dest_db = tmp_db;
-       vdata->vacuum = true;
-       vdata->vacuumed = 0;
-       vdata->copied = 0;
-
-       /*
-        * repack and vacuum on-the-fly by not writing the records that are
-        * no longer needed
-        */
-       if (tdb_traverse_read(tdb, repack_traverse, vdata) == -1) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
-               tdb_transaction_cancel(tdb);
-               tdb_close(tmp_db);
-               return -1;              
-       }
-
-       DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
-       
-       if (vdata->traverse_error) {
-               DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
-               tdb_transaction_cancel(tdb);
-               tdb_close(tmp_db);
+       vdata = ctdb_vacuum_init_vacuum_data(ctdb_db, tmp_ctx);
+       if (vdata == NULL) {
+               talloc_free(tmp_ctx);
                return -1;
        }
 
-       if (tdb_wipe_all(tdb) != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
-               tdb_transaction_cancel(tdb);
-               tdb_close(tmp_db);
-               return -1;
+       if (full_vacuum_run) {
+               ctdb_vacuum_traverse_db(ctdb_db, vdata);
        }
 
-       vdata->traverse_error = false;
-       vdata->dest_db = tdb;
-       vdata->vacuum = false;
-       vdata->copied = 0;
+       ctdb_process_delete_queue(ctdb_db, vdata);
 
-       if (tdb_traverse_read(tmp_db, repack_traverse, vdata) == -1) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
-               tdb_transaction_cancel(tdb);
-               tdb_close(tmp_db);
-               return -1;              
-       }
+       ctdb_process_vacuum_fetch_lists(ctdb_db, vdata);
 
-       if (vdata->traverse_error) {
-               DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
-               tdb_transaction_cancel(tdb);
-               tdb_close(tmp_db);
-               return -1;
-       }
+       ctdb_process_delete_list(ctdb_db, vdata);
 
-       tdb_close(tmp_db);
+       talloc_free(tmp_ctx);
 
-
-       if (tdb_transaction_commit(tdb) != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
-               return -1;
-       }
-       DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
+       /* this ensures we run our event queue */
+       ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
 
        return 0;
 }
@@ -1117,14 +1082,16 @@ static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct
  * called from the child context
  */
 static int ctdb_vacuum_and_repack_db(struct ctdb_db_context *ctdb_db,
-                                    TALLOC_CTX *mem_ctx,
                                     bool full_vacuum_run)
 {
        uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
-       uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
        const char *name = ctdb_db->db_name;
-       int freelist_size;
-       struct vacuum_data *vdata;
+       int freelist_size = 0;
+       int ret;
+
+       if (ctdb_vacuum_db(ctdb_db, full_vacuum_run) != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
+       }
 
        freelist_size = tdb_freelist_size(ctdb_db->ltdb->tdb);
        if (freelist_size == -1) {
@@ -1132,53 +1099,22 @@ static int ctdb_vacuum_and_repack_db(struct ctdb_db_context *ctdb_db,
                return -1;
        }
 
-       vdata = talloc_zero(mem_ctx, struct vacuum_data);
-       if (vdata == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-               return -1;
-       }
-
-       vdata->ctdb = ctdb_db->ctdb;
-       vdata->vacuum_limit = vacuum_limit;
-       vdata->repack_limit = repack_limit;
-       vdata->delete_list = trbt_create(vdata, 0);
-       vdata->ctdb_db = ctdb_db;
-       if (vdata->delete_list == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
-               talloc_free(vdata);
-               return -1;
-       }
-
-       vdata->start = timeval_current();
-       /*
-        * gather all records that can be deleted in vdata
-        */
-       if (ctdb_vacuum_db(ctdb_db, vdata, full_vacuum_run) != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
-       }
-
        /*
         * decide if a repack is necessary
         */
-       if (freelist_size < repack_limit && vdata->delete_left < vacuum_limit)
+       if ((repack_limit == 0 || (uint32_t)freelist_size < repack_limit))
        {
-               talloc_free(vdata);
                return 0;
        }
 
-       DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n", 
-                       name, freelist_size, vdata->delete_left));
+       DEBUG(DEBUG_INFO, ("Repacking %s with %u freelist entries\n",
+                          name, freelist_size));
 
-       /*
-        * repack and implicitely get rid of the records we can delete
-        */
-       if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx, vdata) != 0) {
+       ret = tdb_repack(ctdb_db->ltdb->tdb);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
-               talloc_free(vdata);
                return -1;
        }
-       talloc_free(vdata);
 
        return 0;
 }
@@ -1196,6 +1132,7 @@ static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
        struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
 
+       CTDB_UPDATE_DB_LATENCY(ctdb_db, "vacuum", vacuum.latency, l);
        DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
 
        if (child_ctx->child_pid != -1) {
@@ -1207,9 +1144,9 @@ static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
 
        DLIST_REMOVE(ctdb->vacuumers, child_ctx);
 
-       event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
-                       timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
-                       ctdb_vacuum_event, child_ctx->vacuum_handle);
+       tevent_add_timer(ctdb->ev, child_ctx->vacuum_handle,
+                        timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
+                        ctdb_vacuum_event, child_ctx->vacuum_handle);
 
        return 0;
 }
@@ -1217,8 +1154,9 @@ static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
 /*
  * this event is generated when a vacuum child process times out
  */
-static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
-                                        struct timeval t, void *private_data)
+static void vacuum_child_timeout(struct tevent_context *ev,
+                                struct tevent_timer *te,
+                                struct timeval t, void *private_data)
 {
        struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
 
@@ -1233,8 +1171,9 @@ static void vacuum_child_timeout(struct event_context *ev, struct timed_event *t
 /*
  * this event is generated when a vacuum child process has completed
  */
-static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
-                            uint16_t flags, void *private_data)
+static void vacuum_child_handler(struct tevent_context *ev,
+                                struct tevent_fd *fde,
+                                uint16_t flags, void *private_data)
 {
        struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
        char c = 0;
@@ -1243,7 +1182,7 @@ static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
        DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
        child_ctx->child_pid = -1;
 
-       ret = read(child_ctx->fd[0], &c, 1);
+       ret = sys_read(child_ctx->fd[0], &c, 1);
        if (ret != 1 || c != 0) {
                child_ctx->status = VACUUM_ERROR;
                DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
@@ -1257,28 +1196,38 @@ static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
 /*
  * this event is called every time we need to start a new vacuum process
  */
-static void
-ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
-                              struct timeval t, void *private_data)
+static void ctdb_vacuum_event(struct tevent_context *ev,
+                             struct tevent_timer *te,
+                             struct timeval t, void *private_data)
 {
        struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
        struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        struct ctdb_vacuum_child_context *child_ctx;
        struct tevent_fd *fde;
+       bool full_vacuum_run = false;
        int ret;
 
-       /* we dont vacuum if we are in recovery mode, or db frozen */
+       /* we don't vacuum if we are in recovery mode, or db frozen */
        if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
-           ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_NONE) {
+           ctdb_db_frozen(ctdb_db)) {
                DEBUG(DEBUG_INFO, ("Not vacuuming %s (%s)\n", ctdb_db->db_name,
-                                  ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ? "in recovery"
-                                  : ctdb->freeze_mode[ctdb_db->priority] == CTDB_FREEZE_PENDING
-                                  ? "freeze pending"
-                                  : "frozen"));
-               event_add_timed(ctdb->ev, vacuum_handle,
-                       timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
-                       ctdb_vacuum_event, vacuum_handle);
+                                  ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ?
+                                       "in recovery" : "frozen"));
+               tevent_add_timer(ctdb->ev, vacuum_handle,
+                                timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
+                                ctdb_vacuum_event, vacuum_handle);
+               return;
+       }
+
+       /* Do not allow multiple vacuuming child processes to be active at the
+        * same time.  If there is vacuuming child process active, delay
+        * new vacuuming event to stagger vacuuming events.
+        */
+       if (ctdb->vacuumers != NULL) {
+               tevent_add_timer(ctdb->ev, vacuum_handle,
+                                timeval_current_ofs(0, 500*1000),
+                                ctdb_vacuum_event, vacuum_handle);
                return;
        }
 
@@ -1293,13 +1242,17 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
        if (ret != 0) {
                talloc_free(child_ctx);
                DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
-               event_add_timed(ctdb->ev, vacuum_handle,
-                       timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
-                       ctdb_vacuum_event, vacuum_handle);
+               tevent_add_timer(ctdb->ev, vacuum_handle,
+                                timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
+                                ctdb_vacuum_event, vacuum_handle);
                return;
        }
 
-       if (vacuum_handle->fast_path_count > ctdb->tunable.vacuum_fast_path_count) {
+       if (vacuum_handle->fast_path_count >=
+           ctdb->tunable.vacuum_fast_path_count) {
+               if (ctdb->tunable.vacuum_fast_path_count > 0) {
+                       full_vacuum_run = true;
+               }
                vacuum_handle->fast_path_count = 0;
        }
 
@@ -1309,37 +1262,27 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
                close(child_ctx->fd[1]);
                talloc_free(child_ctx);
                DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
-               event_add_timed(ctdb->ev, vacuum_handle,
-                       timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
-                       ctdb_vacuum_event, vacuum_handle);
+               tevent_add_timer(ctdb->ev, vacuum_handle,
+                                timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
+                                ctdb_vacuum_event, vacuum_handle);
                return;
        }
 
 
        if (child_ctx->child_pid == 0) {
                char cc = 0;
-               bool full_vacuum_run = false;
                close(child_ctx->fd[0]);
 
                DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
-       
-               if (switch_from_server_to_client(ctdb, "vacuum-%s", ctdb_db->db_name) != 0) {
+               prctl_set_comment("ctdb_vacuum");
+               if (switch_from_server_to_client(ctdb) != 0) {
                        DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
                        _exit(1);
                }
 
-               /* 
-                * repack the db
-                */
-               if ((ctdb->tunable.vacuum_fast_path_count > 0) &&
-                   (vacuum_handle->fast_path_count == 0))
-               {
-                       full_vacuum_run = true;
-               }
-               cc = ctdb_vacuum_and_repack_db(ctdb_db, child_ctx,
-                                              full_vacuum_run);
+               cc = ctdb_vacuum_and_repack_db(ctdb_db, full_vacuum_run);
 
-               write(child_ctx->fd[1], &cc, 1);
+               sys_write(child_ctx->fd[1], &cc, 1);
                _exit(0);
        }
 
@@ -1363,14 +1306,14 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
                                 "in parent context. Shutting down\n");
        }
 
-       event_add_timed(ctdb->ev, child_ctx,
-               timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
-               vacuum_child_timeout, child_ctx);
+       tevent_add_timer(ctdb->ev, child_ctx,
+                        timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
+                        vacuum_child_timeout, child_ctx);
 
        DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
 
-       fde = event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
-                          EVENT_FD_READ, vacuum_child_handler, child_ctx);
+       fde = tevent_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
+                           TEVENT_FD_READ, vacuum_child_handler, child_ctx);
        tevent_fd_set_auto_close(fde);
 
        vacuum_handle->child_ctx = child_ctx;
@@ -1394,8 +1337,10 @@ void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
  */
 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
 {
-       if (ctdb_db->persistent != 0) {
-               DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
+       if (! ctdb_db_volatile(ctdb_db)) {
+               DEBUG(DEBUG_ERR,
+                     ("Vacuuming is disabled for non-volatile database %s\n",
+                      ctdb_db->db_name));
                return 0;
        }
 
@@ -1405,9 +1350,9 @@ int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
        ctdb_db->vacuum_handle->ctdb_db         = ctdb_db;
        ctdb_db->vacuum_handle->fast_path_count = 0;
 
-       event_add_timed(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle, 
-                       timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
-                       ctdb_vacuum_event, ctdb_db->vacuum_handle);
+       tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle,
+                        timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
+                        ctdb_vacuum_event, ctdb_db->vacuum_handle);
 
        return 0;
 }
@@ -1477,11 +1422,11 @@ static int insert_record_into_delete_queue(struct ctdb_db_context *ctdb_db,
 
        hash = (uint32_t)ctdb_hash(&key);
 
-       DEBUG(DEBUG_INFO, (__location__ " schedule for deletion: db[%s] "
-                          "db_id[0x%08x] "
-                          "key_hash[0x%08x] "
-                          "lmaster[%u] "
-                          "migrated_with_data[%s]\n",
+       DEBUG(DEBUG_DEBUG, (__location__ " schedule for deletion: db[%s] "
+                           "db_id[0x%08x] "
+                           "key_hash[0x%08x] "
+                           "lmaster[%u] "
+                           "migrated_with_data[%s]\n",
                            ctdb_db->db_name, ctdb_db->db_id,
                            hash,
                            ctdb_lmaster(ctdb_db->ctdb, &key),
@@ -1564,7 +1509,7 @@ int32_t ctdb_local_schedule_for_deletion(struct ctdb_db_context *ctdb_db,
                return ret;
        }
 
-       /* if we dont have a connection to the daemon we can not send
+       /* if we don't have a connection to the daemon we can not send
           a control. For example sometimes from update_record control child
           process.
        */