added automatic vacuuming of empty records during recovery
authorAndrew Tridgell <tridge@samba.org>
Wed, 23 May 2007 07:21:14 +0000 (17:21 +1000)
committerAndrew Tridgell <tridge@samba.org>
Wed, 23 May 2007 07:21:14 +0000 (17:21 +1000)
(This used to be ctdb commit f9181a784ac7009df5e9c996f4e0c3e99098b59a)

ctdb/common/ctdb_client.c
ctdb/common/ctdb_control.c
ctdb/common/ctdb_recover.c
ctdb/common/ctdb_recoverd.c
ctdb/include/ctdb_private.h

index c1324037c6aafe1c79b5207e4efb00d9a1223d1f..89d48dbc582bad6864c7c29e0175182eec7c6b0e 100644 (file)
@@ -813,9 +813,8 @@ int ctdb_ctrl_status(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_s
        TDB_DATA data;
        int32_t res;
 
-       ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_STATUS, 0, data
+                          CTDB_CONTROL_STATUS, 0, tdb_null
                           ctdb, &data, &res, NULL, NULL);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for status failed\n"));
@@ -840,13 +839,11 @@ int ctdb_ctrl_status(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_s
 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
 {
        int ret;
-       TDB_DATA data;
        int32_t res;
 
-       ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_SHUTDOWN, CTDB_CTRL_FLAG_NOREPLY, data
-                          ctdb, &data, &res, &timeout, NULL);
+                          CTDB_CONTROL_SHUTDOWN, CTDB_CTRL_FLAG_NOREPLY, tdb_null
+                          NULL, NULL, &res, &timeout, NULL);
        if (ret != 0) {
                DEBUG(0,(__location__ " ctdb_control for shutdown failed\n"));
                return -1;
@@ -861,13 +858,12 @@ int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32
 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
 {
        int ret;
-       TDB_DATA data, outdata;
+       TDB_DATA outdata;
        int32_t res;
        struct ctdb_vnn_map_wire *map;
 
-       ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_GETVNNMAP, 0, data
+                          CTDB_CONTROL_GETVNNMAP, 0, tdb_null
                           mem_ctx, &outdata, &res, &timeout, NULL);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for getvnnmap failed\n"));
@@ -889,6 +885,7 @@ int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint3
 
        CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
        memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
+       talloc_free(outdata.dptr);
                    
        return 0;
 }
@@ -899,13 +896,11 @@ int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint3
 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
 {
        int ret;
-       TDB_DATA data, outdata;
        int32_t res;
 
-       ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_GET_RECMODE, 0, data
-                          ctdb, &outdata, &res, &timeout, NULL);
+                          CTDB_CONTROL_GET_RECMODE, 0, tdb_null
+                          NULL, NULL, &res, &timeout, NULL);
        if (ret != 0) {
                DEBUG(0,(__location__ " ctdb_control for getrecmode failed\n"));
                return -1;
@@ -922,7 +917,7 @@ int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint
 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
 {
        int ret;
-       TDB_DATA data, outdata;
+       TDB_DATA data;
        int32_t res;
 
        data.dsize = sizeof(uint32_t);
@@ -930,7 +925,7 @@ int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint
 
        ret = ctdb_control(ctdb, destnode, 0, 
                           CTDB_CONTROL_SET_RECMODE, 0, data, 
-                          ctdb, &outdata, &res, &timeout, NULL);
+                          NULL, NULL, &res, &timeout, NULL);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for setrecmode failed\n"));
                return -1;
@@ -945,13 +940,11 @@ int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint
 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
 {
        int ret;
-       TDB_DATA data, outdata;
        int32_t res;
 
-       ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_GET_RECMASTER, 0, data
-                          ctdb, &outdata, &res, &timeout, NULL);
+                          CTDB_CONTROL_GET_RECMASTER, 0, tdb_null
+                          NULL, NULL, &res, &timeout, NULL);
        if (ret != 0) {
                DEBUG(0,(__location__ " ctdb_control for getrecmaster failed\n"));
                return -1;
@@ -968,7 +961,7 @@ int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, struct timeval timeout, ui
 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
 {
        int ret;
-       TDB_DATA data, outdata;
+       TDB_DATA data;
        int32_t res;
 
        ZERO_STRUCT(data);
@@ -977,7 +970,7 @@ int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, ui
 
        ret = ctdb_control(ctdb, destnode, 0, 
                           CTDB_CONTROL_SET_RECMASTER, 0, data, 
-                          ctdb, &outdata, &res, &timeout, NULL);
+                          NULL, NULL, &res, &timeout, NULL);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for setrecmaster failed\n"));
                return -1;
@@ -990,15 +983,15 @@ int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, ui
 /*
   get a list of databases off a remote node
  */
-int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
+int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
+                      TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
 {
        int ret;
-       TDB_DATA data, outdata;
+       TDB_DATA outdata;
        int32_t res;
 
-       ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_GET_DBMAP, 0, data
+                          CTDB_CONTROL_GET_DBMAP, 0, tdb_null
                           mem_ctx, &outdata, &res, &timeout, NULL);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for getdbmap failed\n"));
@@ -1006,6 +999,7 @@ int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32
        }
 
        *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
+       talloc_free(outdata.dptr);
                    
        return 0;
 }
@@ -1019,12 +1013,11 @@ int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
                TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
 {
        int ret;
-       TDB_DATA data, outdata;
+       TDB_DATA outdata;
        int32_t res;
 
-       ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_GET_NODEMAP, 0, data
+                          CTDB_CONTROL_GET_NODEMAP, 0, tdb_null
                           mem_ctx, &outdata, &res, &timeout, NULL);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for getnodes failed\n"));
@@ -1032,6 +1025,7 @@ int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
        }
 
        *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
+       talloc_free(outdata.dptr);
                    
        return 0;
 }
@@ -1039,10 +1033,11 @@ int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
 /*
   set vnn map on a node
  */
-int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
+int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
+                       TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
 {
        int ret;
-       TDB_DATA data, outdata;
+       TDB_DATA data;
        int32_t res;
        struct ctdb_vnn_map_wire *map;
        size_t len;
@@ -1060,7 +1055,7 @@ int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint3
 
        ret = ctdb_control(ctdb, destnode, 0, 
                           CTDB_CONTROL_SETVNNMAP, 0, data, 
-                          mem_ctx, &outdata, &res, &timeout, NULL);
+                          NULL, NULL, &res, &timeout, NULL);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for setvnnmap failed\n"));
                return -1;
@@ -1074,7 +1069,8 @@ int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint3
 /*
   get all keys and records for a specific database
  */
-int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid, uint32_t lmaster, TALLOC_CTX *mem_ctx, struct ctdb_key_list *keys)
+int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid, uint32_t lmaster, 
+                    TALLOC_CTX *mem_ctx, struct ctdb_key_list *keys)
 {
        int i, ret;
        TDB_DATA indata, outdata;
@@ -1126,13 +1122,16 @@ int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid
                rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
        }           
 
+       talloc_free(outdata.dptr);
+
        return 0;
 }
 
 /*
   copy a tdb from one node to another node
  */
-int ctdb_ctrl_copydb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t sourcenode, uint32_t destnode, uint32_t dbid, uint32_t lmaster, TALLOC_CTX *mem_ctx)
+int ctdb_ctrl_copydb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t sourcenode, 
+                    uint32_t destnode, uint32_t dbid, uint32_t lmaster, TALLOC_CTX *mem_ctx)
 {
        int ret;
        TDB_DATA indata, outdata;
@@ -1155,6 +1154,7 @@ int ctdb_ctrl_copydb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t
        ret = ctdb_control(ctdb, destnode, 0, 
                           CTDB_CONTROL_PUSH_DB, 0, outdata, 
                           mem_ctx, NULL, &res, &timeout, NULL);
+       talloc_free(outdata.dptr);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for pushdb failed\n"));
                return -1;
@@ -1166,10 +1166,11 @@ int ctdb_ctrl_copydb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t
 /*
   change dmaster for all keys in the database to the new value
  */
-int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
+int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
+                        TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
 {
        int ret;
-       TDB_DATA indata, outdata;
+       TDB_DATA indata;
        int32_t res;
 
        indata.dsize = 2*sizeof(uint32_t);
@@ -1180,7 +1181,7 @@ int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint
 
        ret = ctdb_control(ctdb, destnode, 0, 
                           CTDB_CONTROL_SET_DMASTER, 0, indata, 
-                          mem_ctx, &outdata, &res, &timeout, NULL);
+                          NULL, NULL, &res, &timeout, NULL);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for setdmaster failed\n"));
                return -1;
@@ -1195,7 +1196,7 @@ int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint
 int ctdb_ctrl_cleardb(struct ctdb_context *ctdb, uint32_t destnode, TALLOC_CTX *mem_ctx, uint32_t dbid)
 {
        int ret;
-       TDB_DATA indata, outdata;
+       TDB_DATA indata;
        int32_t res;
 
        indata.dsize = sizeof(uint32_t);
@@ -1205,7 +1206,7 @@ int ctdb_ctrl_cleardb(struct ctdb_context *ctdb, uint32_t destnode, TALLOC_CTX *
 
        ret = ctdb_control(ctdb, destnode, 0, 
                           CTDB_CONTROL_CLEAR_DB, 0, indata, 
-                          mem_ctx, &outdata, &res, NULL, NULL);
+                          NULL, NULL, &res, NULL, NULL);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for cleardb failed\n"));
                return -1;
@@ -1221,11 +1222,9 @@ int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
 {
        int ret;
        int32_t res;
-       TDB_DATA data;
 
-       ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
-                          data, NULL, NULL, &res, NULL, NULL);
+                          tdb_null, NULL, NULL, &res, NULL, NULL);
        if (ret != 0) {
                return -1;
        }
@@ -1324,8 +1323,7 @@ int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint3
        int32_t res;
        TDB_DATA data;
 
-       ZERO_STRUCT(data);
-       ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, data, 
+       ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
                           ctdb, &data, &res, NULL, NULL);
        if (ret != 0 || res != 0) {
                return -1;
@@ -1402,12 +1400,10 @@ uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb,
 int ctdb_status_reset(struct ctdb_context *ctdb, uint32_t destnode)
 {
        int ret;
-       TDB_DATA data;
        int32_t res;
 
-       ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_STATUS_RESET, 0, data
+                          CTDB_CONTROL_STATUS_RESET, 0, tdb_null
                           NULL, NULL, &res, NULL, NULL);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for reset status failed\n"));
@@ -1447,6 +1443,7 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name)
        }
        
        ctdb_db->db_id = *(uint32_t *)data.dptr;
+       talloc_free(data.dptr);
 
        ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(1, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
        if (ret != 0) {
@@ -1632,13 +1629,11 @@ int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
 {
        int ret;
-       TDB_DATA data, outdata;
        int32_t res;
 
-       ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_GET_PID, 0, data
-                          ctdb, &outdata, &res, &timeout, NULL);
+                          CTDB_CONTROL_GET_PID, 0, tdb_null
+                          NULL, NULL, &res, &timeout, NULL);
        if (ret != 0) {
                DEBUG(0,(__location__ " ctdb_control for getpid failed\n"));
                return -1;
@@ -1713,15 +1708,15 @@ int ctdb_ctrl_getvnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t
 int ctdb_ctrl_setmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t monmode)
 {
        int ret;
-       TDB_DATA data, outdata;
+       TDB_DATA data;
        int32_t res;
 
        data.dsize = sizeof(uint32_t);
-       data.dptr = (unsigned char *)&monmode;
+       data.dptr = (uint8_t *)&monmode;
 
        ret = ctdb_control(ctdb, destnode, 0, 
                           CTDB_CONTROL_SET_MONMODE, 0, data, 
-                          ctdb, &outdata, &res, &timeout, NULL);
+                          NULL, NULL, &res, &timeout, NULL);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for setmonmode failed\n"));
                return -1;
@@ -1736,13 +1731,11 @@ int ctdb_ctrl_setmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint
 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
 {
        int ret;
-       TDB_DATA data, outdata;
        int32_t res;
 
-       ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_GET_MONMODE, 0, data
-                          ctdb, &outdata, &res, &timeout, NULL);
+                          CTDB_CONTROL_GET_MONMODE, 0, tdb_null
+                          NULL, NULL, &res, &timeout, NULL);
        if (ret != 0) {
                DEBUG(0,(__location__ " ctdb_control for getrecmode failed\n"));
                return -1;
@@ -1752,3 +1745,84 @@ int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint
 
        return 0;
 }
+
+
+/*
+  get maximum rsn for a db on a node
+ */
+int ctdb_ctrl_get_max_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
+                         uint32_t destnode, uint32_t db_id, uint64_t *max_rsn)
+{
+       TDB_DATA data, outdata;
+       int ret;
+       int32_t res;
+
+       data.dptr = (uint8_t *)&db_id;
+       data.dsize = sizeof(db_id);
+
+       ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_MAX_RSN, 0, data, ctdb,
+                          &outdata, &res, &timeout, NULL);
+       if (ret != 0 || res != 0 || outdata.dsize != sizeof(uint64_t)) {
+               DEBUG(0,(__location__ " ctdb_control for get_max_rsn failed\n"));
+               return -1;
+       }
+
+       *max_rsn = *(uint64_t *)outdata.dptr;
+       talloc_free(outdata.dptr);
+
+       return 0;       
+}
+
+/*
+  set the rsn on non-empty records to the given rsn
+ */
+int ctdb_ctrl_set_rsn_nonempty(struct ctdb_context *ctdb, struct timeval timeout, 
+                              uint32_t destnode, uint32_t db_id, uint64_t rsn)
+{
+       TDB_DATA data;
+       int ret;
+       int32_t res;
+       struct ctdb_control_set_rsn_nonempty p;
+
+       p.db_id = db_id;
+       p.rsn = rsn;
+
+       data.dptr = (uint8_t *)&p;
+       data.dsize = sizeof(p);
+
+       ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_RSN_NONEMPTY, 0, data, NULL,
+                          NULL, &res, &timeout, NULL);
+       if (ret != 0 || res != 0) {
+               DEBUG(0,(__location__ " ctdb_control for set_rsn_nonempty failed\n"));
+               return -1;
+       }
+
+       return 0;       
+}
+
+/*
+  delete records which have a rsn below the given rsn
+ */
+int ctdb_ctrl_delete_low_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
+                            uint32_t destnode, uint32_t db_id, uint64_t rsn)
+{
+       TDB_DATA data;
+       int ret;
+       int32_t res;
+       struct ctdb_control_delete_low_rsn p;
+
+       p.db_id = db_id;
+       p.rsn = rsn;
+
+       data.dptr = (uint8_t *)&p;
+       data.dsize = sizeof(p);
+
+       ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DELETE_LOW_RSN, 0, data, NULL,
+                          NULL, &res, &timeout, NULL);
+       if (ret != 0 || res != 0) {
+               DEBUG(0,(__location__ " ctdb_control for delete_low_rsn failed\n"));
+               return -1;
+       }
+
+       return 0;       
+}
index db01072d75b12262dfaf303db453339101a5a2c6..917bce99f4f4c238a595a0252bd60a467dd6900a 100644 (file)
@@ -252,6 +252,18 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
        case CTDB_CONTROL_SHUTDOWN:
                exit(10);
 
+       case CTDB_CONTROL_MAX_RSN: 
+               CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
+               return ctdb_control_max_rsn(ctdb, indata, outdata);
+
+       case CTDB_CONTROL_SET_RSN_NONEMPTY: 
+               CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_set_rsn_nonempty));
+               return ctdb_control_set_rsn_nonempty(ctdb, indata, outdata);
+
+       case CTDB_CONTROL_DELETE_LOW_RSN: 
+               CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_delete_low_rsn));
+               return ctdb_control_delete_low_rsn(ctdb, indata, outdata);
+
        default:
                DEBUG(0,(__location__ " Unknown CTDB control opcode %u\n", opcode));
                return -1;
index 899e05e4b68752403d112f5b821b0b430cbe804a..4b466f9ae27c76ce4d7532deb4e3ddaa8a1bc8ca 100644 (file)
@@ -450,3 +450,167 @@ int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb, TDB_DATA indata,
        return 0;
 }
 
+/*
+  callback for ctdb_control_max_rsn
+ */
+static int traverse_max_rsn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+       struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
+       uint64_t *max_rsn = (uint64_t *)p;
+
+       if (data.dsize >= sizeof(*h)) {
+               (*max_rsn) = MAX(*max_rsn, h->rsn);
+       }
+       return 0;
+}
+
+/*
+  get max rsn across an entire db
+ */
+int32_t ctdb_control_max_rsn(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
+{
+       struct ctdb_db_context *ctdb_db;
+       uint32_t db_id = *(uint32_t *)indata.dptr;
+       uint64_t max_rsn = 0;
+       int ret;
+
+       if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+               DEBUG(0,("rejecting ctdb_control_max_rsn when not frozen\n"));
+               return -1;
+       }
+
+       ctdb_db = find_ctdb_db(ctdb, db_id);
+       if (!ctdb_db) {
+               DEBUG(0,(__location__ " Unknown db\n"));
+               return -1;
+       }
+
+       if (ctdb_lock_all_databases_mark(ctdb) != 0) {
+               DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
+               return -1;
+       }
+
+       ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_max_rsn, &max_rsn);
+       if (ret < 0) {
+               DEBUG(0,(__location__ " traverse failed in ctdb_control_max_rsn\n"));
+               return -1;
+       }
+
+       ctdb_lock_all_databases_unmark(ctdb);
+
+       outdata->dptr = (uint8_t *)talloc(outdata, uint64_t);
+       if (!outdata->dptr) {
+               return -1;
+       }
+       (*(uint64_t *)outdata->dptr) = max_rsn;
+       outdata->dsize = sizeof(uint64_t);
+
+       return 0;
+}
+
+
+/*
+  callback for ctdb_control_set_rsn_nonempty
+ */
+static int traverse_set_rsn_nonempty(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+       struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
+       uint64_t *rsn = (uint64_t *)p;
+
+       if (data.dsize > sizeof(*h)) {
+               h->rsn = *rsn;
+               if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
+                       return -1;
+               }
+       }
+       return 0;
+}
+
+/*
+  set rsn for all non-empty records in a database to a given rsn
+ */
+int32_t ctdb_control_set_rsn_nonempty(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
+{
+       struct ctdb_control_set_rsn_nonempty *p = (struct ctdb_control_set_rsn_nonempty *)indata.dptr;
+       struct ctdb_db_context *ctdb_db;
+       int ret;
+
+       if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+               DEBUG(0,("rejecting ctdb_control_set_rsn_nonempty when not frozen\n"));
+               return -1;
+       }
+
+       ctdb_db = find_ctdb_db(ctdb, p->db_id);
+       if (!ctdb_db) {
+               DEBUG(0,(__location__ " Unknown db\n"));
+               return -1;
+       }
+
+       if (ctdb_lock_all_databases_mark(ctdb) != 0) {
+               DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
+               return -1;
+       }
+
+       ret = tdb_traverse(ctdb_db->ltdb->tdb, traverse_set_rsn_nonempty, &p->rsn);
+       if (ret < 0) {
+               DEBUG(0,(__location__ " traverse failed in ctdb_control_set_rsn_nonempty\n"));
+               return -1;
+       }
+
+       ctdb_lock_all_databases_unmark(ctdb);
+
+       return 0;
+}
+
+
+/*
+  callback for ctdb_control_delete_low_rsn
+ */
+static int traverse_delete_low_rsn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+       struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
+       uint64_t *rsn = (uint64_t *)p;
+
+       if (data.dsize < sizeof(*h) || h->rsn < *rsn) {
+               if (tdb_delete(tdb, key) != 0) {
+                       return -1;
+               }
+       }
+       return 0;
+}
+
+/*
+  delete any records with a rsn < the given rsn
+ */
+int32_t ctdb_control_delete_low_rsn(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
+{
+       struct ctdb_control_delete_low_rsn *p = (struct ctdb_control_delete_low_rsn *)indata.dptr;
+       struct ctdb_db_context *ctdb_db;
+       int ret;
+
+       if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
+               DEBUG(0,("rejecting ctdb_control_delete_low_rsn when not frozen\n"));
+               return -1;
+       }
+
+       ctdb_db = find_ctdb_db(ctdb, p->db_id);
+       if (!ctdb_db) {
+               DEBUG(0,(__location__ " Unknown db\n"));
+               return -1;
+       }
+
+       if (ctdb_lock_all_databases_mark(ctdb) != 0) {
+               DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
+               return -1;
+       }
+
+       ret = tdb_traverse(ctdb_db->ltdb->tdb, traverse_delete_low_rsn, &p->rsn);
+       if (ret < 0) {
+               DEBUG(0,(__location__ " traverse failed in ctdb_control_delete_low_rsn\n"));
+               return -1;
+       }
+
+       ctdb_lock_all_databases_unmark(ctdb);
+
+       return 0;
+}
index 46e4099df3e82d4b6243cebc5def24b3f3b5b4f5..2c8029f69623fc6fe1a3fdf3f20cd41a04e4972f 100644 (file)
@@ -237,7 +237,8 @@ static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node
 
 
 
-static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+                                          uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
 {
        int i, j, ret;
 
@@ -259,8 +260,70 @@ static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctd
        return 0;
 }
 
+/*
+  vacuum one database
+ */
+static int vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *nodemap)
+{
+       uint64_t max_rsn;
+       int ret, i;
+
+       /* find max rsn on our local node for this db */
+       ret = ctdb_ctrl_get_max_rsn(ctdb, timeval_current_ofs(1, 0), CTDB_CURRENT_NODE, db_id, &max_rsn);
+       if (ret != 0) {
+               return -1;
+       }
+
+       /* set rsn on non-empty records to max_rsn+1 */
+       for (i=0;i<nodemap->num;i++) {
+               if (!nodemap->nodes[i].flags & NODE_FLAGS_CONNECTED) {
+                       continue;
+               }
+               ret = ctdb_ctrl_set_rsn_nonempty(ctdb, timeval_current_ofs(1, 0), nodemap->nodes[i].vnn,
+                                                db_id, max_rsn+1);
+               if (ret != 0) {
+                       DEBUG(0,(__location__ " Failed to set rsn on node %u to %llu\n",
+                                nodemap->nodes[i].vnn, max_rsn+1));
+                       return -1;
+               }
+       }
+
+       /* delete records with rsn < max_rsn+1 on all nodes */
+       for (i=0;i<nodemap->num;i++) {
+               if (!nodemap->nodes[i].flags & NODE_FLAGS_CONNECTED) {
+                       continue;
+               }
+               ret = ctdb_ctrl_delete_low_rsn(ctdb, timeval_current_ofs(1, 0), nodemap->nodes[i].vnn,
+                                                db_id, max_rsn+1);
+               if (ret != 0) {
+                       DEBUG(0,(__location__ " Failed to delete records on node %u with rsn below %llu\n",
+                                nodemap->nodes[i].vnn, max_rsn+1));
+                       return -1;
+               }
+       }
+
 
-static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
+       return 0;
+}
+
+
+static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+                               struct ctdb_dbid_map *dbmap)
+{
+       int i;
+
+       /* update dmaster to point to this node for all databases/nodes */
+       for (i=0;i<dbmap->num;i++) {
+               if (vacuum_db(ctdb, dbmap->dbids[i], nodemap) != 0) {
+                       return -1;
+               }
+       }
+       return 0;
+}
+
+
+static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+                                   uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
 {
        int i, j, ret;
 
@@ -287,7 +350,8 @@ static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_
 }
 
 
-static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t vnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
+static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
+                                     uint32_t vnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
 {
        int j, ret;
 
@@ -464,6 +528,15 @@ static int do_recovery(struct ctdb_context *ctdb,
                return -1;
        }
 
+       /*
+         run a vacuum operation on empty records
+        */
+       ret = vacuum_all_databases(ctdb, nodemap, dbmap);
+       if (ret != 0) {
+               DEBUG(0, (__location__ " Unable to vacuum all databases\n"));
+               return -1;
+       }
+
 
        /* disable recovery mode */
        ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
index dffd8cec07278214dedb859447b6be48a5f827a9..9d86330475030bf0d64e02dd3ab2fe3ae313065b 100644 (file)
@@ -375,8 +375,26 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS,
                    CTDB_CONTROL_SHUTDOWN,
                    CTDB_CONTROL_GET_MONMODE,
                    CTDB_CONTROL_SET_MONMODE,
+                   CTDB_CONTROL_MAX_RSN,
+                   CTDB_CONTROL_SET_RSN_NONEMPTY,
+                   CTDB_CONTROL_DELETE_LOW_RSN,
 };
 
+/*
+  structure passed in ctdb_control_set_rsn_nonempty
+ */
+struct ctdb_control_set_rsn_nonempty {
+       uint32_t db_id;
+       uint64_t rsn;
+};
+
+/*
+  structure passed in ctdb_control_delete_low_rsn
+ */
+struct ctdb_control_delete_low_rsn {
+       uint32_t db_id;
+       uint64_t rsn;
+};
 
 /*
   structure passed in set_call control
@@ -837,5 +855,14 @@ void ctdb_call_resend_all(struct ctdb_context *ctdb);
 void ctdb_node_dead(struct ctdb_node *node);
 void ctdb_node_connected(struct ctdb_node *node);
 bool ctdb_blocking_freeze(struct ctdb_context *ctdb);
+int32_t ctdb_control_max_rsn(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata);
+int32_t ctdb_control_set_rsn_nonempty(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata);
+int32_t ctdb_control_delete_low_rsn(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata);
+int ctdb_ctrl_get_max_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
+                         uint32_t destnode, uint32_t db_id, uint64_t *max_rsn);
+int ctdb_ctrl_set_rsn_nonempty(struct ctdb_context *ctdb, struct timeval timeout, 
+                              uint32_t destnode, uint32_t db_id, uint64_t rsn);
+int ctdb_ctrl_delete_low_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
+                            uint32_t destnode, uint32_t db_id, uint64_t rsn);
 
 #endif