added two new ctdb commands:
authorAndrew Tridgell <tridge@samba.org>
Tue, 8 Jan 2008 06:23:27 +0000 (17:23 +1100)
committerAndrew Tridgell <tridge@samba.org>
Tue, 8 Jan 2008 06:23:27 +0000 (17:23 +1100)
 ctdb vacuum   : vacuums all the databases, deleting any zero length
                 ctdb records

 ctdb repack   : repacks all the databases, resulting in a perfectly
                 packed database with no freelist entries

Makefile.in
client/ctdb_client.c
include/ctdb.h
include/ctdb_private.h
lib/tdb/common/freelist.c
lib/tdb/include/tdb.h
server/ctdb_control.c
server/ctdb_recover.c
server/ctdb_recoverd.c
tools/ctdb.c
tools/ctdb_vacuum.c [new file with mode: 0644]

index 0a2e58e5a9454cf0c34ad9e57df71cc5de5a7892..3e2b1e1b192666b042b6bf2688845630c077bc83 100644 (file)
@@ -90,9 +90,9 @@ bin/ctdb_ipmux: $(CTDB_CLIENT_OBJ) utils/ipmux/ipmux.o
        @echo Linking $@
        @$(CC) $(CFLAGS) -o $@ utils/ipmux/ipmux.o $(CTDB_CLIENT_OBJ) $(LIB_FLAGS) $(IPQ_LIBS)
 
-bin/ctdb: $(CTDB_CLIENT_OBJ) tools/ctdb.o 
+bin/ctdb: $(CTDB_CLIENT_OBJ) tools/ctdb.o tools/ctdb_vacuum.o
        @echo Linking $@
-       @$(CC) $(CFLAGS) -o $@ tools/ctdb.o $(CTDB_CLIENT_OBJ) $(LIB_FLAGS)
+       @$(CC) $(CFLAGS) -o $@ tools/ctdb.o tools/ctdb_vacuum.o $(CTDB_CLIENT_OBJ) $(LIB_FLAGS)
 
 bin/smnotify: utils/smnotify/gen_xdr.o utils/smnotify/gen_smnotify.o utils/smnotify/smnotify.o 
        @echo Linking $@
index 20d75d1373eadbd2f159b2ee348cf81983b784db..a6336f9c84bd248a25b70cfa415085a256cae1c4 100644 (file)
@@ -155,6 +155,9 @@ struct ctdb_client_call_state {
        uint32_t reqid;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_call call;
+       struct {
+               void (*fn)(struct ctdb_client_call_state *);
+       } async;
 };
 
 /*
@@ -187,6 +190,10 @@ static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_he
        talloc_steal(state, c);
 
        state->state = CTDB_CALL_DONE;
+
+       if (state->async.fn) {
+               state->async.fn(state);
+       }
 }
 
 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
@@ -377,7 +384,8 @@ static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db
   This call never blocks.
 */
 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
-                                      struct ctdb_call *call)
+                                             struct ctdb_call *call, 
+                                             void (*callback)(struct ctdb_client_call_state *))
 {
        struct ctdb_client_call_state *state;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
@@ -404,6 +412,9 @@ struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
                state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
                talloc_free(data.dptr);
                ctdb_ltdb_unlock(ctdb_db, call->key);
+               if (state) {
+                       state->async.fn = callback;
+               }
                return state;
        }
 
@@ -446,6 +457,8 @@ struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
 
        ctdb_client_queue_pkt(ctdb, &c->hdr);
 
+       state->async.fn = callback;
+
        return state;
 }
 
@@ -457,7 +470,7 @@ int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
 {
        struct ctdb_client_call_state *state;
 
-       state = ctdb_call_send(ctdb_db, call);
+       state = ctdb_call_send(ctdb_db, call, NULL);
        return ctdb_call_recv(state, call);
 }
 
@@ -1575,6 +1588,22 @@ int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
        return 0;
 }
 
+/*
+  this is the dummy null procedure that all databases support
+*/
+static int ctdb_null_func(struct ctdb_call_info *call)
+{
+       return 0;
+}
+
+/*
+  this is a plain fetch procedure that all databases support
+*/
+static int ctdb_fetch_func(struct ctdb_call_info *call)
+{
+       call->reply_data = &call->record_data;
+       return 0;
+}
 
 /*
   attach to a specific database - client call
@@ -1632,6 +1661,10 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name,
 
        DLIST_ADD(ctdb->db_list, ctdb_db);
 
+       /* add well known functions */
+       ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
+       ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
+
        return ctdb_db;
 }
 
@@ -1641,12 +1674,15 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name,
  */
 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
 {
+       struct ctdb_registered_call *call;
+
+#if 0
        TDB_DATA data;
        int32_t status;
        struct ctdb_control_set_call c;
        int ret;
-       struct ctdb_registered_call *call;
 
+       /* this is no longer valid with the separate daemon architecture */
        c.db_id = ctdb_db->db_id;
        c.fn    = fn;
        c.id    = id;
@@ -1660,6 +1696,7 @@ int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
                DEBUG(0,("ctdb_set_call failed for call %u\n", id));
                return -1;
        }
+#endif
 
        /* also register locally */
        call = talloc(ctdb_db, struct ctdb_registered_call);
index 14f75b4c822878be71569d84e5c8c91b6cc6d164..ed38535224b648d5554e4e1f5ae77cc266b1e836 100644 (file)
@@ -85,6 +85,11 @@ struct ctdb_call_info {
  */
 #define CTDB_SRVID_UNBAN_NODE 0xF600000000000000LL
 
+/*
+  a message to tell the recovery daemon to fetch a set of records
+ */
+#define CTDB_SRVID_VACUUM_FETCH 0xF700000000000000LL
+
 
 /* used on the domain socket, send a pdu to the local daemon */
 #define CTDB_CURRENT_NODE     0xF0000001
@@ -225,7 +230,8 @@ int ctdb_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
 
 
 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call);
-struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call);
+struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
+                                             void (*callback)(struct ctdb_client_call_state *));
 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call);
 
 /* send a ctdb message */
index 9582b6dade2600a3b6b86ecabd8e7b3164cd00d5..aa4cc96c98ef05b61750b0b2b72622c82726a45c 100644 (file)
@@ -484,6 +484,7 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS          = 0,
                    CTDB_CONTROL_TRANSACTION_START       = 65,
                    CTDB_CONTROL_TRANSACTION_COMMIT      = 66,
                    CTDB_CONTROL_WIPE_DATABASE           = 67,
+                   CTDB_CONTROL_DELETE_RECORD           = 68,
 };     
 
 /*
@@ -1187,4 +1188,11 @@ int32_t ctdb_control_transaction_start(struct ctdb_context *ctdb, uint32_t id);
 int32_t ctdb_control_transaction_commit(struct ctdb_context *ctdb, uint32_t id);
 int32_t ctdb_control_wipe_database(struct ctdb_context *ctdb, TDB_DATA indata);
 
+
+int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv);
+int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv);
+
+int32_t ctdb_control_delete_record(struct ctdb_context *ctdb, TDB_DATA indata);
+
+
 #endif
index 48e64c2b4cf8b6260356f60681b0638beab9a7e4..358545ed575d634f1573d48081728e8d74721731 100644 (file)
@@ -342,3 +342,26 @@ tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_st
        return 0;
 }
 
+
+
+/* 
+   return the size of the freelist - used to decide if we should repack 
+*/
+int tdb_freelist_size(struct tdb_context *tdb)
+{
+       tdb_off_t ptr;
+       int count=0;
+
+       if (tdb_lock(tdb, -1, F_RDLCK) == -1) {
+               return -1;
+       }
+
+       ptr = FREELIST_TOP;
+       while (ptr != 0 && tdb_ofs_read(tdb, ptr, &ptr) == 0) {
+               count++;
+               
+       }
+
+       tdb_unlock(tdb, -1, F_RDLCK);
+       return count;
+}
index f6d4b4b1f459771ecefd4ea2c33066c7d0f69ed2..371381049e91f7f290a9007b9229a693f6140f39 100644 (file)
@@ -156,6 +156,7 @@ void tdb_dump_all(struct tdb_context *tdb);
 int tdb_printfreelist(struct tdb_context *tdb);
 int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries);
 int tdb_wipe_all(struct tdb_context *tdb);
+int tdb_freelist_size(struct tdb_context *tdb);
 
 extern TDB_DATA tdb_null;
 
index f2fd6ee641f7f8e9fbd4d4ca64fbdb010d26f7e5..4e013a530ea06c714f18c4eb08fdaea7ca82a5ca 100644 (file)
@@ -321,6 +321,9 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
                CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_wipe_database));
                return ctdb_control_wipe_database(ctdb, indata);
 
+       case CTDB_CONTROL_DELETE_RECORD:
+               return ctdb_control_delete_record(ctdb, indata);
+
        default:
                DEBUG(0,(__location__ " Unknown CTDB control opcode %u\n", opcode));
                return -1;
index 7f165cf73af6d58f81056fc60708131846b261b2..97a7d0251f53c5d73610cae8d35f2732ea0b6138 100644 (file)
@@ -636,3 +636,90 @@ bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
 }
 
 
+/*
+  delete a record as part of the vacuum process
+  only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
+  use non-blocking locks
+ */
+int32_t ctdb_control_delete_record(struct ctdb_context *ctdb, TDB_DATA indata)
+{
+       struct ctdb_rec_data *rec = (struct ctdb_rec_data *)indata.dptr;
+       struct ctdb_db_context *ctdb_db;
+       TDB_DATA key, data;
+       struct ctdb_ltdb_header *hdr, *hdr2;
+
+       if (indata.dsize < sizeof(uint32_t) || indata.dsize != rec->length) {
+               DEBUG(0,(__location__ " Bad record size in ctdb_control_delete_record\n"));
+               return -1;
+       }
+
+       ctdb_db = find_ctdb_db(ctdb, rec->reqid);
+       if (!ctdb_db) {
+               DEBUG(0,(__location__ " Unknown db 0x%08x\n", rec->reqid));
+               return -1;
+       }
+
+       key.dsize = rec->keylen;
+       key.dptr  = &rec->data[0];
+       data.dsize = rec->datalen;
+       data.dptr = &rec->data[rec->keylen];
+
+       if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
+               DEBUG(2,(__location__ " Called delete on record where we are lmaster\n"));
+               return -1;
+       }
+
+       if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
+               DEBUG(0,(__location__ " Bad record size\n"));
+               return -1;
+       }
+
+       hdr = (struct ctdb_ltdb_header *)data.dptr;
+
+       /* use a non-blocking lock */
+       if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
+               return -1;
+       }
+
+       data = tdb_fetch(ctdb_db->ltdb->tdb, key);
+       if (data.dptr == NULL) {
+               tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+               return 0;
+       }
+
+       if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+               tdb_delete(ctdb_db->ltdb->tdb, key);
+               DEBUG(0,(__location__ " Deleted corrupt record\n"));
+               tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+               free(data.dptr);
+               return 0;
+       }
+       
+       hdr2 = (struct ctdb_ltdb_header *)data.dptr;
+
+       if (hdr2->rsn > hdr->rsn) {
+               tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+               DEBUG(2,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
+                        (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
+               free(data.dptr);
+               return -1;              
+       }
+
+       if (hdr2->dmaster == ctdb->pnn) {
+               tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+               DEBUG(2,(__location__ " Attempted delete record where we are the dmaster\n"));
+               free(data.dptr);
+               return -1;                              
+       }
+
+       if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
+               tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+               DEBUG(2,(__location__ " Failed to delete record\n"));
+               free(data.dptr);
+               return -1;                                              
+       }
+
+       tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+       free(data.dptr);
+       return 0;       
+}
index 8dbf46932b2cc50bca9778dc5bc5fd4f5bcb3aa2..c7086468add81ef7179ba140b8a8a58d98dcccf9 100644 (file)
@@ -701,6 +701,94 @@ static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid,
 }
 
 
+/*
+  called when a vacuum fetch has completed - just free it
+ */
+static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
+{
+       talloc_free(state);
+}
+
+
+/*
+  handler for vacuum fetch
+*/
+static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                                TDB_DATA data, void *private_data)
+{
+       struct ctdb_call call;
+       struct ctdb_control_pulldb_reply *recs;
+       int ret, i;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       const char *name;
+       struct ctdb_dbid_map *dbmap=NULL;
+       bool persistent = false;
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_rec_data *r;
+
+       recs = (struct ctdb_control_pulldb_reply *)data.dptr;
+
+       /* work out if the database is persistent */
+       ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
+       if (ret != 0) {
+               DEBUG(0, (__location__ " Unable to get dbids from local node\n"));
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+       for (i=0;i<dbmap->num;i++) {
+               if (dbmap->dbs[i].dbid == recs->db_id) {
+                       persistent = dbmap->dbs[i].persistent;
+                       break;
+               }
+       }
+       if (i == dbmap->num) {
+               DEBUG(0, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
+               talloc_free(tmp_ctx);
+               return;         
+       }
+
+       /* find the name of this database */
+       if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
+               DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
+               talloc_free(tmp_ctx);
+               return;
+       }
+
+       /* attach to it */
+       ctdb_db = ctdb_attach(ctdb, name, persistent);
+       if (ctdb_db == NULL) {
+               DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
+               talloc_free(tmp_ctx);
+               return;
+       }
+       
+
+       ZERO_STRUCT(call);
+       call.call_id = CTDB_NULL_FUNC;
+       call.flags = CTDB_IMMEDIATE_MIGRATION;
+
+       r = (struct ctdb_rec_data *)&recs->data[0];
+       
+       for (i=0;
+            i<recs->count;
+            r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r), i++) {
+               struct ctdb_client_call_state *state;
+
+               call.key.dptr = &r->data[0];
+               call.key.dsize = r->keylen;
+
+               state = ctdb_call_send(ctdb_db, &call, vacuum_fetch_callback);
+               if (state == NULL) {
+                       DEBUG(0,(__location__ " Failed to setup vacuum fetch call\n"));
+                       talloc_free(tmp_ctx);
+                       return;                 
+               }
+       }
+
+       talloc_free(tmp_ctx);   
+}
+
 
 /*
   called when ctdb_wait_timeout should finish
@@ -1806,6 +1894,9 @@ static void monitor_cluster(struct ctdb_context *ctdb)
 
        /* and one for when nodes are unbanned */
        ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
+
+       /* register a message port for vacuum fetch */
+       ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
        
 again:
        if (mem_ctx) {
index a1ce35aace223be9c916fe0392986948d28b78ea..6255d7bf642e5402afa92b49ac7df83ecbef073d 100644 (file)
@@ -1025,7 +1025,6 @@ static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **
                            CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL, NULL, NULL);
 }
 
-
 static const struct {
        const char *name;
        int (*fn)(struct ctdb_context *, int, const char **);
@@ -1068,6 +1067,8 @@ static const struct {
        { "unregsrvid",      unregsrvid,                false, "unregister a server id", "<pnn> <type> <id>" },
        { "chksrvid",        chksrvid,                  false, "check if a server id exists", "<pnn> <type> <id>" },
        { "getsrvids",       getsrvids,                 false, "get a list of all server ids"},
+       { "vacuum",          ctdb_vacuum,               false, "vacuum the databases of empty records", "[max_records]"},
+       { "repack",          ctdb_repack,               false, "repack all databases", "[max_freelist]"},
 };
 
 /*
diff --git a/tools/ctdb_vacuum.c b/tools/ctdb_vacuum.c
new file mode 100644 (file)
index 0000000..6e88bac
--- /dev/null
@@ -0,0 +1,619 @@
+/* 
+   ctdb control tool - database vacuum 
+
+   Copyright (C) Andrew Tridgell  2008
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "system/network.h"
+#include "../include/ctdb.h"
+#include "../include/ctdb_private.h"
+#include "db_wrap.h"
+
+/* should be tunable */
+#define TIMELIMIT() timeval_current_ofs(10, 0)
+
+struct async_data {
+       uint32_t count;
+       uint32_t fail_count;
+};
+
+static void async_callback(struct ctdb_client_control_state *state)
+{
+       struct async_data *data = talloc_get_type(state->async.private_data, struct async_data);
+       int ret;
+       int32_t res;
+
+       /* one more node has responded with recmode data */
+       data->count--;
+
+       /* if we failed to push the db, then return an error and let
+          the main loop try again.
+       */
+       if (state->state != CTDB_CONTROL_DONE) {
+               data->fail_count++;
+               return;
+       }
+       
+       state->async.fn = NULL;
+
+       ret = ctdb_control_recv(state->ctdb, state, data, NULL, &res, NULL);
+       if ((ret != 0) || (res != 0)) {
+               data->fail_count++;
+       }
+}
+
+static void async_add(struct async_data *data, struct ctdb_client_control_state *state)
+{
+       /* set up the callback functions */
+       state->async.fn = async_callback;
+       state->async.private_data = data;
+       
+       /* one more control to wait for to complete */
+       data->count++;
+}
+
+
+/* wait for up to the maximum number of seconds allowed
+   or until all nodes we expect a response from has replied
+*/
+static int async_wait(struct ctdb_context *ctdb, struct async_data *data)
+{
+       while (data->count > 0) {
+               event_loop_once(ctdb->ev);
+       }
+       if (data->fail_count != 0) {
+               DEBUG(0,("Async wait failed - fail_count=%u\n", data->fail_count));
+               return -1;
+       }
+       return 0;
+}
+
+/* 
+   perform a simple control on nodes in the vnn map except ourselves.
+   The control cannot return data
+ */
+static int async_control_on_vnnmap(struct ctdb_context *ctdb, enum ctdb_controls opcode,
+                                  TDB_DATA data)
+{
+       struct async_data *async_data;
+       struct ctdb_client_control_state *state;
+       int j;
+       struct timeval timeout = TIMELIMIT();
+       
+       async_data = talloc_zero(ctdb, struct async_data);
+       CTDB_NO_MEMORY_FATAL(ctdb, async_data);
+
+       /* loop over all active nodes and send an async control to each of them */
+       for (j=0; j<ctdb->vnn_map->size; j++) {
+               uint32_t pnn = ctdb->vnn_map->map[j];
+               if (pnn == ctdb->pnn) {
+                       continue;
+               }
+               state = ctdb_control_send(ctdb, pnn, 0, opcode, 
+                                         0, data, async_data, NULL, &timeout, NULL);
+               if (state == NULL) {
+                       DEBUG(0,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
+                       talloc_free(async_data);
+                       return -1;
+               }
+               
+               async_add(async_data, state);
+       }
+
+       if (async_wait(ctdb, async_data) != 0) {
+               talloc_free(async_data);
+               return -1;
+       }
+
+       talloc_free(async_data);
+       return 0;
+}
+
+
+/*
+  vacuum one record
+ */
+static int ctdb_vacuum_one(struct ctdb_context *ctdb, TDB_DATA key, struct ctdb_db_context *ctdb_db)
+{
+       TDB_DATA data;
+       struct ctdb_ltdb_header *hdr;
+       struct ctdb_rec_data *rec;
+       uint64_t rsn;
+
+       if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
+               /* the chain is busy - come back later */
+               return 0;
+       }
+
+       data = tdb_fetch(ctdb_db->ltdb->tdb, key);
+       tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+       if (data.dptr == NULL) {
+               return 0;
+       }
+       if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
+               free(data.dptr);
+               return 0;
+       }
+
+
+       hdr = (struct ctdb_ltdb_header *)data.dptr;
+       rsn = hdr->rsn;
+
+       /* if we are not the lmaster and the dmaster then skip the record */
+       if (hdr->dmaster != ctdb->pnn ||
+           ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
+               free(data.dptr);
+               return 0;
+       }
+
+       rec = ctdb_marshall_record(ctdb, ctdb_db->db_id, key, hdr, tdb_null);
+       free(data.dptr);
+       if (rec == NULL) {
+               /* try it again later */
+               return 0;
+       }
+
+       data.dptr = (void *)rec;
+       data.dsize = rec->length;
+
+       if (async_control_on_vnnmap(ctdb, CTDB_CONTROL_DELETE_RECORD, data) != 0) {
+               /* one or more nodes failed to delete a record - no problem! */
+               talloc_free(rec);
+               return 0;
+       }
+
+       talloc_free(rec);
+
+       /* its deleted on all other nodes - refetch, check and delete */
+       if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
+               /* the chain is busy - come back later */
+               return 0;
+       }
+
+       data = tdb_fetch(ctdb_db->ltdb->tdb, key);
+       if (data.dptr == NULL) {
+               tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+               return 0;
+       }
+       if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
+               free(data.dptr);
+               tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+               return 0;
+       }
+
+       hdr = (struct ctdb_ltdb_header *)data.dptr;
+
+       /* if we are not the lmaster and the dmaster then skip the record */
+       if (hdr->dmaster != ctdb->pnn ||
+           ctdb_lmaster(ctdb, &key) != ctdb->pnn ||
+           rsn != hdr->rsn) {
+               tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+               free(data.dptr);
+               return 0;
+       }
+
+       tdb_delete(ctdb_db->ltdb->tdb, key);
+       tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+       free(data.dptr);
+
+       return 0;
+}
+
+
+/*
+  vacuum records for which we are the lmaster 
+ */
+static int ctdb_vacuum_local(struct ctdb_context *ctdb, struct ctdb_control_pulldb_reply *list, 
+                            struct ctdb_db_context *ctdb_db)
+{
+       struct ctdb_rec_data *r;
+       int i;
+
+       r = (struct ctdb_rec_data *)&list->data[0];
+       
+       for (i=0;
+            i<list->count;
+            r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r), i++) {
+               TDB_DATA key;
+               key.dptr = &r->data[0];
+               key.dsize = r->keylen;
+               if (ctdb_vacuum_one(ctdb, key, ctdb_db) != 0) {
+                       return -1;
+               }
+       }
+
+       return 0;       
+}
+
+/* 
+   a list of records to possibly delete
+ */
+struct vacuum_data {
+       uint32_t vacuum_limit;
+       struct ctdb_context *ctdb;
+       struct ctdb_control_pulldb_reply **list;
+       bool traverse_error;
+       uint32_t total;
+};
+
+/*
+  traverse function for vacuuming
+ */
+static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
+{
+       struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
+       uint32_t lmaster;
+       struct ctdb_ltdb_header *hdr;
+       struct ctdb_rec_data *rec;
+       size_t old_size;
+              
+       lmaster = ctdb_lmaster(vdata->ctdb, &key);
+       if (lmaster >= vdata->ctdb->vnn_map->size) {
+               return 0;
+       }
+
+       if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
+               /* its not a deleted record */
+               return 0;
+       }
+
+       hdr = (struct ctdb_ltdb_header *)data.dptr;
+
+       if (hdr->dmaster != vdata->ctdb->pnn) {
+               return 0;
+       }
+
+
+       /* add the record to the blob ready to send to the nodes */
+       rec = ctdb_marshall_record(vdata->list[lmaster], 0, key, NULL, tdb_null);
+       if (rec == NULL) {
+               DEBUG(0,(__location__ " Out of memory\n"));
+               vdata->traverse_error = true;
+               return -1;
+       }
+       old_size = talloc_get_size(vdata->list[lmaster]);
+       vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
+                                                  old_size + rec->length);
+       if (vdata->list[lmaster] == NULL) {
+               DEBUG(0,(__location__ " Failed to expand\n"));
+               vdata->traverse_error = true;
+               return -1;
+       }
+       vdata->list[lmaster]->count++;
+       memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
+       talloc_free(rec);
+
+       vdata->total++;
+
+       /* don't gather too many records */
+       if (vdata->vacuum_limit != 0 &&
+           vdata->total == vdata->vacuum_limit) {
+               return -1;
+       }
+
+       return 0;
+}
+
+
+/* vacuum one database */
+static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
+                         bool persistent, uint32_t vacuum_limit)
+{
+       struct ctdb_db_context *ctdb_db;
+       const char *name;
+       struct vacuum_data *vdata;
+       int i;
+
+       vdata = talloc_zero(ctdb, struct vacuum_data);
+       if (vdata == NULL) {
+               DEBUG(0,(__location__ " Out of memory\n"));
+               return -1;
+       }
+
+       vdata->ctdb = ctdb;
+       vdata->vacuum_limit = vacuum_limit;
+
+       if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
+               DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
+               talloc_free(vdata);
+               return -1;
+       }
+
+       ctdb_db = ctdb_attach(ctdb, name, persistent);
+       if (ctdb_db == NULL) {
+               DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
+               talloc_free(vdata);
+               return -1;
+       }
+
+       /* the list needs to be of length num_nodes */
+       vdata->list = talloc_array(vdata, struct ctdb_control_pulldb_reply *, ctdb->vnn_map->size);
+       if (vdata->list == NULL) {
+               DEBUG(0,(__location__ " Out of memory\n"));
+               talloc_free(vdata);
+               return -1;
+       }
+       for (i=0;i<ctdb->vnn_map->size;i++) {
+               vdata->list[i] = (struct ctdb_control_pulldb_reply *)
+                       talloc_zero_size(vdata->list, 
+                                   offsetof(struct ctdb_control_pulldb_reply, data));
+               if (vdata->list[i] == NULL) {
+                       DEBUG(0,(__location__ " Out of memory\n"));
+                       talloc_free(vdata);
+                       return -1;
+               }
+               vdata->list[i]->db_id = db_id;
+       }
+
+       /* traverse, looking for records that might be able to be vacuumed */
+       if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
+           vdata->traverse_error) {
+               DEBUG(0,(__location__ " Traverse error in vacuuming '%s'\n", name));
+               talloc_free(vdata);
+               return -1;              
+       }
+
+
+       for (i=0;i<ctdb->vnn_map->size;i++) {
+               if (vdata->list[i]->count == 0) {
+                       continue;
+               }
+
+               printf("Found %u records for lmaster %u\n", vdata->list[i]->count, i);          
+
+               /* for records where we are not the lmaster, tell the lmaster to fetch the record */
+               if (ctdb->vnn_map->map[i] != ctdb->pnn) {
+                       TDB_DATA data;
+                       data.dsize = talloc_get_size(vdata->list[i]);
+                       data.dptr  = (void *)vdata->list[i];
+                       if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
+                               DEBUG(0,(__location__ " Failed to send vacuum fetch message to %u\n",
+                                        ctdb->vnn_map->map[i]));
+                               talloc_free(vdata);
+                               return -1;              
+                       }
+                       continue;
+               }
+
+               /* for records where we are the lmaster, we can try to delete them */
+               if (ctdb_vacuum_local(ctdb, vdata->list[i], ctdb_db) != 0) {
+                       DEBUG(0,(__location__ " Deletion error in vacuuming '%s'\n", name));
+                       talloc_free(vdata);
+                       return -1;                                      
+               }
+       }       
+
+       /* this ensures we run our event queue */
+       ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
+
+       talloc_free(vdata);
+
+       return 0;
+}
+
+
+/*
+  vacuum all our databases
+ */
+int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       struct ctdb_dbid_map *dbmap=NULL;
+       struct ctdb_node_map *nodemap=NULL;
+       int ret, i, pnn;
+       uint32_t vacuum_limit = 100;
+
+       if (argc > 0) {
+               vacuum_limit = atoi(argv[0]);
+       }
+
+       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
+       if (ret != 0) {
+               DEBUG(0, ("Unable to get dbids from local node\n"));
+               return ret;
+       }
+
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
+       if (ret != 0) {
+               DEBUG(0, ("Unable to get nodemap from local node\n"));
+               return ret;
+       }
+
+       ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
+       if (ret != 0) {
+               DEBUG(0, ("Unable to get vnnmap from local node\n"));
+               return ret;
+       }
+
+       pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
+       if (pnn == -1) {
+               DEBUG(0, ("Unable to get pnn from local node\n"));
+               return -1;
+       }
+       ctdb->pnn = pnn;
+
+       for (i=0;i<dbmap->num;i++) {
+               if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap, 
+                                  dbmap->dbs[i].persistent, vacuum_limit) != 0) {
+                       DEBUG(0,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
+                       return -1;
+               }
+       }
+
+       return 0;
+}
+
+struct traverse_state {
+       bool error;
+       struct tdb_context *dest_db;
+};
+
+/*
+  traverse function for repacking
+ */
+static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
+{
+       struct traverse_state *state = (struct traverse_state *)private;
+       if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
+               state->error = true;
+               return -1;
+       }
+       return 0;
+}
+
+/*
+  repack a tdb
+ */
+static int ctdb_repack_tdb(struct tdb_context *tdb)
+{
+       struct tdb_context *tmp_db;
+       struct traverse_state state;
+
+       if (tdb_transaction_start(tdb) != 0) {
+               DEBUG(0,(__location__ " Failed to start transaction\n"));
+               return -1;
+       }
+
+       tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
+       if (tmp_db == NULL) {
+               DEBUG(0,(__location__ " Failed to create tmp_db\n"));
+               tdb_transaction_cancel(tdb);
+               return -1;
+       }
+
+       state.error = false;
+       state.dest_db = tmp_db;
+
+       if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
+               DEBUG(0,(__location__ " Failed to traverse copying out\n"));
+               tdb_transaction_cancel(tdb);
+               tdb_close(tmp_db);
+               return -1;              
+       }
+
+       if (state.error) {
+               DEBUG(0,(__location__ " Error during traversal\n"));
+               tdb_transaction_cancel(tdb);
+               tdb_close(tmp_db);
+               return -1;
+       }
+
+       if (tdb_wipe_all(tdb) != 0) {
+               DEBUG(0,(__location__ " Failed to wipe database\n"));
+               tdb_transaction_cancel(tdb);
+               tdb_close(tmp_db);
+               return -1;
+       }
+
+       state.error = false;
+       state.dest_db = tdb;
+
+       if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
+               DEBUG(0,(__location__ " Failed to traverse copying back\n"));
+               tdb_transaction_cancel(tdb);
+               tdb_close(tmp_db);
+               return -1;              
+       }
+
+       if (state.error) {
+               DEBUG(0,(__location__ " Error during second traversal\n"));
+               tdb_transaction_cancel(tdb);
+               tdb_close(tmp_db);
+               return -1;
+       }
+
+       tdb_close(tmp_db);
+
+       if (tdb_transaction_commit(tdb) != 0) {
+               DEBUG(0,(__location__ " Failed to commit\n"));
+               return -1;
+       }
+
+       return 0;
+}
+
+
+/* repack one database */
+static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id, 
+                         bool persistent, uint32_t repack_limit)
+{
+       struct ctdb_db_context *ctdb_db;
+       const char *name;
+       int size;
+
+       if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
+               DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
+               return -1;
+       }
+
+       ctdb_db = ctdb_attach(ctdb, name, persistent);
+       if (ctdb_db == NULL) {
+               DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
+               return -1;
+       }
+
+       size = tdb_freelist_size(ctdb_db->ltdb->tdb);
+       if (size == -1) {
+               DEBUG(0,(__location__ " Failed to get freelist size for '%s'\n", name));
+               return -1;
+       }
+
+       if (size <= repack_limit) {
+               return 0;
+       }
+
+       DEBUG(0,("Repacking %s with %u freelist entries\n", name, size));
+
+       if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
+               DEBUG(0,(__location__ " Failed to repack '%s'\n", name));
+               return -1;
+       }
+
+       return 0;
+}
+
+
+/*
+  repack all our databases
+ */
+int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       struct ctdb_dbid_map *dbmap=NULL;
+       int ret, i;
+       uint32_t repack_limit = 100;
+
+       if (argc > 0) {
+               repack_limit = atoi(argv[0]);
+       }
+
+       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
+       if (ret != 0) {
+               DEBUG(0, ("Unable to get dbids from local node\n"));
+               return ret;
+       }
+
+       for (i=0;i<dbmap->num;i++) {
+               if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid, 
+                                  dbmap->dbs[i].persistent, repack_limit) != 0) {
+                       DEBUG(0,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));
+                       return -1;
+               }
+       }
+
+       return 0;
+}