dbwrap_ctdb: implement parse_record_send()/recv()
authorRalph Boehme <slow@samba.org>
Wed, 21 Dec 2016 07:38:25 +0000 (08:38 +0100)
committerJeremy Allison <jra@samba.org>
Tue, 18 Apr 2017 20:54:16 +0000 (22:54 +0200)
This mainly works like the sync version, but calls ctdbd_parse_send/recv
instead.

We use one global ctdb connection that is used exclusively for async
requests.

Signed-off-by: Ralph Boehme <slow@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
source3/lib/ctdb_dummy.c
source3/lib/dbwrap/dbwrap_ctdb.c
source3/lib/dbwrap/dbwrap_ctdb.h
source3/lib/util.c

index 8b617bab288d768670814c1ac88948f91e4c9635..0b1acb7dcdeda95eee63da59a38c1b23eff9a4dd 100644 (file)
@@ -94,3 +94,8 @@ struct ctdbd_connection *messaging_ctdbd_connection(void)
 {
        return NULL;
 }
+
+int ctdb_async_ctx_reinit(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
+{
+       return ENOSYS;
+}
index e0223eeaf4e6e2e9610c55e4d1ee9f16ed6fdf3c..87ac8e17535f009c3b041533e689435fe4eb4b7e 100644 (file)
@@ -35,6 +35,7 @@
 #include "g_lock.h"
 #include "messages.h"
 #include "lib/cluster_support.h"
+#include "lib/util/tevent_ntstatus.h"
 
 struct db_ctdb_transaction_handle {
        struct db_ctdb_ctx *ctx;
@@ -68,6 +69,59 @@ struct db_ctdb_rec {
        struct timeval lock_time;
 };
 
+struct ctdb_async_ctx {
+       bool initialized;
+       struct ctdbd_connection *async_conn;
+};
+
+static struct ctdb_async_ctx ctdb_async_ctx;
+
+static int ctdb_async_ctx_init_internal(TALLOC_CTX *mem_ctx,
+                                       struct tevent_context *ev,
+                                       bool reinit)
+{
+       int ret;
+
+       if (reinit) {
+               TALLOC_FREE(ctdb_async_ctx.async_conn);
+               ctdb_async_ctx.initialized = false;
+       }
+
+       if (ctdb_async_ctx.initialized) {
+               return 0;
+       }
+
+       become_root();
+       ret = ctdbd_init_connection(mem_ctx,
+                                   lp_ctdbd_socket(),
+                                   lp_ctdb_timeout(),
+                                   &ctdb_async_ctx.async_conn);
+       unbecome_root();
+
+       if (ctdb_async_ctx.async_conn == NULL) {
+               DBG_ERR("ctdbd_init_connection failed\n");
+               return EIO;
+       }
+
+       ret = ctdbd_setup_fde(ctdb_async_ctx.async_conn, ev);
+       if (ret != 0) {
+               DBG_ERR("ctdbd_setup_ev failed\n");
+               return ret;
+       }
+
+       return 0;
+}
+
+static int ctdb_async_ctx_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
+{
+       return ctdb_async_ctx_init_internal(mem_ctx, ev, false);
+}
+
+int ctdb_async_ctx_reinit(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
+{
+       return ctdb_async_ctx_init_internal(mem_ctx, ev, true);
+}
+
 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
 {
        enum TDB_ERROR tret = tdb_error(tdb);
@@ -1350,6 +1404,102 @@ static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
        return NT_STATUS_OK;
 }
 
+static void db_ctdb_parse_record_done(struct tevent_req *subreq);
+
+static struct tevent_req *db_ctdb_parse_record_send(
+       TALLOC_CTX *mem_ctx,
+       struct tevent_context *ev,
+       struct db_context *db,
+       TDB_DATA key,
+       void (*parser)(TDB_DATA key,
+                      TDB_DATA data,
+                      void *private_data),
+       void *private_data,
+       enum dbwrap_req_state *req_state)
+{
+       struct db_ctdb_ctx *ctx = talloc_get_type_abort(
+               db->private_data, struct db_ctdb_ctx);
+       struct tevent_req *req = NULL;
+       struct tevent_req *subreq = NULL;
+       struct db_ctdb_parse_record_state *state = NULL;
+       NTSTATUS status;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct db_ctdb_parse_record_state);
+       if (req == NULL) {
+               *req_state = DBWRAP_REQ_ERROR;
+               return NULL;
+
+       }
+
+       *state = (struct db_ctdb_parse_record_state) {
+               .parser = parser,
+               .private_data = private_data,
+               .my_vnn = ctdbd_vnn(ctx->conn),
+               .empty_record = false,
+       };
+
+       status = db_ctdb_try_parse_local_record(ctx, key, state);
+       if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) {
+               if (tevent_req_nterror(req, status)) {
+                       *req_state = DBWRAP_REQ_ERROR;
+                       return tevent_req_post(req, ev);
+               }
+               *req_state = DBWRAP_REQ_DONE;
+               tevent_req_done(req);
+               return tevent_req_post(req, ev);
+       }
+
+       subreq = ctdbd_parse_send(state,
+                                 ev,
+                                 ctdb_async_ctx.async_conn,
+                                 ctx->db_id,
+                                 key,
+                                 state->ask_for_readonly_copy,
+                                 parser,
+                                 private_data,
+                                 req_state);
+       if (tevent_req_nomem(subreq, req)) {
+               *req_state = DBWRAP_REQ_ERROR;
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, db_ctdb_parse_record_done, req);
+
+       return req;
+}
+
+static void db_ctdb_parse_record_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       int ret;
+
+       ret = ctdbd_parse_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (ret != 0) {
+               if (ret == ENOENT) {
+                       /*
+                        * This maps to NT_STATUS_OBJECT_NAME_NOT_FOUND. Our
+                        * upper layers expect NT_STATUS_NOT_FOUND for "no
+                        * record around". We need to convert dbwrap to 0/errno
+                        * away from NTSTATUS ... :-)
+                        */
+                       tevent_req_nterror(req, NT_STATUS_NOT_FOUND);
+                       return;
+               }
+               tevent_req_nterror(req, map_nt_error_from_unix(ret));
+               return;
+       }
+
+       tevent_req_done(req);
+       return;
+}
+
+static NTSTATUS db_ctdb_parse_record_recv(struct tevent_req *req)
+{
+       return tevent_req_simple_recv_ntstatus(req);
+}
+
 struct traverse_state {
        struct db_context *db;
        int (*fn)(struct db_record *rec, void *private_data);
@@ -1675,6 +1825,15 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
        tdb_flags &= TDB_SEQNUM|TDB_VOLATILE|
                TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST;
 
+       if (!result->persistent) {
+               ret = ctdb_async_ctx_init(NULL, messaging_tevent_context(msg_ctx));
+               if (ret != 0) {
+                       DBG_ERR("ctdb_async_ctx_init failed: %s\n", strerror(ret));
+                       TALLOC_FREE(result);
+                       return NULL;
+               }
+       }
+
        if (!result->persistent &&
            (dbwrap_flags & DBWRAP_FLAG_OPTIMIZE_READONLY_ACCESS))
        {
@@ -1745,6 +1904,8 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
        result->fetch_locked = db_ctdb_fetch_locked;
        result->try_fetch_locked = db_ctdb_try_fetch_locked;
        result->parse_record = db_ctdb_parse_record;
+       result->parse_record_send = db_ctdb_parse_record_send;
+       result->parse_record_recv = db_ctdb_parse_record_recv;
        result->traverse = db_ctdb_traverse;
        result->traverse_read = db_ctdb_traverse_read;
        result->get_seqnum = db_ctdb_get_seqnum;
index 3f047020abb0d25d478bf4dea2ebb66813f9e7da..42c831fbcf132bf84b8935dba5853d2c73a7d64e 100644 (file)
@@ -36,5 +36,6 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
                                int open_flags, mode_t mode,
                                enum dbwrap_lock_order lock_order,
                                uint64_t dbwrap_flags);
+int ctdb_async_ctx_reinit(TALLOC_CTX *mem_ctx, struct tevent_context *ev);
 
 #endif /* __DBWRAP_CTDB_H__ */
index d525be6d6bba454065d82626797505e9df31a631..fb508842678a35b440834311b948be55d226a24a 100644 (file)
@@ -35,6 +35,7 @@
 #include "lib/util/sys_rw.h"
 #include "lib/util/sys_rw_data.h"
 #include "lib/util/util_process.h"
+#include "lib/dbwrap/dbwrap_ctdb.h"
 
 #ifdef HAVE_SYS_PRCTL_H
 #include <sys/prctl.h>
@@ -437,6 +438,7 @@ NTSTATUS reinit_after_fork(struct messaging_context *msg_ctx,
                           const char *comment)
 {
        NTSTATUS status = NT_STATUS_OK;
+       int ret;
 
        if (reinit_after_fork_pipe[1] != -1) {
                close(reinit_after_fork_pipe[1]);
@@ -478,6 +480,16 @@ NTSTATUS reinit_after_fork(struct messaging_context *msg_ctx,
                        DEBUG(0,("messaging_reinit() failed: %s\n",
                                 nt_errstr(status)));
                }
+
+               if (lp_clustering()) {
+                       ret = ctdb_async_ctx_reinit(
+                               NULL, messaging_tevent_context(msg_ctx));
+                       if (ret != 0) {
+                               DBG_ERR("db_ctdb_async_ctx_reinit failed: %s\n",
+                                       strerror(errno));
+                               return map_nt_error_from_unix(ret);
+                       }
+               }
        }
 
        if (comment) {