Merge branch 'master-readonly-records' into foo
[sahlberg/ctdb.git] / client / ctdb_client.c
index bcf9b2e89a928cb70bcbc271821ec624e258ee1d..89eeb4836a14bca81a422765b558854f1fb777e0 100644 (file)
@@ -187,7 +187,7 @@ static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req
 /*
   this is called in the client, when data comes in from the daemon
  */
-static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
+void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
 {
        struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
        struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
@@ -422,6 +422,10 @@ struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
 
        ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
 
+       if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
+               ret = -1;
+       }
+
        if (ret == 0 && header.dmaster == ctdb->pnn) {
                state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
                talloc_free(data.dptr);
@@ -584,6 +588,48 @@ static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA
        return ctdb_call(ctdb_db, &call);
 }
 
+/*
+  try to fetch a readonly copy of a record
+ */
+static int
+ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
+{
+       int ret;
+
+       struct ctdb_call call;
+       ZERO_STRUCT(call);
+
+       call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
+       call.call_data.dptr = NULL;
+       call.call_data.dsize = 0;
+       call.key = key;
+       call.flags = CTDB_WANT_READONLY;
+       ret = ctdb_call(ctdb_db, &call);
+
+       if (ret != 0) {
+               return -1;
+       }
+       if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
+               return -1;
+       }
+
+       *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
+       if (*hdr == NULL) {
+               talloc_free(call.reply_data.dptr);
+               return -1;
+       }
+
+       data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
+       data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
+       if (data->dptr == NULL) {
+               talloc_free(call.reply_data.dptr);
+               talloc_free(hdr);
+               return -1;
+       }
+
+       return 0;
+}
+
 /*
   get a lock on a record, and return the records data. Blocks until it gets the lock
  */
@@ -660,6 +706,185 @@ again:
        return h;
 }
 
+/*
+  get a readonly lock on a record, and return the records data. Blocks until it gets the lock
+ */
+struct ctdb_record_handle *
+ctdb_fetch_readonly_lock(
+       struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
+       TDB_DATA key, TDB_DATA *data,
+       int read_only)
+{
+       int ret;
+       struct ctdb_record_handle *h;
+       struct ctdb_ltdb_header *roheader = NULL;
+
+       h = talloc_zero(mem_ctx, struct ctdb_record_handle);
+       if (h == NULL) {
+               return NULL;
+       }
+
+       h->ctdb_db = ctdb_db;
+       h->key     = key;
+       h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
+       if (h->key.dptr == NULL) {
+               talloc_free(h);
+               return NULL;
+       }
+       h->data    = data;
+
+       data->dptr = NULL;
+       data->dsize = 0;
+
+
+again:
+       talloc_free(roheader);
+       roheader = NULL;
+
+       talloc_free(data->dptr);
+       data->dptr = NULL;
+       data->dsize = 0;
+
+       /* Lock the record/chain */
+       ret = ctdb_ltdb_lock(ctdb_db, key);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
+               talloc_free(h);
+               return NULL;
+       }
+
+       talloc_set_destructor(h, fetch_lock_destructor);
+
+       /* Check if record exists yet in the TDB */
+       ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data);
+       if (ret != 0) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               ret = ctdb_client_force_migration(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                       talloc_free(h);
+                       return NULL;
+               }
+               goto again;
+       }
+
+       /* if this is a request for read/write and we have delegations
+          we have to revoke all delegations first
+       */
+       if ((read_only == 0) 
+       &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
+       &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               ret = ctdb_client_force_migration(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                       talloc_free(h);
+                       return NULL;
+               }
+               goto again;
+       }
+
+       /* if we are dmaster, just return the handle */
+       if (h->header.dmaster == ctdb_db->ctdb->pnn) {
+               return h;
+       }
+
+       if (read_only != 0) {
+               TDB_DATA rodata = {NULL, 0};
+
+               if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
+               ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
+                       return h;
+               }
+
+               ctdb_ltdb_unlock(ctdb_db, key);
+               ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
+                       ret = ctdb_client_force_migration(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                               talloc_free(h);
+                               return NULL;
+                       }
+
+                       goto again;
+               }
+
+               if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
+                       ret = ctdb_client_force_migration(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                               talloc_free(h);
+                               return NULL;
+                       }
+
+                       goto again;
+               }
+
+               ret = ctdb_ltdb_lock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
+                       talloc_free(h);
+                       return NULL;
+               }
+
+               ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data);
+               if (ret != 0) {
+                       ctdb_ltdb_unlock(ctdb_db, key);
+
+                       ret = ctdb_client_force_migration(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                               talloc_free(h);
+                               return NULL;
+                       }
+
+                       goto again;
+               }
+
+               if (h->header.rsn >= roheader->rsn) {
+                       DEBUG(DEBUG_ERR,("READONLY RECORD: Too small RSN, migrate and try again\n"));
+                       ctdb_ltdb_unlock(ctdb_db, key);
+
+                       ret = ctdb_client_force_migration(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                               talloc_free(h);
+                               return NULL;
+                       }
+
+                       goto again;
+               }
+
+               if (ctdb_ltdb_store(ctdb_db, key, roheader, rodata) != 0) {
+                       ctdb_ltdb_unlock(ctdb_db, key);
+
+                       ret = ctdb_client_force_migration(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                               talloc_free(h);
+                               return NULL;
+                       }
+
+                       goto again;
+               }
+               return h;
+       }
+
+       /* we are not dmaster and this was not a request for a readonly lock
+        * so unlock the record, migrate it and try again
+        */
+       ctdb_ltdb_unlock(ctdb_db, key);
+       ret = ctdb_client_force_migration(ctdb_db, key);
+       if (ret != 0) {
+               DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
+               talloc_free(h);
+               return NULL;
+       }
+       goto again;
+}
+
 /*
   store some data to the record that was locked with ctdb_fetch_lock()
 */
@@ -685,6 +910,7 @@ int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
        call.call_id = CTDB_FETCH_FUNC;
        call.call_data.dptr = NULL;
        call.call_data.dsize = 0;
+       call.key = key;
 
        ret = ctdb_call(ctdb_db, &call);
 
@@ -1718,7 +1944,11 @@ static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
 /*
   attach to a specific database - client call
 */
-struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
+struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
+                                   struct timeval timeout,
+                                   const char *name,
+                                   bool persistent,
+                                   uint32_t tdb_flags)
 {
        struct ctdb_db_context *ctdb_db;
        TDB_DATA data;
@@ -1753,7 +1983,7 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name,
        ctdb_db->db_id = *(uint32_t *)data.dptr;
        talloc_free(data.dptr);
 
-       ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
+       ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
                talloc_free(ctdb_db);
@@ -4367,3 +4597,45 @@ ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ti
        return ctdb_ctrl_updaterecord_recv(ctdb, state);
 }
 
+
+
+
+
+
+/*
+  set a database to be readonly
+ */
+struct ctdb_client_control_state *
+ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
+{
+       TDB_DATA data;
+
+       data.dptr = (uint8_t *)&dbid;
+       data.dsize = sizeof(dbid);
+
+       return ctdb_control_send(ctdb, destnode, 0, 
+                          CTDB_CONTROL_SET_DB_READONLY, 0, data, 
+                          ctdb, NULL, NULL);
+}
+
+int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
+{
+       int ret;
+       int32_t res;
+
+       ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
+       if (ret != 0 || res != 0) {
+         DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
+               return -1;
+       }
+
+       return 0;
+}
+
+int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
+{
+       struct ctdb_client_control_state *state;
+
+       state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
+       return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
+}