ctdb-daemon: Add tracking of migration records
[sfrench/samba-autobuild/.git] / ctdb / server / ctdb_ltdb_server.c
index 1affada5f0ff1c5fa7e2b637bfb3088fe789b013..677078be6a87e3d3b29b9b56c4be2b32e02d274c 100644 (file)
 
 #include "ctdb_private.h"
 #include "ctdb_client.h"
-#include "ctdb_logging.h"
 
 #include "common/rb_tree.h"
 #include "common/reqid.h"
 #include "common/system.h"
+#include "common/common.h"
+#include "common/logging.h"
 
 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
 
@@ -56,7 +57,8 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
                                  TDB_DATA data)
 {
        struct ctdb_context *ctdb = ctdb_db->ctdb;
-       TDB_DATA rec;
+       TDB_DATA rec[2];
+       uint32_t hsize = sizeof(struct ctdb_ltdb_header);
        int ret;
        bool seqnum_suppressed = false;
        bool keep = false;
@@ -65,14 +67,21 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
        uint32_t lmaster;
 
        if (ctdb->flags & CTDB_FLAG_TORTURE) {
+               TDB_DATA old;
                struct ctdb_ltdb_header *h2;
-               rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
-               h2 = (struct ctdb_ltdb_header *)rec.dptr;
-               if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
-                       DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
-                                (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
+
+               old = tdb_fetch(ctdb_db->ltdb->tdb, key);
+               h2 = (struct ctdb_ltdb_header *)old.dptr;
+               if (old.dptr != NULL &&
+                   old.dsize >= hsize &&
+                   h2->rsn > header->rsn) {
+                       DEBUG(DEBUG_ERR,
+                             ("RSN regression! %"PRIu64" %"PRIu64"\n",
+                              h2->rsn, header->rsn));
+               }
+               if (old.dptr) {
+                       free(old.dptr);
                }
-               if (rec.dptr) free(rec.dptr);
        }
 
        if (ctdb->vnn_map == NULL) {
@@ -177,12 +186,11 @@ store:
         */
        header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
 
-       rec.dsize = sizeof(*header) + data.dsize;
-       rec.dptr = talloc_size(ctdb, rec.dsize);
-       CTDB_NO_MEMORY(ctdb, rec.dptr);
+       rec[0].dsize = hsize;
+       rec[0].dptr = (uint8_t *)header;
 
-       memcpy(rec.dptr, header, sizeof(*header));
-       memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
+       rec[1].dsize = data.dsize;
+       rec[1].dptr = data.dptr;
 
        /* Databases with seqnum updates enabled only get their seqnum
           changes when/if we modify the data */
@@ -190,14 +198,14 @@ store:
                TDB_DATA old;
                old = tdb_fetch(ctdb_db->ltdb->tdb, key);
 
-               if ( (old.dsize == rec.dsize)
-               && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
-                         rec.dptr+sizeof(struct ctdb_ltdb_header),
-                         rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
+               if ((old.dsize == hsize + data.dsize) &&
+                   memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
                        tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
                        seqnum_suppressed = true;
                }
-               if (old.dptr) free(old.dptr);
+               if (old.dptr != NULL) {
+                       free(old.dptr);
+               }
        }
 
        DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
@@ -206,7 +214,7 @@ store:
                            ctdb_hash(&key)));
 
        if (keep) {
-               ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
+               ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
        } else {
                ret = tdb_delete(ctdb_db->ltdb->tdb, key);
        }
@@ -233,8 +241,6 @@ store:
                tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
        }
 
-       talloc_free(rec.dptr);
-
        if (schedule_for_deletion) {
                int ret2;
                ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
@@ -629,7 +635,7 @@ int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
                                   ctdb_db->db_path,
                                   ctdb_db->unhealthy_reason));
        }
-       DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
+       DEBUG(DEBUG_NOTICE,
              ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
               ok, fail));
 
@@ -766,7 +772,6 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
        ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
        CTDB_NO_MEMORY(ctdb, ctdb_db);
 
-       ctdb_db->priority = 1;
        ctdb_db->ctdb = ctdb;
        ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
        CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
@@ -1022,6 +1027,15 @@ again:
                return -1;
        }
 
+       ret = ctdb_migration_init(ctdb_db);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,
+                     ("Failed to setup migration tracking for db '%s'\n",
+                      ctdb_db->db_name));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
        ctdb_db->generation = ctdb->vnn_map->generation;
 
        DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
@@ -1035,7 +1049,7 @@ again:
 struct ctdb_deferred_attach_context {
        struct ctdb_deferred_attach_context *next, *prev;
        struct ctdb_context *ctdb;
-       struct ctdb_req_control *c;
+       struct ctdb_req_control_old *c;
 };
 
 
@@ -1092,7 +1106,7 @@ int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                               TDB_DATA *outdata, uint64_t tdb_flags, 
                               bool persistent, uint32_t client_id,
-                              struct ctdb_req_control *c,
+                              struct ctdb_req_control_old *c,
                               bool *async_reply)
 {
        const char *db_name = (const char *)indata.dptr;
@@ -1107,7 +1121,7 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                return -1;
        }
 
-       /* dont allow any local clients to attach while we are in recovery mode
+       /* don't allow any local clients to attach while we are in recovery mode
         * except for the recovery daemon.
         * allow all attach from the network since these are always from remote
         * recovery daemons.
@@ -1571,40 +1585,6 @@ int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
        return 0;
 }
 
-int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata,
-                                    uint32_t client_id)
-{
-       struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
-       struct ctdb_db_context *ctdb_db;
-
-       ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
-       if (!ctdb_db) {
-               if (!(ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE)) {
-                       DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
-                                        db_prio->db_id));
-               }
-               return 0;
-       }
-
-       if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
-               DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
-               return 0;
-       }
-
-       ctdb_db->priority = db_prio->priority;
-       DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
-
-       if (client_id != 0) {
-               /* Broadcast the update to the rest of the cluster */
-               ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
-                                        CTDB_CONTROL_SET_DB_PRIORITY, 0,
-                                        CTDB_CTRL_FLAG_NOREPLY, indata,
-                                        NULL, NULL);
-       }
-       return 0;
-}
-
-
 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
 {
        if (ctdb_db->sticky) {
@@ -1627,7 +1607,7 @@ int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_d
 
 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
 {
-       struct ctdb_db_statistics *s = &ctdb_db->statistics;
+       struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
        int i;
 
        for (i=0; i<MAX_HOT_KEYS; i++) {
@@ -1644,7 +1624,7 @@ int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
                                TDB_DATA *outdata)
 {
        struct ctdb_db_context *ctdb_db;
-       struct ctdb_db_statistics *stats;
+       struct ctdb_db_statistics_old *stats;
        int i;
        int len;
        char *ptr;
@@ -1655,7 +1635,7 @@ int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
                return -1;
        }
 
-       len = offsetof(struct ctdb_db_statistics, hot_keys_wire);
+       len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
        for (i = 0; i < MAX_HOT_KEYS; i++) {
                len += ctdb_db->statistics.hot_keys[i].key.dsize;
        }
@@ -1667,7 +1647,7 @@ int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
        }
 
        memcpy(stats, &ctdb_db->statistics,
-              offsetof(struct ctdb_db_statistics, hot_keys_wire));
+              offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
 
        stats->num_hot_keys = MAX_HOT_KEYS;