messaging: Don't do self-sends in messaging_send_all
[nivanova/samba-autobuild/.git] / source3 / lib / messages.c
index 61005abf3177cd7803f656fefc651e9498e3a849..561616df6e44da9c989f90253cf2fc534a22422c 100644 (file)
 #include "lib/util/tevent_unix.h"
 #include "lib/background.h"
 #include "lib/messages_dgm.h"
-#include "lib/messages_ctdbd.h"
 #include "lib/util/iov_buf.h"
 #include "lib/util/server_id_db.h"
 #include "lib/messages_dgm_ref.h"
+#include "lib/messages_ctdb.h"
+#include "lib/messages_ctdb_ref.h"
 #include "lib/messages_util.h"
+#include "cluster_support.h"
+#include "ctdbd_conn.h"
+#include "ctdb_srvids.h"
+
+#ifdef CLUSTER_SUPPORT
+#include "ctdb_protocol.h"
+#endif
 
 struct messaging_callback {
        struct messaging_callback *prev, *next;
@@ -70,6 +78,7 @@ struct messaging_callback {
 
 struct messaging_registered_ev {
        struct tevent_context *ev;
+       struct tevent_immediate *im;
        size_t refcount;
 };
 
@@ -78,6 +87,8 @@ struct messaging_context {
        struct tevent_context *event_ctx;
        struct messaging_callback *callbacks;
 
+       struct messaging_rec *posted_msgs;
+
        struct messaging_registered_ev *event_contexts;
 
        struct tevent_req **new_waiters;
@@ -87,13 +98,15 @@ struct messaging_context {
        size_t num_waiters;
 
        void *msg_dgm_ref;
-       struct messaging_backend *remote;
+       void *msg_ctdb_ref;
 
        struct server_id_db *names_db;
 };
 
 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
                                               struct messaging_rec *rec);
+static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
+                                      struct messaging_rec *rec);
 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
                                       struct tevent_context *ev,
                                       struct messaging_rec *rec);
@@ -230,6 +243,11 @@ static bool messaging_deregister_event_context(struct messaging_context *ctx,
                                 * paranoia
                                 */
                                reg->ev = NULL;
+
+                               /*
+                                * Do not talloc_free(reg->im),
+                                * recycle immediates events.
+                                */
                        }
                        return true;
                }
@@ -237,6 +255,105 @@ static bool messaging_deregister_event_context(struct messaging_context *ctx,
        return false;
 }
 
+static void messaging_post_main_event_context(struct tevent_context *ev,
+                                             struct tevent_immediate *im,
+                                             void *private_data)
+{
+       struct messaging_context *ctx = talloc_get_type_abort(
+               private_data, struct messaging_context);
+
+       while (ctx->posted_msgs != NULL) {
+               struct messaging_rec *rec = ctx->posted_msgs;
+               bool consumed;
+
+               DLIST_REMOVE(ctx->posted_msgs, rec);
+
+               consumed = messaging_dispatch_classic(ctx, rec);
+               if (!consumed) {
+                       consumed = messaging_dispatch_waiters(
+                               ctx, ctx->event_ctx, rec);
+               }
+
+               if (!consumed) {
+                       uint8_t i;
+
+                       for (i=0; i<rec->num_fds; i++) {
+                               close(rec->fds[i]);
+                       }
+               }
+
+               TALLOC_FREE(rec);
+       }
+}
+
+static void messaging_post_sub_event_context(struct tevent_context *ev,
+                                            struct tevent_immediate *im,
+                                            void *private_data)
+{
+       struct messaging_context *ctx = talloc_get_type_abort(
+               private_data, struct messaging_context);
+       struct messaging_rec *rec, *next;
+
+       for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
+               bool consumed;
+
+               next = rec->next;
+
+               consumed = messaging_dispatch_waiters(ctx, ev, rec);
+               if (consumed) {
+                       DLIST_REMOVE(ctx->posted_msgs, rec);
+                       TALLOC_FREE(rec);
+               }
+       }
+}
+
+static bool messaging_alert_event_contexts(struct messaging_context *ctx)
+{
+       size_t i, num_event_contexts;
+
+       num_event_contexts = talloc_array_length(ctx->event_contexts);
+
+       for (i=0; i<num_event_contexts; i++) {
+               struct messaging_registered_ev *reg = &ctx->event_contexts[i];
+
+               if (reg->refcount == 0) {
+                       continue;
+               }
+
+               if (reg->im == NULL) {
+                       reg->im = tevent_create_immediate(
+                               ctx->event_contexts);
+               }
+               if (reg->im == NULL) {
+                       DBG_WARNING("Could not create immediate\n");
+                       continue;
+               }
+
+               /*
+                * We depend on schedule_immediate to work
+                * multiple times. Might be a bit inefficient,
+                * but this needs to be proven in tests. The
+                * alternatively would be to track whether the
+                * immediate has already been scheduled. For
+                * now, avoid that complexity here.
+                */
+
+               if (reg->ev == ctx->event_ctx) {
+                       tevent_schedule_immediate(
+                               reg->im, reg->ev,
+                               messaging_post_main_event_context,
+                               ctx);
+               } else {
+                       tevent_schedule_immediate(
+                               reg->im, reg->ev,
+                               messaging_post_sub_event_context,
+                               ctx);
+               }
+
+       }
+       return true;
+}
+
 static void messaging_recv_cb(struct tevent_context *ev,
                              const uint8_t *msg, size_t msg_len,
                              int *fds, size_t num_fds,
@@ -308,6 +425,13 @@ static int messaging_context_destructor(struct messaging_context *ctx)
                }
        }
 
+       /*
+        * The immediates from messaging_alert_event_contexts
+        * reference "ctx". Don't let them outlive the
+        * messaging_context we're destroying here.
+        */
+       TALLOC_FREE(ctx->event_contexts);
+
        return 0;
 }
 
@@ -395,16 +519,21 @@ static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
        }
        talloc_set_destructor(ctx, messaging_context_destructor);
 
+#ifdef CLUSTER_SUPPORT
        if (lp_clustering()) {
-               ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
-
-               if (ret != 0) {
-                       DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
-                                 strerror(ret)));
+               ctx->msg_ctdb_ref = messaging_ctdb_ref(
+                       ctx, ctx->event_ctx,
+                       lp_ctdbd_socket(), lp_ctdb_timeout(),
+                       ctx->id.unique_id, messaging_recv_cb, ctx, &ret);
+               if (ctx->msg_ctdb_ref == NULL) {
+                       DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
+                                  strerror(ret));
                        status = map_nt_error_from_unix(ret);
                        goto done;
                }
        }
+#endif
+
        ctx->id.vnn = get_my_vnn();
 
        ctx->names_db = server_id_db_init(ctx,
@@ -479,6 +608,7 @@ NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
        char *lck_path;
 
        TALLOC_FREE(msg_ctx->msg_dgm_ref);
+       TALLOC_FREE(msg_ctx->msg_ctdb_ref);
 
        msg_ctx->id = (struct server_id) {
                .pid = getpid(), .vnn = msg_ctx->id.vnn
@@ -500,12 +630,14 @@ NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
        }
 
        if (lp_clustering()) {
-               ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
-                                            msg_ctx->remote);
-
-               if (ret != 0) {
-                       DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
-                                 strerror(ret)));
+               msg_ctx->msg_ctdb_ref = messaging_ctdb_ref(
+                       msg_ctx, msg_ctx->event_ctx,
+                       lp_ctdbd_socket(), lp_ctdb_timeout(),
+                       msg_ctx->id.unique_id, messaging_recv_cb, msg_ctx,
+                       &ret);
+               if (msg_ctx->msg_ctdb_ref == NULL) {
+                       DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
+                                  strerror(ret));
                        return map_nt_error_from_unix(ret);
                }
        }
@@ -612,57 +744,30 @@ NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
        return messaging_send(msg_ctx, server, msg_type, &blob);
 }
 
-struct messaging_post_state {
-       struct messaging_context *msg_ctx;
-       struct messaging_rec *rec;
-};
-
-static void messaging_post_handler(struct tevent_context *ev,
-                                  struct tevent_immediate *ti,
-                                  void *private_data);
-
 static int messaging_post_self(struct messaging_context *msg_ctx,
                               struct server_id src, struct server_id dst,
                               uint32_t msg_type,
                               const struct iovec *iov, int iovlen,
                               const int *fds, size_t num_fds)
 {
-       struct tevent_immediate *ti;
-       struct messaging_post_state *state;
+       struct messaging_rec *rec;
+       bool ok;
 
-       state = talloc(msg_ctx, struct messaging_post_state);
-       if (state == NULL) {
+       rec = messaging_rec_create(
+               msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
+       if (rec == NULL) {
                return ENOMEM;
        }
-       state->msg_ctx = msg_ctx;
 
-       ti = tevent_create_immediate(state);
-       if (ti == NULL) {
-               goto fail;
-       }
-       state->rec = messaging_rec_create(
-               state, src, dst, msg_type, iov, iovlen, fds, num_fds);
-       if (state->rec == NULL) {
-               goto fail;
+       ok = messaging_alert_event_contexts(msg_ctx);
+       if (!ok) {
+               TALLOC_FREE(rec);
+               return ENOMEM;
        }
 
-       tevent_schedule_immediate(ti, msg_ctx->event_ctx,
-                                 messaging_post_handler, state);
-       return 0;
+       DLIST_ADD_END(msg_ctx->posted_msgs, rec);
 
-fail:
-       TALLOC_FREE(state);
-       return ENOMEM;
-}
-
-static void messaging_post_handler(struct tevent_context *ev,
-                                  struct tevent_immediate *ti,
-                                  void *private_data)
-{
-       struct messaging_post_state *state = talloc_get_type_abort(
-               private_data, struct messaging_post_state);
-       messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
-       TALLOC_FREE(state);
+       return 0;
 }
 
 int messaging_send_iov_from(struct messaging_context *msg_ctx,
@@ -683,18 +788,6 @@ int messaging_send_iov_from(struct messaging_context *msg_ctx,
                return EINVAL;
        }
 
-       if (dst.vnn != msg_ctx->id.vnn) {
-               if (num_fds > 0) {
-                       return ENOSYS;
-               }
-
-               ret = msg_ctx->remote->send_fn(src, dst,
-                                              msg_type, iov, iovlen,
-                                              NULL, 0,
-                                              msg_ctx->remote);
-               return ret;
-       }
-
        if (server_id_equal(&dst, &msg_ctx->id)) {
                ret = messaging_post_self(msg_ctx, src, dst, msg_type,
                                          iov, iovlen, fds, num_fds);
@@ -705,6 +798,15 @@ int messaging_send_iov_from(struct messaging_context *msg_ctx,
        iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
        memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
 
+       if (dst.vnn != msg_ctx->id.vnn) {
+               if (num_fds > 0) {
+                       return ENOSYS;
+               }
+
+               ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
+               return ret;
+       }
+
        ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
 
        if (ret == EACCES) {
@@ -743,6 +845,76 @@ NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
        return NT_STATUS_OK;
 }
 
+struct send_all_state {
+       struct messaging_context *msg_ctx;
+       int msg_type;
+       const void *buf;
+       size_t len;
+};
+
+static int send_all_fn(pid_t pid, void *private_data)
+{
+       struct send_all_state *state = private_data;
+       NTSTATUS status;
+
+       if (pid == getpid()) {
+               DBG_DEBUG("Skip ourselves in messaging_send_all\n");
+               return 0;
+       }
+
+       status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
+                                   state->msg_type, state->buf, state->len);
+       if (!NT_STATUS_IS_OK(status)) {
+               DBG_WARNING("messaging_send_buf to %ju failed: %s\n",
+                           (uintmax_t)pid, nt_errstr(status));
+       }
+
+       return 0;
+}
+
+void messaging_send_all(struct messaging_context *msg_ctx,
+                       int msg_type, const void *buf, size_t len)
+{
+       struct send_all_state state = {
+               .msg_ctx = msg_ctx, .msg_type = msg_type,
+               .buf = buf, .len = len
+       };
+       int ret;
+
+#ifdef CLUSTER_SUPPORT
+       if (lp_clustering()) {
+               struct ctdbd_connection *conn = messaging_ctdb_connection();
+               uint8_t msghdr[MESSAGE_HDR_LENGTH];
+               struct iovec iov[] = {
+                       { .iov_base = msghdr,
+                         .iov_len = sizeof(msghdr) },
+                       { .iov_base = discard_const_p(void, buf),
+                         .iov_len = len }
+               };
+
+               message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
+                               (struct server_id) {0});
+
+               ret = ctdbd_messaging_send_iov(
+                       conn, CTDB_BROADCAST_CONNECTED,
+                       CTDB_SRVID_SAMBA_PROCESS,
+                       iov, ARRAY_SIZE(iov));
+               if (ret != 0) {
+                       DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
+                                   strerror(ret));
+               }
+
+               return;
+       }
+#endif
+
+       ret = messaging_dgm_forall(send_all_fn, &state);
+       if (ret != 0) {
+               DBG_WARNING("messaging_dgm_forall failed: %s\n",
+                           strerror(ret));
+       }
+}
+
 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
                                               struct messaging_rec *rec)
 {
@@ -780,6 +952,7 @@ struct messaging_filtered_read_state {
        struct tevent_context *ev;
        struct messaging_context *msg_ctx;
        struct messaging_dgm_fde *fde;
+       struct messaging_ctdb_fde *cluster_fde;
 
        bool (*filter)(struct messaging_rec *rec, void *private_data);
        void *private_data;
@@ -822,6 +995,14 @@ struct tevent_req *messaging_filtered_read_send(
                return tevent_req_post(req, ev);
        }
 
+       if (lp_clustering()) {
+               state->cluster_fde =
+                       messaging_ctdb_register_tevent_context(state, ev);
+               if (tevent_req_nomem(state->cluster_fde, req)) {
+                       return tevent_req_post(req, ev);
+               }
+       }
+
        /*
         * We add ourselves to the "new_waiters" array, not the "waiters"
         * array. If we are called from within messaging_read_done,
@@ -868,6 +1049,7 @@ static void messaging_filtered_read_cleanup(struct tevent_req *req,
        tevent_req_set_cleanup_fn(req, NULL);
 
        TALLOC_FREE(state->fde);
+       TALLOC_FREE(state->cluster_fde);
 
        ok = messaging_deregister_event_context(msg_ctx, state->ev);
        if (!ok) {
@@ -1184,6 +1366,7 @@ bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
                            60*15),
                mess_parent_dgm_cleanup, msg);
        if (req == NULL) {
+               DBG_WARNING("background_job_send failed\n");
                return false;
        }
        tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);