dbwrap ctdb: add a partial mapping from tdb_error to NTSTATUS and use it for store.
[jra/samba/.git] / source3 / lib / dbwrap_ctdb.c
index aa2f9d371ca36c0437c4f0651a3f76c29c361e10..69b3631c04d9fb9876a758879f963e2c997a1ea7 100644 (file)
@@ -33,6 +33,31 @@ struct db_ctdb_rec {
        struct ctdb_ltdb_header header;
 };
 
+static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
+                                              TALLOC_CTX *mem_ctx,
+                                              TDB_DATA key,
+                                              bool persistent);
+
+static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
+{
+       NTSTATUS status;
+       enum TDB_ERROR tret = tdb_error(tdb);
+
+       switch (tret) {
+       case TDB_ERR_EXISTS:
+               status = NT_STATUS_OBJECT_NAME_COLLISION;
+               break;
+       case TDB_ERR_NOEXIST:
+               status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
+               break;
+       default:
+               status = NT_STATUS_INTERNAL_DB_CORRUPTION;
+               break;
+       }
+
+       return status;
+}
+
 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
 {
        struct db_ctdb_rec *crec = talloc_get_type_abort(
@@ -53,7 +78,8 @@ static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
 
        SAFE_FREE(cdata.dptr);
 
-       return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
+       return (ret == 0) ? NT_STATUS_OK
+                         : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
 }
 
 
@@ -62,36 +88,118 @@ static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
    store */
 static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
 {
-       struct db_ctdb_rec *crec = talloc_get_type_abort(
-               rec->private_data, struct db_ctdb_rec);
+       struct db_ctdb_rec *crec;
+       struct db_record *record;
        TDB_DATA cdata;
        int ret;
        NTSTATUS status;
+       uint32_t count;
+       int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
+
+       for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec;
+            (count < max_retries) && !NT_STATUS_IS_OK(status);
+            count++)
+       {
+               if (count > 0) {
+                       /* retry */
+                       /*
+                        * There is a hack here: We use rec as a memory
+                        * context and re-use it as the record struct ptr.
+                        * We don't free the record data allocated
+                        * in each turn. So all gets freed when the caller
+                        * releases the original record. This is because
+                        * we don't get the record passed in by reference
+                        * in the first place and the caller relies on
+                        * having to free the record himself.
+                        */
+                       record = fetch_locked_internal(crec->ctdb_ctx,
+                                                      rec,
+                                                      rec->key,
+                                                      true /* persistent */);
+                       if (record == NULL) {
+                               DEBUG(5, ("fetch_locked_internal failed.\n"));
+                               status = NT_STATUS_NO_MEMORY;
+                               break;
+                       }
+               }
 
-       cdata.dsize = sizeof(crec->header) + data.dsize;
+               crec = talloc_get_type_abort(record->private_data,
+                                            struct db_ctdb_rec);
 
-       if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
-               return NT_STATUS_NO_MEMORY;
-       }
+               cdata.dsize = sizeof(crec->header) + data.dsize;
 
-       crec->header.rsn++;
+               if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
+                       return NT_STATUS_NO_MEMORY;
+               }
 
-       memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
-       memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
+               crec->header.rsn++;
 
-       status = ctdbd_start_persistent_update(messaging_ctdbd_connection(), crec->ctdb_ctx->db_id, rec->key, cdata);
-       
-       /* now tell ctdbd to update this record on all other nodes */
-       if (NT_STATUS_IS_OK(status)) {
-               ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
-               status = (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
-       }
+               memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
+               memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
 
-       /* now tell ctdbd to update this record on all other nodes */
-       if (NT_STATUS_IS_OK(status)) {
-               status = ctdbd_persistent_store(messaging_ctdbd_connection(), crec->ctdb_ctx->db_id, rec->key, cdata);
-       } else {
-               ctdbd_cancel_persistent_update(messaging_ctdbd_connection(), crec->ctdb_ctx->db_id, rec->key, cdata);
+               status = ctdbd_start_persistent_update(
+                               messaging_ctdbd_connection(),
+                               crec->ctdb_ctx->db_id,
+                               rec->key,
+                               cdata);
+
+               if (NT_STATUS_IS_OK(status)) {
+                       ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key,
+                                       cdata, TDB_REPLACE);
+                       status = (ret == 0)
+                                       ? NT_STATUS_OK
+                                       : tdb_error_to_ntstatus(
+                                               crec->ctdb_ctx->wtdb->tdb);
+               }
+
+               /*
+                * release the lock *now* in order to prevent deadlocks.
+                *
+                * There is a tradeoff: Usually, the record is still locked
+                * after db->store operation. This lock is usually released
+                * via the talloc destructor with the TALLOC_FREE to
+                * the record. So we have two choices:
+                *
+                * - Either re-lock the record after the call to persistent_store
+                *   or cancel_persistent update and this way not changing any
+                *   assumptions callers may have about the state, but possibly
+                *   introducing new race conditions.
+                *
+                * - Or don't lock the record again but just remove the
+                *   talloc_destructor. This is less racy but assumes that
+                *   the lock is always released via TALLOC_FREE of the record.
+                *
+                * I choose the first variant for now since it seems less racy.
+                * We can't guarantee that we succeed in getting the lock
+                * anyways. The only real danger here is that a caller
+                * performs multiple store operations after a fetch_locked()
+                * which is currently not the case.
+                */
+               tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key);
+               talloc_set_destructor(record, NULL);
+
+               /* now tell ctdbd to update this record on all other nodes */
+               if (NT_STATUS_IS_OK(status)) {
+                       status = ctdbd_persistent_store(
+                                       messaging_ctdbd_connection(),
+                                       crec->ctdb_ctx->db_id,
+                                       rec->key,
+                                       cdata);
+               } else {
+                       ctdbd_cancel_persistent_update(
+                                       messaging_ctdbd_connection(),
+                                       crec->ctdb_ctx->db_id,
+                                       rec->key,
+                                       cdata);
+               }
+
+               SAFE_FREE(cdata.dptr);
+       } /* retry-loop */
+
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(5, ("ctdbd_persistent_store still failed after "
+                         "%d retries with error %s - giving up.\n",
+                         count, nt_errstr(status)));
        }
 
        SAFE_FREE(cdata.dptr);
@@ -101,22 +209,32 @@ static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, i
 
 static NTSTATUS db_ctdb_delete(struct db_record *rec)
 {
-       struct db_ctdb_rec *crec = talloc_get_type_abort(
-               rec->private_data, struct db_ctdb_rec);
        TDB_DATA data;
-       int ret;
 
        /*
         * We have to store the header with empty data. TODO: Fix the
         * tdb-level cleanup
         */
 
-       data.dptr = (uint8 *)&crec->header;
-       data.dsize = sizeof(crec->header);
+       ZERO_STRUCT(data);
 
-       ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, data, TDB_REPLACE);
+       return db_ctdb_store(rec, data, 0);
+
+}
+
+static NTSTATUS db_ctdb_delete_persistent(struct db_record *rec)
+{
+       TDB_DATA data;
+
+       /*
+        * We have to store the header with empty data. TODO: Fix the
+        * tdb-level cleanup
+        */
+
+       ZERO_STRUCT(data);
+
+       return db_ctdb_store_persistent(rec, data, 0);
 
-       return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
 }
 
 static int db_ctdb_record_destr(struct db_record* data)
@@ -139,12 +257,11 @@ static int db_ctdb_record_destr(struct db_record* data)
        return 0;
 }
 
-static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
-                                             TALLOC_CTX *mem_ctx,
-                                             TDB_DATA key)
+static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
+                                              TALLOC_CTX *mem_ctx,
+                                              TDB_DATA key,
+                                              bool persistent)
 {
-       struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
-                                                       struct db_ctdb_ctx);
        struct db_record *result;
        struct db_ctdb_rec *crec;
        NTSTATUS status;
@@ -193,12 +310,13 @@ again:
                return NULL;
        }
 
-       if (db->persistent) {
+       if (persistent) {
                result->store = db_ctdb_store_persistent;
+               result->delete_rec = db_ctdb_delete_persistent;
        } else {
                result->store = db_ctdb_store;
+               result->delete_rec = db_ctdb_delete;
        }
-       result->delete_rec = db_ctdb_delete;
        talloc_set_destructor(result, db_ctdb_record_destr);
 
        ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
@@ -260,6 +378,16 @@ again:
        return result;
 }
 
+static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
+                                             TALLOC_CTX *mem_ctx,
+                                             TDB_DATA key)
+{
+       struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
+                                                       struct db_ctdb_ctx);
+
+       return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
+}
+
 /*
   fetch (unlocked, no migration) operation on ctdb
  */