messages_ctdb: Handle async msgs for nested event contexts
authorVolker Lendecke <vl@samba.org>
Thu, 1 Jun 2017 16:58:16 +0000 (18:58 +0200)
committerRalph Boehme <slow@samba.org>
Tue, 25 Jul 2017 15:43:18 +0000 (17:43 +0200)
Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
source3/lib/ctdb_dummy.c
source3/lib/messages.c
source3/lib/messages_ctdbd.c
source3/lib/messages_ctdbd.h

index 4c0403c2d4707148ead38b39120f2b638ccc294e..b6ec2285ec7b02948241123920ec1bc2ce669eff 100644 (file)
@@ -81,6 +81,11 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
 
 int messaging_ctdbd_init(struct messaging_context *msg_ctx,
                         TALLOC_CTX *mem_ctx,
+                        void (*recv_cb)(struct tevent_context *ev,
+                                        const uint8_t *msg, size_t msg_len,
+                                        int *fds, size_t num_fds,
+                                        void *private_data),
+                        void *private_data,
                              struct messaging_backend **presult)
 {
        return ENOSYS;
@@ -88,6 +93,11 @@ int messaging_ctdbd_init(struct messaging_context *msg_ctx,
 
 int messaging_ctdbd_reinit(struct messaging_context *msg_ctx,
                           TALLOC_CTX *mem_ctx,
+                          void (*recv_cb)(struct tevent_context *ev,
+                                          const uint8_t *msg, size_t msg_len,
+                                          int *fds, size_t num_fds,
+                                          void *private_data),
+                          void *private_data,
                           struct messaging_backend *backend)
 {
        return ENOSYS;
index 759cc8bf3b641920b6ef816b81f6cfec7d21d88f..4e838b03843c1b1c1f5497f2220bb78587b6e027 100644 (file)
@@ -513,7 +513,8 @@ static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
        talloc_set_destructor(ctx, messaging_context_destructor);
 
        if (lp_clustering()) {
-               ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
+               ret = messaging_ctdbd_init(
+                       ctx, ctx, messaging_recv_cb, ctx, &ctx->remote);
 
                if (ret != 0) {
                        DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
@@ -627,8 +628,10 @@ NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
        if (lp_clustering()) {
                TALLOC_FREE(msg_ctx->cluster_fde);
 
-               ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
-                                            msg_ctx->remote);
+               ret = messaging_ctdbd_reinit(
+                       msg_ctx, msg_ctx, messaging_recv_cb, msg_ctx,
+                       msg_ctx->remote);
+
                if (ret != 0) {
                        DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
                                  strerror(ret)));
index 62b50826202495592994e3c1a231f222a6d424ca..b1af4f827439222af77494e06d6b166586c2aa75 100644 (file)
@@ -47,6 +47,12 @@ struct messaging_ctdbd_context {
        struct ctdbd_connection *conn;
 
        struct messaging_ctdbd_fde_ev *fde_evs;
+
+       void (*recv_cb)(struct tevent_context *ev,
+                       const uint8_t *msg, size_t msg_len,
+                       int *fds, size_t num_fds,
+                       void *private_data);
+       void *private_data;
 };
 
 /*
@@ -129,52 +135,10 @@ static int messaging_ctdb_recv(
        uint32_t src_vnn, uint32_t dst_vnn, uint64_t dst_srvid,
        const uint8_t *msg, size_t msg_len, void *private_data)
 {
-       struct messaging_context *msg_ctx = talloc_get_type_abort(
-               private_data, struct messaging_context);
-       struct server_id me = messaging_server_id(msg_ctx);
-       int ret;
-       struct iovec iov;
-       struct server_id src, dst;
-       enum messaging_type msg_type;
-       struct server_id_buf idbuf;
-
-       if (msg_len < MESSAGE_HDR_LENGTH) {
-               DEBUG(1, ("%s: message too short: %u\n", __func__,
-                         (unsigned)msg_len));
-               return 0;
-       }
-
-       message_hdr_get(&msg_type, &src, &dst, msg);
-
-       iov = (struct iovec) {
-               .iov_base = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
-               .iov_len = msg_len - MESSAGE_HDR_LENGTH
-       };
-
-       DEBUG(10, ("%s: Received message 0x%x len %u from %s\n",
-                  __func__, (unsigned)msg_type, (unsigned)msg_len,
-                  server_id_str_buf(src, &idbuf)));
-
-       if (!server_id_same_process(&me, &dst)) {
-               struct server_id_buf id1, id2;
-
-               DEBUG(10, ("%s: I'm %s, ignoring msg to %s\n", __func__,
-                          server_id_str_buf(me, &id1),
-                          server_id_str_buf(dst, &id2)));
-               return 0;
-       }
-
-       /*
-        * Go through the event loop
-        */
-
-       ret = messaging_send_iov_from(msg_ctx, src, dst, msg_type,
-                                     &iov, 1, NULL, 0);
+       struct messaging_ctdbd_context *ctx = talloc_get_type_abort(
+               private_data, struct messaging_ctdbd_context);
 
-       if (ret != 0) {
-               DEBUG(10, ("%s: messaging_send_iov_from failed: %s\n",
-                          __func__, strerror(ret)));
-       }
+       ctx->recv_cb(ev, msg, msg_len, NULL, 0, ctx->private_data);
 
        return 0;
 }
@@ -193,10 +157,15 @@ static void messaging_ctdbd_readable(struct tevent_context *ev,
        ctdbd_socket_readable(ev, conn);
 }
 
-static int messaging_ctdbd_init_internal(struct messaging_context *msg_ctx,
-                                        TALLOC_CTX *mem_ctx,
-                                        struct messaging_ctdbd_context *ctx,
-                                        bool reinit)
+static int messaging_ctdbd_init_internal(
+       struct messaging_context *msg_ctx, TALLOC_CTX *mem_ctx,
+       struct messaging_ctdbd_context *ctx,
+       void (*recv_cb)(struct tevent_context *ev,
+                       const uint8_t *msg, size_t msg_len,
+                       int *fds, size_t num_fds,
+                       void *private_data),
+       void *private_data,
+       bool reinit)
 {
        int ret;
 
@@ -229,8 +198,11 @@ static int messaging_ctdbd_init_internal(struct messaging_context *msg_ctx,
                return ret;
        }
 
+       ctx->recv_cb = recv_cb;
+       ctx->private_data = private_data;
+
        ret = register_with_ctdbd(ctx->conn, getpid(),
-                                 messaging_ctdb_recv, msg_ctx);
+                                 messaging_ctdb_recv, ctx);
        if (ret != 0) {
                DEBUG(10, ("register_with_ctdbd failed: %s\n",
                           strerror(ret)));
@@ -248,6 +220,11 @@ static int messaging_ctdbd_init_internal(struct messaging_context *msg_ctx,
 
 int messaging_ctdbd_init(struct messaging_context *msg_ctx,
                         TALLOC_CTX *mem_ctx,
+                        void (*recv_cb)(struct tevent_context *ev,
+                                        const uint8_t *msg, size_t msg_len,
+                                        int *fds, size_t num_fds,
+                                        void *private_data),
+                        void *private_data,
                         struct messaging_backend **presult)
 {
        struct messaging_backend *result;
@@ -265,7 +242,8 @@ int messaging_ctdbd_init(struct messaging_context *msg_ctx,
                return ENOMEM;
        }
 
-       ret = messaging_ctdbd_init_internal(msg_ctx, mem_ctx, ctx, false);
+       ret = messaging_ctdbd_init_internal(msg_ctx, mem_ctx, ctx,
+                                           recv_cb, private_data, false);
        if (ret != 0) {
                TALLOC_FREE(result);
                return ret;
@@ -280,13 +258,19 @@ int messaging_ctdbd_init(struct messaging_context *msg_ctx,
 
 int messaging_ctdbd_reinit(struct messaging_context *msg_ctx,
                           TALLOC_CTX *mem_ctx,
+                          void (*recv_cb)(struct tevent_context *ev,
+                                          const uint8_t *msg, size_t msg_len,
+                                          int *fds, size_t num_fds,
+                                          void *private_data),
+                          void *private_data,
                           struct messaging_backend *backend)
 {
        struct messaging_ctdbd_context *ctx = talloc_get_type_abort(
                backend->private_data, struct messaging_ctdbd_context);
        int ret;
 
-       ret = messaging_ctdbd_init_internal(msg_ctx, mem_ctx, ctx, true);
+       ret = messaging_ctdbd_init_internal(msg_ctx, mem_ctx, ctx,
+                                           recv_cb, private_data, true);
        if (ret != 0) {
                return ret;
        }
index c13079d84d1ca68bbdeae102efb004ec61ce09ac..7d928fe1b45f7f0d3813bc9afe8797c767887fb5 100644 (file)
@@ -29,9 +29,19 @@ struct ctdbd_connection;
 
 int messaging_ctdbd_init(struct messaging_context *msg_ctx,
                         TALLOC_CTX *mem_ctx,
+                        void (*recv_cb)(struct tevent_context *ev,
+                                        const uint8_t *msg, size_t msg_len,
+                                        int *fds, size_t num_fds,
+                                        void *private_data),
+                        void *private_data,
                         struct messaging_backend **presult);
 int messaging_ctdbd_reinit(struct messaging_context *msg_ctx,
                           TALLOC_CTX *mem_ctx,
+                          void (*recv_cb)(struct tevent_context *ev,
+                                          const uint8_t *msg, size_t msg_len,
+                                          int *fds, size_t num_fds,
+                                          void *private_data),
+                          void *private_data,
                           struct messaging_backend *backend);
 struct ctdbd_connection *messaging_ctdbd_connection(void);