dbwrap_ctdb: Pass on mutex flags to tdb_open
[obnox/samba/samba-obnox.git] / source3 / lib / dbwrap / dbwrap_ctdb.c
index 7bc1e4766623666537aad51d85de0bfdf887d35c..e6dcc0e987c35908e854e97c7a35500f5752a9aa 100644 (file)
@@ -27,8 +27,6 @@
 #include "dbwrap/dbwrap_rbt.h"
 #include "lib/param/param.h"
 
-#ifdef CLUSTER_SUPPORT
-
 /*
  * It is not possible to include ctdb.h and tdb_compat.h (included via
  * some other include above) without warnings. This fixes those
@@ -73,6 +71,12 @@ struct db_ctdb_ctx {
        uint32_t db_id;
        struct db_ctdb_transaction_handle *transaction;
        struct g_lock_ctx *lock_ctx;
+
+       /* thresholds for warning messages */
+       int warn_unlock_msecs;
+       int warn_migrate_msecs;
+       int warn_migrate_attempts;
+       int warn_locktime_msecs;
 };
 
 struct db_ctdb_rec {
@@ -103,16 +107,7 @@ static int db_ctdb_ltdb_parser(TDB_DATA key, TDB_DATA data,
        if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                return -1;
        }
-       if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
-               /*
-                * Making this a separate case that needs fixing
-                * separately. This is an empty record. ctdbd does not
-                * distinguish between empty and deleted records. Samba right
-                * now can live without empty records, so lets treat zero-size
-                * (i.e. deleted) records as non-existing.
-                */
-               return -1;
-       }
+
        state->parser(
                key, (struct ctdb_ltdb_header *)data.dptr,
                make_tdb_data(data.dptr + sizeof(struct ctdb_ltdb_header),
@@ -872,7 +867,6 @@ static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
 
 
 
-#ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
 static NTSTATUS db_ctdb_send_schedule_for_deletion(struct db_record *rec)
 {
        NTSTATUS status;
@@ -916,7 +910,6 @@ static NTSTATUS db_ctdb_send_schedule_for_deletion(struct db_record *rec)
 
        return status;
 }
-#endif
 
 static NTSTATUS db_ctdb_delete(struct db_record *rec)
 {
@@ -932,10 +925,7 @@ static NTSTATUS db_ctdb_delete(struct db_record *rec)
                return status;
        }
 
-#ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
        status = db_ctdb_send_schedule_for_deletion(rec);
-#endif
-
        return status;
 }
 
@@ -962,7 +952,7 @@ static int db_ctdb_record_destr(struct db_record* data)
        timediff = timeval_elapsed(&before);
        timediff *= 1000;       /* get us milliseconds */
 
-       if (timediff > lp_parm_int(-1, "ctdb", "unlock_warn_threshold", 5)) {
+       if (timediff > crec->ctdb_ctx->warn_unlock_msecs) {
                char *key;
                key = hex_encode_talloc(talloc_tos(),
                                        (unsigned char *)data->key.dptr,
@@ -978,18 +968,19 @@ static int db_ctdb_record_destr(struct db_record* data)
                return -1;
        }
 
-       threshold = lp_ctdb_locktime_warn_threshold();
+       threshold = crec->ctdb_ctx->warn_locktime_msecs;
        if (threshold != 0) {
-               timediff = timeval_elapsed(&crec->lock_time);
-               if ((timediff * 1000) > threshold) {
+               timediff = timeval_elapsed(&crec->lock_time) * 1000;
+               if (timediff > threshold) {
                        const char *key;
 
                        key = hex_encode_talloc(data,
                                                (unsigned char *)data->key.dptr,
                                                data->key.dsize);
-                       DEBUG(0, ("Held tdb lock on db %s, key %s %f seconds\n",
-                                 tdb_name(crec->ctdb_ctx->wtdb->tdb), key,
-                                 timediff));
+                       DEBUG(0, ("Held tdb lock on db %s, key %s "
+                                 "%f milliseconds\n",
+                                 tdb_name(crec->ctdb_ctx->wtdb->tdb),
+                                 key, timediff));
                }
        }
 
@@ -1003,7 +994,6 @@ static int db_ctdb_record_destr(struct db_record* data)
 static bool db_ctdb_can_use_local_hdr(const struct ctdb_ltdb_header *hdr,
                                      bool read_only)
 {
-#ifdef HAVE_CTDB_WANT_READONLY_DECL
        if (hdr->dmaster != get_my_vnn()) {
                /* If we're not dmaster, it must be r/o copy. */
                return read_only && (hdr->flags & CTDB_REC_RO_HAVE_READONLY);
@@ -1013,9 +1003,6 @@ static bool db_ctdb_can_use_local_hdr(const struct ctdb_ltdb_header *hdr,
         * If we want write access, no one may have r/o copies.
         */
        return read_only || !(hdr->flags & CTDB_REC_RO_HAVE_DELEGATIONS);
-#else
-       return (hdr->dmaster == get_my_vnn());
-#endif
 }
 
 static bool db_ctdb_can_use_local_copy(TDB_DATA ctdb_data, bool read_only)
@@ -1161,17 +1148,26 @@ again:
                duration_msecs = duration * 1000;
        }
 
-       if ((migrate_attempts > lp_parm_int(-1, "ctdb", "migrate_attempts", 10)) ||
-           (duration_msecs > lp_parm_int(-1, "ctdb", "migrate_duration", 5000))) {
-               DEBUG(0, ("db_ctdb_fetch_locked for %s key %s needed %d "
-                         "attempts, %d milliseconds, chainlock: %d ms, "
-                         "CTDB %d ms\n", tdb_name(ctx->wtdb->tdb),
+       if ((migrate_attempts > ctx->warn_migrate_attempts) ||
+           (duration_msecs > ctx->warn_migrate_msecs)) {
+               int chain = 0;
+
+               if (tdb_get_flags(ctx->wtdb->tdb) & TDB_INCOMPATIBLE_HASH) {
+                       chain = tdb_jenkins_hash(&key) %
+                               tdb_hash_size(ctx->wtdb->tdb);
+               }
+
+               DEBUG(0, ("db_ctdb_fetch_locked for %s key %s, chain %d "
+                         "needed %d attempts, %d milliseconds, "
+                         "chainlock: %f ms, CTDB %f ms\n",
+                         tdb_name(ctx->wtdb->tdb),
                          hex_encode_talloc(talloc_tos(),
                                            (unsigned char *)key.dptr,
                                            key.dsize),
+                         chain,
                          migrate_attempts, duration_msecs,
-                         (int) chainlock_time * 1000,
-                         (int) ctdb_time * 1000));
+                         chainlock_time * 1000.0,
+                         ctdb_time * 1000.0));
        }
 
        GetTimeOfDay(&crec->lock_time);
@@ -1553,7 +1549,8 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
                                const char *name,
                                int hash_size, int tdb_flags,
                                int open_flags, mode_t mode,
-                               enum dbwrap_lock_order lock_order)
+                               enum dbwrap_lock_order lock_order,
+                               uint64_t dbwrap_flags)
 {
        struct db_context *result;
        struct db_ctdb_ctx *db_ctdb;
@@ -1610,7 +1607,8 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
        result->lock_order = lock_order;
 
        /* only pass through specific flags */
-       tdb_flags &= TDB_SEQNUM;
+       tdb_flags &= TDB_SEQNUM|TDB_VOLATILE|
+               TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST;
 
        /* honor permissions if user has specified O_CREAT */
        if (open_flags & O_CREAT) {
@@ -1632,10 +1630,34 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
                return NULL;
        }
 
+       if (!result->persistent &&
+           (dbwrap_flags & DBWRAP_FLAG_OPTIMIZE_READONLY_ACCESS))
+       {
+               TDB_DATA indata;
+
+               indata = make_tdb_data((uint8_t *)&db_ctdb->db_id,
+                                      sizeof(db_ctdb->db_id));
+
+               status = ctdbd_control_local(
+                       conn, CTDB_CONTROL_SET_DB_READONLY, 0, 0, indata,
+                       NULL, NULL, &cstatus);
+               if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
+                       DEBUG(1, ("CTDB_CONTROL_SET_DB_READONLY failed: "
+                                 "%s, %d\n", nt_errstr(status), cstatus));
+                       TALLOC_FREE(result);
+                       return NULL;
+               }
+       }
+
        lp_ctx = loadparm_init_s3(db_path, loadparm_s3_helpers());
 
-       db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags,
-                                     O_RDWR, 0, lp_ctx);
+       if (hash_size == 0) {
+               hash_size = lpcfg_tdb_hash_size(lp_ctx, db_path);
+       }
+
+       db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size,
+                                     lpcfg_tdb_flags(lp_ctx, tdb_flags),
+                                     O_RDWR, 0);
        talloc_unlink(db_path, lp_ctx);
        if (db_ctdb->wtdb == NULL) {
                DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
@@ -1654,6 +1676,14 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
                }
        }
 
+       db_ctdb->warn_unlock_msecs = lp_parm_int(-1, "ctdb",
+                                                "unlock_warn_threshold", 5);
+       db_ctdb->warn_migrate_attempts = lp_parm_int(-1, "ctdb",
+                                                    "migrate_attempts", 10);
+       db_ctdb->warn_migrate_msecs = lp_parm_int(-1, "ctdb",
+                                                 "migrate_duration", 5000);
+       db_ctdb->warn_locktime_msecs = lp_ctdb_locktime_warn_threshold();
+
        result->private_data = (void *)db_ctdb;
        result->fetch_locked = db_ctdb_fetch_locked;
        result->try_fetch_locked = db_ctdb_try_fetch_locked;
@@ -1672,18 +1702,3 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
 
        return result;
 }
-
-#else /* CLUSTER_SUPPORT */
-
-struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
-                               const char *name,
-                               int hash_size, int tdb_flags,
-                               int open_flags, mode_t mode,
-                               enum dbwrap_lock_order lock_order)
-{
-       DEBUG(3, ("db_open_ctdb: no cluster support!\n"));
-       errno = ENOSYS;
-       return NULL;
-}
-
-#endif