more efficient traversal in pulldb control
authorAndrew Tridgell <tridge@samba.org>
Mon, 7 Jan 2008 03:07:01 +0000 (14:07 +1100)
committerAndrew Tridgell <tridge@samba.org>
Mon, 7 Jan 2008 03:07:01 +0000 (14:07 +1100)
(This used to be ctdb commit fe614b10868e63b70e081b5bbfb74bf16fdf5716)

ctdb/server/ctdb_recover.c

index 4c6163b035a3d6b2c936f15d4760a6839b8c89af..212a1ada7a8454eb69a3f02f4d57e51025c45786 100644 (file)
@@ -170,36 +170,38 @@ ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA ind
        return 0;
 }
 
-struct getkeys_params {
+/* 
+   a traverse function for pulling all relevent records from pulldb
+ */
+struct pulldb_data {
        struct ctdb_context *ctdb;
-       uint32_t lmaster;
-       uint32_t rec_count;
-       struct getkeys_rec {
-               TDB_DATA key;
-               TDB_DATA data;
-       } *recs;
+       struct ctdb_control_pulldb_reply *pulldata;
+       uint32_t len;
+       bool failed;
 };
 
-static int traverse_getkeys(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
 {
-       struct getkeys_params *params = (struct getkeys_params *)p;
-       uint32_t lmaster;
-
-       lmaster = ctdb_lmaster(params->ctdb, &key);
+       struct pulldb_data *params = (struct pulldb_data *)p;
+       struct ctdb_rec_data *rec;
 
-       /* only include this record if the lmaster matches or if
-          the wildcard lmaster (-1) was specified.
-       */
-       if ((params->lmaster != CTDB_LMASTER_ANY) && (params->lmaster != lmaster)) {
-               return 0;
+       /* add the record to the blob */
+       rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
+       if (rec == NULL) {
+               params->failed = true;
+               return -1;
        }
-
-       params->recs = talloc_realloc(NULL, params->recs, struct getkeys_rec, params->rec_count+1);
-       key.dptr = talloc_memdup(params->recs, key.dptr, key.dsize);
-       data.dptr = talloc_memdup(params->recs, data.dptr, data.dsize);
-       params->recs[params->rec_count].key = key;
-       params->recs[params->rec_count].data = data;
-       params->rec_count++;
+       params->pulldata = talloc_realloc_size(NULL, params->pulldata, rec->length + params->len);
+       if (params->pulldata == NULL) {
+               DEBUG(0,(__location__ " Failed to expand pulldb_data to %u (%u records)\n", 
+                        rec->length + params->len, params->pulldata->count));
+               params->failed = true;
+               return -1;
+       }
+       params->pulldata->count++;
+       memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
+       params->len += rec->length;
+       talloc_free(rec);
 
        return 0;
 }
@@ -211,10 +213,8 @@ int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DAT
 {
        struct ctdb_control_pulldb *pull;
        struct ctdb_db_context *ctdb_db;
-       struct getkeys_params params;
+       struct pulldb_data params;
        struct ctdb_control_pulldb_reply *reply;
-       int i;
-       size_t len = 0;
 
        if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
                DEBUG(0,("rejecting ctdb_control_pull_db when not frozen\n"));
@@ -229,43 +229,32 @@ int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DAT
                return -1;
        }
 
-       params.ctdb = ctdb;
-       params.lmaster = pull->lmaster;
+       reply = talloc_zero(outdata, struct ctdb_control_pulldb_reply);
+       CTDB_NO_MEMORY(ctdb, reply);
 
-       params.rec_count = 0;
-       params.recs = talloc_array(outdata, struct getkeys_rec, 0);
-       CTDB_NO_MEMORY(ctdb, params.recs);
+       reply->db_id = pull->db_id;
+
+       params.ctdb = ctdb;
+       params.pulldata = reply;
+       params.len = offsetof(struct ctdb_control_pulldb_reply, data);
+       params.failed = false;
 
        if (ctdb_lock_all_databases_mark(ctdb) != 0) {
                DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
                return -1;
        }
 
-       tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_getkeys, &params);
-
-       ctdb_lock_all_databases_unmark(ctdb);
-
-       reply = talloc(outdata, struct ctdb_control_pulldb_reply);
-       CTDB_NO_MEMORY(ctdb, reply);
-
-       reply->db_id = pull->db_id;
-       reply->count = params.rec_count;
-
-       len = offsetof(struct ctdb_control_pulldb_reply, data);
-
-       for (i=0;i<reply->count;i++) {
-               struct ctdb_rec_data *rec;
-               rec = ctdb_marshall_record(outdata, 0, params.recs[i].key, NULL, params.recs[i].data);
-               reply = talloc_realloc_size(outdata, reply, rec->length + len);
-               memcpy(len+(uint8_t *)reply, rec, rec->length);
-               len += rec->length;
-               talloc_free(rec);
+       if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
+               DEBUG(0,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
+               ctdb_lock_all_databases_unmark(ctdb);
+               talloc_free(params.pulldata);
+               return -1;
        }
 
-       talloc_free(params.recs);
+       ctdb_lock_all_databases_unmark(ctdb);
 
-       outdata->dptr = (uint8_t *)reply;
-       outdata->dsize = len;
+       outdata->dptr = (uint8_t *)params.pulldata;
+       outdata->dsize = params.len;
 
        return 0;
 }