ctdb-daemon: Add tracking of migration records
[sfrench/samba-autobuild/.git] / ctdb / server / ctdb_ltdb_server.c
index 8feaff11c98a3c4846c622e6cf2c47d7ad80a078..677078be6a87e3d3b29b9b56c4be2b32e02d274c 100644 (file)
@@ -57,7 +57,8 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
                                  TDB_DATA data)
 {
        struct ctdb_context *ctdb = ctdb_db->ctdb;
-       TDB_DATA rec;
+       TDB_DATA rec[2];
+       uint32_t hsize = sizeof(struct ctdb_ltdb_header);
        int ret;
        bool seqnum_suppressed = false;
        bool keep = false;
@@ -66,14 +67,21 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
        uint32_t lmaster;
 
        if (ctdb->flags & CTDB_FLAG_TORTURE) {
+               TDB_DATA old;
                struct ctdb_ltdb_header *h2;
-               rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
-               h2 = (struct ctdb_ltdb_header *)rec.dptr;
-               if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
-                       DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
-                                (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
+
+               old = tdb_fetch(ctdb_db->ltdb->tdb, key);
+               h2 = (struct ctdb_ltdb_header *)old.dptr;
+               if (old.dptr != NULL &&
+                   old.dsize >= hsize &&
+                   h2->rsn > header->rsn) {
+                       DEBUG(DEBUG_ERR,
+                             ("RSN regression! %"PRIu64" %"PRIu64"\n",
+                              h2->rsn, header->rsn));
+               }
+               if (old.dptr) {
+                       free(old.dptr);
                }
-               if (rec.dptr) free(rec.dptr);
        }
 
        if (ctdb->vnn_map == NULL) {
@@ -178,12 +186,11 @@ store:
         */
        header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
 
-       rec.dsize = sizeof(*header) + data.dsize;
-       rec.dptr = talloc_size(ctdb, rec.dsize);
-       CTDB_NO_MEMORY(ctdb, rec.dptr);
+       rec[0].dsize = hsize;
+       rec[0].dptr = (uint8_t *)header;
 
-       memcpy(rec.dptr, header, sizeof(*header));
-       memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
+       rec[1].dsize = data.dsize;
+       rec[1].dptr = data.dptr;
 
        /* Databases with seqnum updates enabled only get their seqnum
           changes when/if we modify the data */
@@ -191,14 +198,14 @@ store:
                TDB_DATA old;
                old = tdb_fetch(ctdb_db->ltdb->tdb, key);
 
-               if ( (old.dsize == rec.dsize)
-               && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
-                         rec.dptr+sizeof(struct ctdb_ltdb_header),
-                         rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
+               if ((old.dsize == hsize + data.dsize) &&
+                   memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
                        tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
                        seqnum_suppressed = true;
                }
-               if (old.dptr) free(old.dptr);
+               if (old.dptr != NULL) {
+                       free(old.dptr);
+               }
        }
 
        DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
@@ -207,7 +214,7 @@ store:
                            ctdb_hash(&key)));
 
        if (keep) {
-               ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
+               ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
        } else {
                ret = tdb_delete(ctdb_db->ltdb->tdb, key);
        }
@@ -234,8 +241,6 @@ store:
                tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
        }
 
-       talloc_free(rec.dptr);
-
        if (schedule_for_deletion) {
                int ret2;
                ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
@@ -1022,6 +1027,15 @@ again:
                return -1;
        }
 
+       ret = ctdb_migration_init(ctdb_db);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,
+                     ("Failed to setup migration tracking for db '%s'\n",
+                      ctdb_db->db_name));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
        ctdb_db->generation = ctdb->vnn_map->generation;
 
        DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",