ReadOnly: Add clientside code to fetch readonly records
authorRonnie Sahlberg <ronniesahlberg@gmail.com>
Wed, 20 Jul 2011 05:37:37 +0000 (15:37 +1000)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Tue, 23 Aug 2011 00:34:15 +0000 (10:34 +1000)
(This used to be ctdb commit 6fccc902bce21fa6ff13ed08ee3341bbf8be39f2)

ctdb/client/ctdb_client.c
ctdb/include/ctdb_client.h
ctdb/include/ctdb_private.h

index bcf9b2e89a928cb70bcbc271821ec624e258ee1d..20b4a74842a2d53196b7c9b14dc06629c1f902ed 100644 (file)
@@ -584,6 +584,48 @@ static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA
        return ctdb_call(ctdb_db, &call);
 }
 
+/*
+  try to fetch a readonly copy of a record
+ */
+static int
+ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
+{
+       int ret;
+
+       struct ctdb_call call;
+       ZERO_STRUCT(call);
+
+       call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
+       call.call_data.dptr = NULL;
+       call.call_data.dsize = 0;
+       call.key = key;
+       call.flags = CTDB_WANT_READONLY;
+       ret = ctdb_call(ctdb_db, &call);
+
+       if (ret != 0) {
+               return -1;
+       }
+       if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
+               return -1;
+       }
+
+       *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
+       if (*hdr == NULL) {
+               talloc_free(call.reply_data.dptr);
+               return -1;
+       }
+
+       data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
+       data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
+       if (data->dptr == NULL) {
+               talloc_free(call.reply_data.dptr);
+               talloc_free(hdr);
+               return -1;
+       }
+
+       return 0;
+}
+
 /*
   get a lock on a record, and return the records data. Blocks until it gets the lock
  */
@@ -660,6 +702,170 @@ again:
        return h;
 }
 
+/*
+  get a readonly lock on a record, and return the records data. Blocks until it gets the lock
+ */
+struct ctdb_record_handle *
+ctdb_fetch_readonly_lock(
+       struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
+       TDB_DATA key, TDB_DATA *data,
+       int read_only)
+{
+       int ret;
+       struct ctdb_record_handle *h;
+       struct ctdb_ltdb_header *roheader = NULL;
+
+       h = talloc_zero(mem_ctx, struct ctdb_record_handle);
+       if (h == NULL) {
+               return NULL;
+       }
+
+       h->ctdb_db = ctdb_db;
+       h->key     = key;
+       h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
+       if (h->key.dptr == NULL) {
+               talloc_free(h);
+               return NULL;
+       }
+       h->data    = data;
+
+       data->dptr = NULL;
+       data->dsize = 0;
+
+
+again:
+       talloc_free(roheader);
+       roheader = NULL;
+
+       talloc_free(data->dptr);
+       data->dptr = NULL;
+       data->dsize = 0;
+
+       /* Lock the record/chain */
+       ret = ctdb_ltdb_lock(ctdb_db, key);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
+               talloc_free(h);
+               return NULL;
+       }
+
+       talloc_set_destructor(h, fetch_lock_destructor);
+
+       /* Check if record exists yet in the TDB */
+       ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data);
+       if (ret != 0) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               ret = ctdb_client_force_migration(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                       talloc_free(h);
+                       return NULL;
+               }
+               goto again;
+       }
+
+
+       /* if we are dmaster, just return the handle */
+       if (h->header.dmaster == ctdb_db->ctdb->pnn) {
+               return h;
+       }
+
+       if (read_only != 0) {
+               TDB_DATA rodata = {NULL, 0};
+
+               if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
+               ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
+                       return h;
+               }
+
+               ctdb_ltdb_unlock(ctdb_db, key);
+               ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
+                       ret = ctdb_client_force_migration(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                               talloc_free(h);
+                               return NULL;
+                       }
+
+                       goto again;
+               }
+
+               if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
+                       ret = ctdb_client_force_migration(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                               talloc_free(h);
+                               return NULL;
+                       }
+
+                       goto again;
+               }
+
+               ret = ctdb_ltdb_lock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
+                       talloc_free(h);
+                       return NULL;
+               }
+
+               ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data);
+               if (ret != 0) {
+                       ctdb_ltdb_unlock(ctdb_db, key);
+
+                       ret = ctdb_client_force_migration(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                               talloc_free(h);
+                               return NULL;
+                       }
+
+                       goto again;
+               }
+
+               if (h->header.rsn >= roheader->rsn) {
+                       DEBUG(DEBUG_ERR,("READONLY RECORD: Too small RSN, migrate and try again\n"));
+                       ctdb_ltdb_unlock(ctdb_db, key);
+
+                       ret = ctdb_client_force_migration(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                               talloc_free(h);
+                               return NULL;
+                       }
+
+                       goto again;
+               }
+
+               if (ctdb_ltdb_store(ctdb_db, key, roheader, rodata) != 0) {
+                       ctdb_ltdb_unlock(ctdb_db, key);
+
+                       ret = ctdb_client_force_migration(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                               talloc_free(h);
+                               return NULL;
+                       }
+
+                       goto again;
+               }
+               return h;
+       }
+
+       /* we are not dmaster and this was not a request for a readonly lock
+        * so unlock the record, migrate it and try again
+        */
+       ctdb_ltdb_unlock(ctdb_db, key);
+       ret = ctdb_client_force_migration(ctdb_db, key);
+       if (ret != 0) {
+               DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
+               talloc_free(h);
+               return NULL;
+       }
+       goto again;
+}
+
 /*
   store some data to the record that was locked with ctdb_fetch_lock()
 */
@@ -685,6 +891,7 @@ int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
        call.call_id = CTDB_FETCH_FUNC;
        call.call_data.dptr = NULL;
        call.call_data.dsize = 0;
+       call.key = key;
 
        ret = ctdb_call(ctdb_db, &call);
 
index 0699bf1cfa426f867b5f1b686eda8145e06c2f64..720f0733611ac77a24bb324be99fcc7b66fe7c6c 100644 (file)
@@ -174,6 +174,8 @@ int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
                                           TDB_DATA key, TDB_DATA *data);
 
+struct ctdb_record_handle *ctdb_fetch_readonly_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data, int read_only);
+
 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data);
 
 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
index 4937bc8762ffb5bd0a61c765c336c4a0823770a2..4cf3709fa3843b2f4a760b23c9b4d6877d13eb03 100644 (file)
@@ -669,9 +669,9 @@ int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
                    struct ctdb_ltdb_header *header, TDB_DATA data);
 int ctdb_ltdb_delete(struct ctdb_db_context *ctdb_db, TDB_DATA key);
-int ctdb_ltdb_fetch_readonly(struct ctdb_db_context *ctdb_db,
-                  TDB_DATA key, struct ctdb_ltdb_header *header,
-                  TALLOC_CTX *mem_ctx, TDB_DATA *data);
+int ctdb_ltdb_fetch_readonly(struct ctdb_db_context *ctdb_db, 
+                   TDB_DATA key, struct ctdb_ltdb_header *header, 
+                   TALLOC_CTX *mem_ctx, TDB_DATA *data);
 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, 
                        struct ctdb_req_control *c,
                        TDB_DATA recdata);