*/
#include "includes.h"
+#include "lib/util/base64.h"
#include "dbwrap/dbwrap.h"
#include "dbwrap/dbwrap_private.h"
#include "dbwrap/dbwrap_ceph.h"
struct tdb_wrap *wtdb;
uint32_t db_id;
rados_t cluster;
+ rados_ioctx_t rados_ioctx;
/* TODO transaction state */
/* TODO struct g_lock_ctx *lock_ctx; */
};
+struct db_ceph_rec {
+ struct db_ceph_ctx *ctx;
+ char *lockname; /* b64 encoded key */
+ bool locked;
+};
+
/* XXX copied from rados_svc_register() */
static int db_ceph_cluster_init(rados_t *_cluster)
{
static int db_ceph_destructor(struct db_ceph_ctx *ctx)
{
DBG_DEBUG("shutting down db Ceph cluster connection\n");
+ rados_ioctx_destroy(ctx->rados_ioctx);
rados_shutdown(ctx->cluster);
return 0;
}
+#define DB_CEPH_LOCK_COOKIE "db_ceph_cookie"
+#define DB_CEPH_LOCK_DESC ""
+
+static int db_ceph_record_destructor(struct db_record *rec)
+{
+ struct db_ceph_rec *ceph_rec = rec->private_data;
+ struct db_ceph_ctx *ctx = rec->db->private_data;
+
+ if (ceph_rec->locked) {
+ int ret;
+ DBG_INFO("unlocking %s->%s\n",
+ rec->db->name, ceph_rec->lockname);
+ ret = rados_unlock(ctx->rados_ioctx, rec->db->name,
+ ceph_rec->lockname, DB_CEPH_LOCK_COOKIE);
+ if (ret < 0) {
+ DBG_ERR("unlock %s->%s failed\n",
+ rec->db->name, ceph_rec->lockname);
+ return ret; /* FIXME: check callers */
+ }
+ }
+
+ return 0;
+}
+
+static struct db_record *db_ceph_fetch_locked_internal(struct db_ceph_ctx *ctx,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA key,
+ bool try)
+{
+ int ret;
+ struct db_record *rec;
+ struct db_ceph_rec *ceph_rec;
+ DATA_BLOB key_blob = {
+ .data = key.dptr,
+ .length = key.dsize,
+ };
+ rados_read_op_t read_op;
+ rados_omap_iter_t iter;
+ const char *keys;
+ int op_ret;
+ char *rkey;
+ size_t rkeylen;
+ char *rval;
+ size_t rvallen;
+
+ rec = talloc_zero(mem_ctx, struct db_record);
+ if (rec == NULL) {
+ return NULL;
+ }
+ talloc_set_destructor(rec, db_ceph_record_destructor);
+
+ ceph_rec = talloc_zero(rec, struct db_ceph_rec);
+ if (ceph_rec == NULL) {
+ talloc_free(rec);
+ return NULL;
+ }
+
+ rec->db = ctx->db;
+ rec->storev = NULL; /* TODO */
+ rec->delete_rec = NULL; /* TODO */
+ rec->private_data = ceph_rec;
+ ceph_rec->ctx = ctx;
+
+ rec->key.dsize = key.dsize;
+ rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize);
+ if (rec->key.dptr == NULL) {
+ TALLOC_FREE(rec);
+ return NULL;
+ }
+
+ /* XXX rec->key set later via get_vals_by_keys2 response iterator */
+
+ /*
+ * rados requires string for lockname, so encode as b64 for now.
+ * TODO use hash buckets for locks instead?
+ */
+ ceph_rec->lockname = base64_encode_data_blob(ceph_rec, key_blob);
+ if (ceph_rec->lockname == NULL) {
+ TALLOC_FREE(rec);
+ return NULL;
+ }
+
+again:
+ DBG_INFO("locking %s->%s\n", rec->db->name, ceph_rec->lockname);
+ ret = rados_lock_exclusive(ctx->rados_ioctx, rec->db->name,
+ ceph_rec->lockname,
+ DB_CEPH_LOCK_COOKIE,
+ DB_CEPH_LOCK_DESC,
+ NULL, /* infinite duration */
+ 0);
+ if ((ret == -EEXIST) || (ret == -EBUSY)) {
+ /* lock contention */
+ DBG_WARNING("'%s' lock contention on RADOS object '%s'\n",
+ ceph_rec->lockname, rec->db->name);
+ if (try) {
+ TALLOC_FREE(rec);
+ return NULL;
+ }
+ /*
+ * FIXME this should be event based!
+ * TODO spawn watch / notify thread on open?
+ */
+ usleep(100 * 1000);
+ goto again;
+ } else if (ret < 0) {
+ /* unexpected failure */
+ DBG_ERR("failed to get '%s' lock on RADOS object '%s' - (%s)\n",
+ ceph_rec->lockname, rec->db->name, strerror(-ret));
+ TALLOC_FREE(rec);
+ return NULL;
+ }
+
+ ceph_rec->locked = true;
+ DBG_DEBUG("'%s' lock obtained on RADOS object '%s'\n",
+ ceph_rec->lockname, rec->db->name);
+
+ /* TODO combine lock and fetch as a compound OSD operation */
+
+ read_op = rados_create_read_op();
+ if (read_op == NULL) {
+ TALLOC_FREE(rec);
+ return NULL;
+ }
+
+ op_ret = 0;
+ keys = (char *)key.dptr;
+ rados_read_op_omap_get_vals_by_keys2(read_op,
+ &keys,
+ 1, /* num_keys */
+ &key.dsize,
+ &iter,
+ &op_ret);
+
+ ret = rados_read_op_operate(read_op,
+ ctx->rados_ioctx,
+ rec->db->name,
+ LIBRADOS_OPERATION_NOFLAG);
+ rados_release_read_op(read_op);
+ if (ret < 0) {
+ TALLOC_FREE(rec);
+ return NULL;
+ }
+
+ if (op_ret < 0) {
+ /* FIXME: could also get ENOENT here if key is missing? */
+ TALLOC_FREE(rec);
+ return NULL;
+ }
+
+ ret = rados_omap_get_next2(iter, &rkey, &rval, &rkeylen, &rvallen);
+ if (ret < 0) {
+ TALLOC_FREE(rec);
+ return NULL;
+ }
+
+ if ((rkey == NULL) ||
+ (key.dsize != rkeylen)) {
+ DBG_ERR("Null or non-matching key sizes\n");
+ TALLOC_FREE(rec);
+ return NULL;
+ }
+
+ if (memcmp(key.dptr, rkey, key.dsize) != 0) {
+ DBG_ERR("non-matching keys\n");
+ TALLOC_FREE(rec);
+ return NULL;
+ }
+
+ if (rval == NULL) {
+ DBG_ERR("returning NULL value\n"); /* FIXME no error */
+ } else {
+ rec->value.dsize = rvallen;
+ rec->value.dptr = talloc_memdup(rec, rval, rvallen);
+ if (rec->value.dptr == NULL) {
+ TALLOC_FREE(rec);
+ return NULL;
+ }
+ }
+ rados_omap_get_end(iter);
+
+ return rec;
+}
+
+static struct db_record *db_ceph_fetch_locked(struct db_context *db,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA key)
+{
+ struct db_ceph_ctx *ctx = talloc_get_type_abort(db->private_data,
+ struct db_ceph_ctx);
+
+ return db_ceph_fetch_locked_internal(ctx, mem_ctx, key, false);
+}
+
+static struct db_record *db_ceph_try_fetch_locked(struct db_context *db,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA key)
+{
+ struct db_ceph_ctx *ctx = talloc_get_type_abort(db->private_data,
+ struct db_ceph_ctx);
+
+ return db_ceph_fetch_locked_internal(ctx, mem_ctx, key, true);
+}
+
struct db_context *db_ceph_open(TALLOC_CTX *mem_ctx,
struct messaging_context *msg_ctx,
const char *name,
int ret;
struct db_context *db;
struct db_ceph_ctx *ctx;
+ const char *rados_pool = lp_parm_const_string(-1, "ceph", "tdb_pool",
+ NULL);
if (lp_clustering() != CLUSTERING_CEPH) {
DBG_ERR("Ceph Clustering Disabled\n");
return NULL;
}
+ if (rados_pool == NULL) {
+ DBG_ERR("Ceph tdb_pool not configured\n");
+ return NULL;
+ }
+
db = talloc_zero(mem_ctx, struct db_context);
if (db == NULL) {
return NULL;
}
+ db->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
+ db->lock_order = lock_order;
+
ctx = talloc_zero(db, struct db_ceph_ctx);
if (ctx == NULL) {
TALLOC_FREE(db);
TALLOC_FREE(db);
return NULL;
}
+ ret = rados_ioctx_create(ctx->cluster,
+ rados_pool,
+ &ctx->rados_ioctx);
+ if (ret < 0) {
+ rados_shutdown(ctx->cluster);
+ TALLOC_FREE(db);
+ return NULL;
+ }
talloc_set_destructor(ctx, db_ceph_destructor);
db->private_data = ctx;
+ db->fetch_locked = db_ceph_fetch_locked;
+ db->try_fetch_locked = db_ceph_try_fetch_locked;
return db;
}