server: implement a new control SCHEDULE_FOR_DELETION to fill the delete_queue.
[sahlberg/ctdb.git] / server / ctdb_vacuum.c
index 94d3416bf3b763cda0e113f794278113b0cf973b..ec3b5f29f2815b2de0786f86a995e59e5e57ee3d 100644 (file)
@@ -1269,3 +1269,67 @@ int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
 
        return 0;
 }
+
+/**
+ * Schedule a record for deletetion.
+ * Called from the parent context.
+ */
+int32_t ctdb_control_schedule_for_deletion(struct ctdb_context *ctdb,
+                                          TDB_DATA indata)
+{
+       struct ctdb_control_schedule_for_deletion *dd;
+       struct ctdb_db_context *ctdb_db;
+       int ret;
+       TDB_DATA key;
+       uint32_t hash;
+       struct delete_record_data *kd;
+
+       dd = (struct ctdb_control_schedule_for_deletion *)indata.dptr;
+
+       ctdb_db = find_ctdb_db(ctdb, dd->db_id);
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " Unknown db id 0x%08x\n",
+                                 dd->db_id));
+               return -1;
+       }
+
+       key.dsize = dd->keylen;
+       key.dptr = dd->key;
+
+       hash = (uint32_t)ctdb_hash(&key);
+
+       DEBUG(DEBUG_INFO, (__location__ " Schedule for deletion: db[%s] "
+                          "db_id[0x%08x] "
+                          "key_hash[0x%08x] "
+                          "lmaster[%u] "
+                          "migrated_with_data[%s]\n",
+                           ctdb_db->db_name, dd->db_id,
+                           hash,
+                           ctdb_lmaster(ctdb_db->ctdb, &key),
+                           dd->hdr.flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA ? "yes" : "no"));
+
+       kd = (struct delete_record_data *)trbt_lookup32(ctdb_db->delete_queue, hash);
+       if (kd != NULL) {
+               if ((kd->key.dsize != key.dsize) ||
+                   (memcmp(kd->key.dptr, key.dptr, key.dsize) != 0))
+               {
+                       DEBUG(DEBUG_INFO,
+                             ("schedule for deletion: Hash collision (0x%08x)."
+                              " Skipping the record.\n", hash));
+                       return 0;
+               } else {
+                       DEBUG(DEBUG_INFO,
+                             ("schedule for deletetion: Overwriting entry for "
+                              "key with hash 0x%08x.\n", hash));
+               }
+       }
+
+       ret = insert_delete_record_data_into_tree(ctdb, ctdb_db,
+                                                 ctdb_db->delete_queue,
+                                                 &dd->hdr, key);
+       if (ret != 0) {
+               return -1;
+       }
+
+       return 0;
+}