WIP s3/dbwrap_ceph: add fetch_locked handler wip_hw2018_dbwrap_ceph
authorDavid Disseldorp <ddiss@samba.org>
Mon, 16 Jul 2018 09:07:42 +0000 (11:07 +0200)
committerDavid Disseldorp <ddiss@samba.org>
Mon, 16 Jul 2018 09:12:20 +0000 (11:12 +0200)
b64 encode the record key to use as a RADOS lock name and then fetch the
KV-pair via rados_read_op_omap_get_vals_by_keys2().

WIP:
- split into sub-functions
- spawn thread on open and handle lock-wait via watch/notify event-cb
- add rec store / delete hooks
- see TODOs/FIXMEs in code

source3/lib/dbwrap/dbwrap_ceph.c

index a4d3ce7ff9bc465321f76f81e36d1f25c7099a62..4e19268e64585cec6e381409a501ff37cdcae503 100644 (file)
@@ -16,6 +16,7 @@
  */
 
 #include "includes.h"
+#include "lib/util/base64.h"
 #include "dbwrap/dbwrap.h"
 #include "dbwrap/dbwrap_private.h"
 #include "dbwrap/dbwrap_ceph.h"
@@ -26,11 +27,18 @@ struct db_ceph_ctx {
        struct tdb_wrap *wtdb;
        uint32_t db_id;
        rados_t cluster;
+       rados_ioctx_t rados_ioctx;
 
        /* TODO transaction state */
        /* TODO struct g_lock_ctx *lock_ctx; */
 };
 
+struct db_ceph_rec {
+       struct db_ceph_ctx *ctx;
+       char *lockname; /* b64 encoded key */
+       bool locked;
+};
+
 /* XXX copied from rados_svc_register() */
 static int db_ceph_cluster_init(rados_t *_cluster)
 {
@@ -86,10 +94,214 @@ err_out:
 static int db_ceph_destructor(struct db_ceph_ctx *ctx)
 {
        DBG_DEBUG("shutting down db Ceph cluster connection\n");
+       rados_ioctx_destroy(ctx->rados_ioctx);
        rados_shutdown(ctx->cluster);
        return 0;
 }
 
+#define DB_CEPH_LOCK_COOKIE "db_ceph_cookie"
+#define DB_CEPH_LOCK_DESC ""
+
+static int db_ceph_record_destructor(struct db_record *rec)
+{
+       struct db_ceph_rec *ceph_rec = rec->private_data;
+       struct db_ceph_ctx *ctx = rec->db->private_data;
+
+       if (ceph_rec->locked) {
+               int ret;
+               DBG_INFO("unlocking %s->%s\n",
+                        rec->db->name, ceph_rec->lockname);
+               ret = rados_unlock(ctx->rados_ioctx, rec->db->name,
+                                  ceph_rec->lockname, DB_CEPH_LOCK_COOKIE);
+               if (ret < 0) {
+                       DBG_ERR("unlock %s->%s failed\n",
+                               rec->db->name, ceph_rec->lockname);
+                       return ret;     /* FIXME: check callers */
+               }
+       }
+
+       return 0;
+}
+
+static struct db_record *db_ceph_fetch_locked_internal(struct db_ceph_ctx *ctx,
+                                                      TALLOC_CTX *mem_ctx,
+                                                      TDB_DATA key,
+                                                      bool try)
+{
+       int ret;
+       struct db_record *rec;
+       struct db_ceph_rec *ceph_rec;
+       DATA_BLOB key_blob = {
+               .data = key.dptr,
+               .length = key.dsize,
+       };
+       rados_read_op_t read_op;
+       rados_omap_iter_t iter;
+       const char *keys;
+       int op_ret;
+       char *rkey;
+       size_t rkeylen;
+       char *rval;
+       size_t rvallen;
+
+       rec = talloc_zero(mem_ctx, struct db_record);
+       if (rec == NULL) {
+               return NULL;
+       }
+       talloc_set_destructor(rec, db_ceph_record_destructor);
+
+       ceph_rec = talloc_zero(rec, struct db_ceph_rec);
+       if (ceph_rec == NULL) {
+               talloc_free(rec);
+               return NULL;
+       }
+
+       rec->db = ctx->db;
+       rec->storev = NULL; /* TODO */
+       rec->delete_rec = NULL; /* TODO */
+       rec->private_data = ceph_rec;
+       ceph_rec->ctx = ctx;
+
+       rec->key.dsize = key.dsize;
+       rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize);
+       if (rec->key.dptr == NULL) {
+               TALLOC_FREE(rec);
+               return NULL;
+       }
+
+       /* XXX rec->key set later via get_vals_by_keys2 response iterator */
+
+       /*
+        * rados requires string for lockname, so encode as b64 for now.
+        * TODO use hash buckets for locks instead?
+        */
+       ceph_rec->lockname = base64_encode_data_blob(ceph_rec, key_blob);
+       if (ceph_rec->lockname == NULL) {
+               TALLOC_FREE(rec);
+               return NULL;
+       }
+
+again:
+       DBG_INFO("locking %s->%s\n", rec->db->name, ceph_rec->lockname);
+       ret = rados_lock_exclusive(ctx->rados_ioctx, rec->db->name,
+                                   ceph_rec->lockname,
+                                  DB_CEPH_LOCK_COOKIE,
+                                  DB_CEPH_LOCK_DESC,
+                                   NULL, /* infinite duration */
+                                   0);
+       if ((ret == -EEXIST) || (ret == -EBUSY)) {
+               /* lock contention */
+               DBG_WARNING("'%s' lock contention on RADOS object '%s'\n",
+                       ceph_rec->lockname, rec->db->name);
+               if (try) {
+                       TALLOC_FREE(rec);
+                       return NULL;
+               }
+               /*
+                * FIXME this should be event based!
+                * TODO spawn watch / notify thread on open?
+                */
+               usleep(100 * 1000);
+               goto again;
+       } else if (ret < 0) {
+               /* unexpected failure */
+               DBG_ERR("failed to get '%s' lock on RADOS object '%s' - (%s)\n",
+                       ceph_rec->lockname, rec->db->name, strerror(-ret));
+               TALLOC_FREE(rec);
+               return NULL;
+       }
+
+       ceph_rec->locked = true;
+       DBG_DEBUG("'%s' lock obtained on RADOS object '%s'\n",
+                 ceph_rec->lockname, rec->db->name);
+
+       /* TODO combine lock and fetch as a compound OSD operation */
+
+       read_op = rados_create_read_op();
+       if (read_op == NULL) {
+               TALLOC_FREE(rec);
+               return NULL;
+       }
+
+       op_ret = 0;
+       keys = (char *)key.dptr;
+       rados_read_op_omap_get_vals_by_keys2(read_op,
+                                            &keys,
+                                            1,         /* num_keys */
+                                            &key.dsize,
+                                            &iter,
+                                            &op_ret);
+
+       ret = rados_read_op_operate(read_op,
+                                   ctx->rados_ioctx,
+                                    rec->db->name,
+                                    LIBRADOS_OPERATION_NOFLAG);
+       rados_release_read_op(read_op);
+       if (ret < 0) {
+               TALLOC_FREE(rec);
+               return NULL;
+       }
+
+       if (op_ret < 0) {
+               /* FIXME: could also get ENOENT here if key is missing? */
+               TALLOC_FREE(rec);
+               return NULL;
+       }
+
+       ret = rados_omap_get_next2(iter, &rkey, &rval, &rkeylen, &rvallen);
+       if (ret < 0) {
+               TALLOC_FREE(rec);
+               return NULL;
+       }
+
+       if ((rkey == NULL) ||
+           (key.dsize != rkeylen)) {
+               DBG_ERR("Null or non-matching key sizes\n");
+               TALLOC_FREE(rec);
+               return NULL;
+       }
+
+       if (memcmp(key.dptr, rkey, key.dsize) != 0) {
+               DBG_ERR("non-matching keys\n");
+               TALLOC_FREE(rec);
+               return NULL;
+       }
+
+       if (rval == NULL) {
+               DBG_ERR("returning NULL value\n");      /* FIXME no error */
+       } else {
+               rec->value.dsize = rvallen;
+               rec->value.dptr = talloc_memdup(rec, rval, rvallen);
+               if (rec->value.dptr == NULL) {
+                       TALLOC_FREE(rec);
+                       return NULL;
+               }
+       }
+       rados_omap_get_end(iter);
+
+       return rec;
+}
+
+static struct db_record *db_ceph_fetch_locked(struct db_context *db,
+                                             TALLOC_CTX *mem_ctx,
+                                             TDB_DATA key)
+{
+       struct db_ceph_ctx *ctx = talloc_get_type_abort(db->private_data,
+                                                       struct db_ceph_ctx);
+
+       return db_ceph_fetch_locked_internal(ctx, mem_ctx, key, false);
+}
+
+static struct db_record *db_ceph_try_fetch_locked(struct db_context *db,
+                                                 TALLOC_CTX *mem_ctx,
+                                                 TDB_DATA key)
+{
+       struct db_ceph_ctx *ctx = talloc_get_type_abort(db->private_data,
+                                                       struct db_ceph_ctx);
+
+       return db_ceph_fetch_locked_internal(ctx, mem_ctx, key, true);
+}
+
 struct db_context *db_ceph_open(TALLOC_CTX *mem_ctx,
                                struct messaging_context *msg_ctx,
                                const char *name,
@@ -101,17 +313,27 @@ struct db_context *db_ceph_open(TALLOC_CTX *mem_ctx,
        int ret;
        struct db_context *db;
        struct db_ceph_ctx *ctx;
+       const char *rados_pool = lp_parm_const_string(-1, "ceph", "tdb_pool",
+                                                     NULL);
 
        if (lp_clustering() != CLUSTERING_CEPH) {
                DBG_ERR("Ceph Clustering Disabled\n");
                return NULL;
        }
 
+       if (rados_pool == NULL) {
+               DBG_ERR("Ceph tdb_pool not configured\n");
+               return NULL;
+       }
+
        db = talloc_zero(mem_ctx, struct db_context);
        if (db == NULL) {
                return NULL;
        }
 
+       db->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
+       db->lock_order = lock_order;
+
        ctx = talloc_zero(db, struct db_ceph_ctx);
        if (ctx == NULL) {
                TALLOC_FREE(db);
@@ -133,9 +355,19 @@ struct db_context *db_ceph_open(TALLOC_CTX *mem_ctx,
                TALLOC_FREE(db);
                return NULL;
        }
+       ret = rados_ioctx_create(ctx->cluster,
+                                rados_pool,
+                                &ctx->rados_ioctx);
+       if (ret < 0) {
+               rados_shutdown(ctx->cluster);
+               TALLOC_FREE(db);
+               return NULL;
+       }
        talloc_set_destructor(ctx, db_ceph_destructor);
 
        db->private_data = ctx;
+       db->fetch_locked = db_ceph_fetch_locked;
+       db->try_fetch_locked = db_ceph_try_fetch_locked;
 
        return db;
 }