LVS
[sahlberg/ctdb.git] / tools / ctdb_vacuum.c
index 86d9c4c5f198b5d4bf3e304ac27038e2c54a9fae..bd75a69e2564763b69e3f5ffbb467d8570e7df50 100644 (file)
 */
 
 #include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "system/filesys.h"
 #include "system/network.h"
-#include "../include/ctdb.h"
+#include "../include/ctdb_client.h"
 #include "../include/ctdb_private.h"
+#include "../common/rb_tree.h"
 #include "db_wrap.h"
 
 /* should be tunable */
 #define TIMELIMIT() timeval_current_ofs(10, 0)
 
-struct async_data {
-       uint32_t count;
-       uint32_t fail_count;
-};
-
-static void async_callback(struct ctdb_client_control_state *state)
-{
-       struct async_data *data = talloc_get_type(state->async.private_data, struct async_data);
-       int ret;
-       int32_t res;
-
-       /* one more node has responded with recmode data */
-       data->count--;
-
-       /* if we failed to push the db, then return an error and let
-          the main loop try again.
-       */
-       if (state->state != CTDB_CONTROL_DONE) {
-               data->fail_count++;
-               return;
-       }
-       
-       state->async.fn = NULL;
-
-       ret = ctdb_control_recv(state->ctdb, state, data, NULL, &res, NULL);
-       if ((ret != 0) || (res != 0)) {
-               data->fail_count++;
-       }
-}
-
-static void async_add(struct async_data *data, struct ctdb_client_control_state *state)
-{
-       /* set up the callback functions */
-       state->async.fn = async_callback;
-       state->async.private_data = data;
-       
-       /* one more control to wait for to complete */
-       data->count++;
-}
-
-
-/* wait for up to the maximum number of seconds allowed
-   or until all nodes we expect a response from has replied
-*/
-static int async_wait(struct ctdb_context *ctdb, struct async_data *data)
-{
-       while (data->count > 0) {
-               event_loop_once(ctdb->ev);
-       }
-       if (data->fail_count != 0) {
-               return -1;
-       }
-       return 0;
-}
-
-/* 
-   perform a simple control on nodes in the vnn map except ourselves.
-   The control cannot return data
- */
-static int async_control_on_vnnmap(struct ctdb_context *ctdb, enum ctdb_controls opcode,
-                                  TDB_DATA data)
-{
-       struct async_data *async_data;
-       struct ctdb_client_control_state *state;
-       int j;
-       struct timeval timeout = TIMELIMIT();
-       
-       async_data = talloc_zero(ctdb, struct async_data);
-       CTDB_NO_MEMORY_FATAL(ctdb, async_data);
-
-       /* loop over all active nodes and send an async control to each of them */
-       for (j=0; j<ctdb->vnn_map->size; j++) {
-               uint32_t pnn = ctdb->vnn_map->map[j];
-               if (pnn == ctdb->pnn) {
-                       continue;
-               }
-               state = ctdb_control_send(ctdb, pnn, 0, opcode, 
-                                         0, data, async_data, NULL, &timeout, NULL);
-               if (state == NULL) {
-                       DEBUG(0,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
-                       talloc_free(async_data);
-                       return -1;
-               }
-               
-               async_add(async_data, state);
-       }
-
-       if (async_wait(ctdb, async_data) != 0) {
-               talloc_free(async_data);
-               return -1;
-       }
-
-       talloc_free(async_data);
-       return 0;
-}
-
-
-/*
-  vacuum one record
- */
-static int ctdb_vacuum_one(struct ctdb_context *ctdb, TDB_DATA key, 
-                          struct ctdb_db_context *ctdb_db, uint32_t *count)
-{
-       TDB_DATA data;
-       struct ctdb_ltdb_header *hdr;
-       struct ctdb_rec_data *rec;
-       uint64_t rsn;
-
-       if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
-               /* the chain is busy - come back later */
-               return 0;
-       }
-
-       data = tdb_fetch(ctdb_db->ltdb->tdb, key);
-       tdb_chainunlock(ctdb_db->ltdb->tdb, key);
-       if (data.dptr == NULL) {
-               return 0;
-       }
-       if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
-               free(data.dptr);
-               return 0;
-       }
-
-
-       hdr = (struct ctdb_ltdb_header *)data.dptr;
-       rsn = hdr->rsn;
-
-       /* if we are not the lmaster and the dmaster then skip the record */
-       if (hdr->dmaster != ctdb->pnn ||
-           ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
-               free(data.dptr);
-               return 0;
-       }
-
-       rec = ctdb_marshall_record(ctdb, ctdb_db->db_id, key, hdr, tdb_null);
-       free(data.dptr);
-       if (rec == NULL) {
-               /* try it again later */
-               return 0;
-       }
-
-       data.dptr = (void *)rec;
-       data.dsize = rec->length;
-
-       if (async_control_on_vnnmap(ctdb, CTDB_CONTROL_DELETE_RECORD, data) != 0) {
-               /* one or more nodes failed to delete a record - no problem! */
-               talloc_free(rec);
-               return 0;
-       }
-
-       talloc_free(rec);
-
-       /* its deleted on all other nodes - refetch, check and delete */
-       if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
-               /* the chain is busy - come back later */
-               return 0;
-       }
-
-       data = tdb_fetch(ctdb_db->ltdb->tdb, key);
-       if (data.dptr == NULL) {
-               tdb_chainunlock(ctdb_db->ltdb->tdb, key);
-               return 0;
-       }
-       if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
-               free(data.dptr);
-               tdb_chainunlock(ctdb_db->ltdb->tdb, key);
-               return 0;
-       }
-
-       hdr = (struct ctdb_ltdb_header *)data.dptr;
-
-       /* if we are not the lmaster and the dmaster then skip the record */
-       if (hdr->dmaster != ctdb->pnn ||
-           ctdb_lmaster(ctdb, &key) != ctdb->pnn ||
-           rsn != hdr->rsn) {
-               tdb_chainunlock(ctdb_db->ltdb->tdb, key);
-               free(data.dptr);
-               return 0;
-       }
-
-       tdb_delete(ctdb_db->ltdb->tdb, key);
-       tdb_chainunlock(ctdb_db->ltdb->tdb, key);
-       free(data.dptr);
-
-       (*count)++;
-
-       return 0;
-}
-
-
-/*
-  vacuum records for which we are the lmaster 
- */
-static int ctdb_vacuum_local(struct ctdb_context *ctdb, struct ctdb_control_pulldb_reply *list, 
-                            struct ctdb_db_context *ctdb_db, uint32_t *count)
-{
-       struct ctdb_rec_data *r;
-       int i;
-
-       r = (struct ctdb_rec_data *)&list->data[0];
-       
-       for (i=0;
-            i<list->count;
-            r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r), i++) {
-               TDB_DATA key;
-               key.dptr = &r->data[0];
-               key.dsize = r->keylen;
-               if (ctdb_vacuum_one(ctdb, key, ctdb_db, count) != 0) {
-                       return -1;
-               }
-       }
-
-       return 0;       
-}
 
 /* 
    a list of records to possibly delete
@@ -249,24 +36,37 @@ static int ctdb_vacuum_local(struct ctdb_context *ctdb, struct ctdb_control_pull
 struct vacuum_data {
        uint32_t vacuum_limit;
        struct ctdb_context *ctdb;
-       struct ctdb_control_pulldb_reply **list;
+       struct ctdb_db_context *ctdb_db;
+       trbt_tree_t *delete_tree;
+       uint32_t delete_count;
+       struct ctdb_marshall_buffer **list;
        bool traverse_error;
        uint32_t total;
 };
 
+/* this structure contains the information for one record to be deleted */
+struct delete_record_data {
+       struct ctdb_context *ctdb;
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_ltdb_header hdr;
+       TDB_DATA key;
+};
+
 /*
   traverse function for vacuuming
  */
 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
 {
        struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
+       struct ctdb_context *ctdb = vdata->ctdb;
+       struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
        uint32_t lmaster;
        struct ctdb_ltdb_header *hdr;
        struct ctdb_rec_data *rec;
        size_t old_size;
               
-       lmaster = ctdb_lmaster(vdata->ctdb, &key);
-       if (lmaster >= vdata->ctdb->vnn_map->size) {
+       lmaster = ctdb_lmaster(ctdb, &key);
+       if (lmaster >= ctdb->vnn_map->size) {
                return 0;
        }
 
@@ -277,15 +77,55 @@ static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
 
        hdr = (struct ctdb_ltdb_header *)data.dptr;
 
-       if (hdr->dmaster != vdata->ctdb->pnn) {
+       if (hdr->dmaster != ctdb->pnn) {
                return 0;
        }
 
 
+       /* is this a records we could possibly delete? I.e.
+          if the record is empty and also we are both lmaster
+          and dmaster for the record we should be able to delete it
+       */
+       if ( (lmaster == ctdb->pnn)
+          &&( (vdata->delete_count < vdata->vacuum_limit)
+            ||(vdata->vacuum_limit == 0) ) ){
+               uint32_t hash;
+
+               hash = ctdb_hash(&key);
+               if (trbt_lookup32(vdata->delete_tree, hash)) {
+                       DEBUG(DEBUG_INFO, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
+               } else {
+                       struct delete_record_data *dd;
+
+                       /* store key and header indexed by the key hash */
+                       dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
+                       if (dd == NULL) {
+                               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
+                               return -1;
+                       }
+                       dd->ctdb      = ctdb;
+                       dd->ctdb_db   = ctdb_db;
+                       dd->key.dsize = key.dsize;
+                       dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
+                       if (dd->key.dptr == NULL) {
+                               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
+                               return -1;
+                       }
+
+                       dd->hdr = *hdr;
+
+       
+                       trbt_insert32(vdata->delete_tree, hash, dd);
+
+                       vdata->delete_count++;
+               }
+       }
+
+
        /* add the record to the blob ready to send to the nodes */
-       rec = ctdb_marshall_record(vdata->list[lmaster], vdata->ctdb->pnn, key, NULL, tdb_null);
+       rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
        if (rec == NULL) {
-               DEBUG(0,(__location__ " Out of memory\n"));
+               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
                vdata->traverse_error = true;
                return -1;
        }
@@ -293,7 +133,7 @@ static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
        vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
                                                   old_size + rec->length);
        if (vdata->list[lmaster] == NULL) {
-               DEBUG(0,(__location__ " Failed to expand\n"));
+               DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
                vdata->traverse_error = true;
                return -1;
        }
@@ -312,6 +152,84 @@ static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
        return 0;
 }
 
+struct delete_records_list {
+       struct ctdb_marshall_buffer *records;
+};
+
+/*
+ traverse the tree of records to delete and marshall them into
+ a blob
+*/
+static void
+delete_traverse(void *param, void *data)
+{
+       struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
+       struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
+       struct ctdb_rec_data *rec;
+       size_t old_size;
+
+       rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
+       if (rec == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
+               return;
+       }
+
+       old_size = talloc_get_size(recs->records);
+       recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
+       if (recs->records == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
+               return;
+       }
+       recs->records->count++;
+       memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
+}
+
+
+static void delete_record(void *param, void *d)
+{
+       struct delete_record_data *dd = talloc_get_type(d, struct delete_record_data);
+       struct ctdb_context *ctdb = dd->ctdb;
+       struct ctdb_db_context *ctdb_db = dd->ctdb_db;
+       uint32_t *count = (uint32_t *)param;
+       struct ctdb_ltdb_header *hdr;
+       TDB_DATA data;
+
+       /* its deleted on all other nodes - refetch, check and delete */
+       if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key) != 0) {
+               /* the chain is busy - come back later */
+               return;
+       }
+
+       data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
+       if (data.dptr == NULL) {
+               tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
+               return;
+       }
+       if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
+               free(data.dptr);
+               tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
+               return;
+       }
+
+       hdr = (struct ctdb_ltdb_header *)data.dptr;
+
+       /* if we are not the lmaster and the dmaster then skip the record */
+       if (hdr->dmaster != ctdb->pnn ||
+           ctdb_lmaster(ctdb, &(dd->key)) != ctdb->pnn ||
+           dd->hdr.rsn != hdr->rsn) {
+               tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
+               free(data.dptr);
+               return;
+       }
+
+       ctdb_block_signal(SIGALRM);
+       tdb_delete(ctdb_db->ltdb->tdb, dd->key);
+       ctdb_unblock_signal(SIGALRM);
+       tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
+       free(data.dptr);
+
+       (*count)++;
+}
 
 /* vacuum one database */
 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
@@ -324,39 +242,45 @@ static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb
 
        vdata = talloc_zero(ctdb, struct vacuum_data);
        if (vdata == NULL) {
-               DEBUG(0,(__location__ " Out of memory\n"));
+               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
                return -1;
        }
 
        vdata->ctdb = ctdb;
        vdata->vacuum_limit = vacuum_limit;
+       vdata->delete_tree = trbt_create(vdata, 0);
+       if (vdata->delete_tree == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
+               return -1;
+       }
 
        if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
-               DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
+               DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
                talloc_free(vdata);
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, name, persistent);
+       ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
        if (ctdb_db == NULL) {
-               DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
+               DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                talloc_free(vdata);
                return -1;
        }
+       vdata->ctdb_db = ctdb_db;
 
        /* the list needs to be of length num_nodes */
-       vdata->list = talloc_array(vdata, struct ctdb_control_pulldb_reply *, ctdb->vnn_map->size);
+       vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->vnn_map->size);
        if (vdata->list == NULL) {
-               DEBUG(0,(__location__ " Out of memory\n"));
+               DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
                talloc_free(vdata);
                return -1;
        }
        for (i=0;i<ctdb->vnn_map->size;i++) {
-               vdata->list[i] = (struct ctdb_control_pulldb_reply *)
+               vdata->list[i] = (struct ctdb_marshall_buffer *)
                        talloc_zero_size(vdata->list, 
-                                   offsetof(struct ctdb_control_pulldb_reply, data));
+                                   offsetof(struct ctdb_marshall_buffer, data));
                if (vdata->list[i] == NULL) {
-                       DEBUG(0,(__location__ " Out of memory\n"));
+                       DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
                        talloc_free(vdata);
                        return -1;
                }
@@ -366,7 +290,7 @@ static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb
        /* traverse, looking for records that might be able to be vacuumed */
        if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
            vdata->traverse_error) {
-               DEBUG(0,(__location__ " Traverse error in vacuuming '%s'\n", name));
+               DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
                talloc_free(vdata);
                return -1;              
        }
@@ -384,8 +308,8 @@ static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb
 
                        data.dsize = talloc_get_size(vdata->list[i]);
                        data.dptr  = (void *)vdata->list[i];
-                       if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
-                               DEBUG(0,(__location__ " Failed to send vacuum fetch message to %u\n",
+                       if (ctdb_client_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
                                         ctdb->vnn_map->map[i]));
                                talloc_free(vdata);
                                return -1;              
@@ -394,23 +318,104 @@ static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb
                }
        }       
 
-       for (i=0;i<ctdb->vnn_map->size;i++) {
-               uint32_t count = 0;
 
-               if (vdata->list[i]->count == 0) {
-                       continue;
+       /* Process all records we can delete (if any) */
+       if (vdata->delete_count > 0) {
+               struct delete_records_list *recs;
+               TDB_DATA indata, outdata;
+               int ret;
+               int32_t res;
+               uint32_t count;
+
+               recs = talloc_zero(vdata, struct delete_records_list);
+               if (recs == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
+                       return -1;
+               }
+               recs->records = (struct ctdb_marshall_buffer *)
+                       talloc_zero_size(vdata, 
+                                   offsetof(struct ctdb_marshall_buffer, data));
+               if (recs->records == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
+                       return -1;
                }
+               recs->records->db_id = db_id;
+
+               /* traverse the tree of all records we want to delete and
+                  create a blob we can send to the other nodes.
+               */
+               trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
+
+               indata.dsize = talloc_get_size(recs->records);
+               indata.dptr  = (void *)recs->records;
+
+               /* now tell all the other nodes to delete all these records
+                  (if possible)
+                */
+               for (i=0;i<ctdb->vnn_map->size;i++) {
+                       struct ctdb_marshall_buffer *records;
+                       struct ctdb_rec_data *rec;
+
+                       if (ctdb->vnn_map->map[i] == ctdb->pnn) {
+                               /* we dont delete the records on the local node
+                                  just yet
+                               */
+                               continue;
+                       }
 
-               /* for records where we are the lmaster, we can try to delete them */
-               if (ctdb_vacuum_local(ctdb, vdata->list[i], ctdb_db, &count) != 0) {
-                       DEBUG(0,(__location__ " Deletion error in vacuuming '%s'\n", name));
-                       talloc_free(vdata);
-                       return -1;                                      
+                       ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
+                                       CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
+                                       indata, recs, &outdata, &res,
+                                       NULL, NULL);
+                       if (ret != 0 || res != 0) {
+                               DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
+                               exit(10);
+                       }
+
+                       /* outdata countains the list of records coming back
+                          from the node which the node could not delete
+                       */
+                       records = (struct ctdb_marshall_buffer *)outdata.dptr;
+                       rec = (struct ctdb_rec_data *)&records->data[0];
+                       while (records->count-- > 1) {
+                               TDB_DATA reckey, recdata;
+                               struct ctdb_ltdb_header *rechdr;
+
+                               reckey.dptr = &rec->data[0];
+                               reckey.dsize = rec->keylen;
+                               recdata.dptr = &rec->data[reckey.dsize];
+                               recdata.dsize = rec->datalen;
+
+                               if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
+                                       DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
+                                       exit(10);
+                               }
+                               rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
+                               recdata.dptr += sizeof(*rechdr);
+                               recdata.dsize -= sizeof(*rechdr);
+
+                               /* that other node couldnt delete the record
+                                  so we shouldnt delete it either.
+                                  remove it from the tree.
+                               */
+                               talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
+
+                               rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
+                       }           
                }
-               if (count != 0) {
-                       printf("Deleted %u records on this node from '%s'\n", count, name);
+
+
+               /* the only records remaining in the tree would be those
+                  records where all other nodes could successfully
+                  delete them, so we can now safely delete them on the
+                  lmaster as well.
+               */
+               count = 0;
+               trbt_traversearray32(vdata->delete_tree, 1, delete_record, &count);
+               if (vdata->delete_count != 0) {
+                       printf("Deleted %u records out of %u on this node from '%s'\n", count, vdata->delete_count, name);
                }
-       }       
+       }
 
        /* this ensures we run our event queue */
        ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
@@ -437,25 +442,25 @@ int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
 
        ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
        if (ret != 0) {
-               DEBUG(0, ("Unable to get dbids from local node\n"));
+               DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
                return ret;
        }
 
        ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
        if (ret != 0) {
-               DEBUG(0, ("Unable to get nodemap from local node\n"));
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
                return ret;
        }
 
        ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
        if (ret != 0) {
-               DEBUG(0, ("Unable to get vnnmap from local node\n"));
+               DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
                return ret;
        }
 
        pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
        if (pnn == -1) {
-               DEBUG(0, ("Unable to get pnn from local node\n"));
+               DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
                return -1;
        }
        ctdb->pnn = pnn;
@@ -463,7 +468,7 @@ int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
        for (i=0;i<dbmap->num;i++) {
                if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap, 
                                   dbmap->dbs[i].persistent, vacuum_limit) != 0) {
-                       DEBUG(0,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
+                       DEBUG(DEBUG_ERR,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
                        return -1;
                }
        }
@@ -498,13 +503,15 @@ static int ctdb_repack_tdb(struct tdb_context *tdb)
        struct traverse_state state;
 
        if (tdb_transaction_start(tdb) != 0) {
-               DEBUG(0,(__location__ " Failed to start transaction\n"));
+               DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
                return -1;
        }
 
-       tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
+       tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
+                         TDB_INTERNAL|TDB_DISALLOW_NESTING,
+                         O_RDWR|O_CREAT, 0);
        if (tmp_db == NULL) {
-               DEBUG(0,(__location__ " Failed to create tmp_db\n"));
+               DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
                tdb_transaction_cancel(tdb);
                return -1;
        }
@@ -513,21 +520,21 @@ static int ctdb_repack_tdb(struct tdb_context *tdb)
        state.dest_db = tmp_db;
 
        if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
-               DEBUG(0,(__location__ " Failed to traverse copying out\n"));
+               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
                tdb_transaction_cancel(tdb);
                tdb_close(tmp_db);
                return -1;              
        }
 
        if (state.error) {
-               DEBUG(0,(__location__ " Error during traversal\n"));
+               DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
                tdb_transaction_cancel(tdb);
                tdb_close(tmp_db);
                return -1;
        }
 
        if (tdb_wipe_all(tdb) != 0) {
-               DEBUG(0,(__location__ " Failed to wipe database\n"));
+               DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
                tdb_transaction_cancel(tdb);
                tdb_close(tmp_db);
                return -1;
@@ -537,14 +544,14 @@ static int ctdb_repack_tdb(struct tdb_context *tdb)
        state.dest_db = tdb;
 
        if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
-               DEBUG(0,(__location__ " Failed to traverse copying back\n"));
+               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
                tdb_transaction_cancel(tdb);
                tdb_close(tmp_db);
                return -1;              
        }
 
        if (state.error) {
-               DEBUG(0,(__location__ " Error during second traversal\n"));
+               DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
                tdb_transaction_cancel(tdb);
                tdb_close(tmp_db);
                return -1;
@@ -553,7 +560,7 @@ static int ctdb_repack_tdb(struct tdb_context *tdb)
        tdb_close(tmp_db);
 
        if (tdb_transaction_commit(tdb) != 0) {
-               DEBUG(0,(__location__ " Failed to commit\n"));
+               DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
                return -1;
        }
 
@@ -570,19 +577,19 @@ static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id,
        int size;
 
        if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
-               DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
+               DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, name, persistent);
+       ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
        if (ctdb_db == NULL) {
-               DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
+               DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                return -1;
        }
 
        size = tdb_freelist_size(ctdb_db->ltdb->tdb);
        if (size == -1) {
-               DEBUG(0,(__location__ " Failed to get freelist size for '%s'\n", name));
+               DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
                return -1;
        }
 
@@ -593,7 +600,7 @@ static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id,
        printf("Repacking %s with %u freelist entries\n", name, size);
 
        if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
-               DEBUG(0,(__location__ " Failed to repack '%s'\n", name));
+               DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
                return -1;
        }
 
@@ -617,14 +624,14 @@ int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
 
        ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
        if (ret != 0) {
-               DEBUG(0, ("Unable to get dbids from local node\n"));
+               DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
                return ret;
        }
 
        for (i=0;i<dbmap->num;i++) {
                if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid, 
                                   dbmap->dbs[i].persistent, repack_limit) != 0) {
-                       DEBUG(0,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));
+                       DEBUG(DEBUG_ERR,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));
                        return -1;
                }
        }