s4:messaging: add imessaging_init_discard_incoming()
authorStefan Metzmacher <metze@samba.org>
Wed, 28 Sep 2022 11:47:13 +0000 (13:47 +0200)
committerRalph Boehme <slow@samba.org>
Thu, 13 Oct 2022 12:30:37 +0000 (12:30 +0000)
We often create imessaging contexts just for sending messages,
but we'll never process incoming messages because a temporary event
context was used and we just queue a lot of imessaging_post_state
structures with immediate events.

With imessaging_init_discard_incoming() we'll discard any incoming messages
unless we have pending irpc requests.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=15201

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
source4/lib/messaging/messaging.c
source4/lib/messaging/messaging.h
source4/lib/messaging/messaging_internal.h

index a00c35be0d58c4ed358ae13b4916470ef92f6382..99107c8017692131cc34950e485e0d79ad0ed6b1 100644 (file)
@@ -429,6 +429,12 @@ static NTSTATUS imessaging_reinit(struct imessaging_context *msg)
 
        TALLOC_FREE(msg->msg_dgm_ref);
 
+       if (msg->discard_incoming) {
+               msg->num_incoming_listeners = 0;
+       } else {
+               msg->num_incoming_listeners = 1;
+       }
+
        msg->server_id.pid = getpid();
 
        msg->msg_dgm_ref = messaging_dgm_ref(msg,
@@ -469,7 +475,9 @@ NTSTATUS imessaging_reinit_all(void)
 /*
   create the listening socket and setup the dispatcher
 */
-struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
+static struct imessaging_context *imessaging_init_internal(
+                                          TALLOC_CTX *mem_ctx,
+                                          bool discard_incoming,
                                           struct loadparm_context *lp_ctx,
                                           struct server_id server_id,
                                           struct tevent_context *ev)
@@ -490,6 +498,12 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
                return NULL;
        }
        msg->ev = ev;
+       msg->discard_incoming = discard_incoming;
+       if (msg->discard_incoming) {
+               msg->num_incoming_listeners = 0;
+       } else {
+               msg->num_incoming_listeners = 1;
+       }
 
        talloc_set_destructor(msg, imessaging_context_destructor);
 
@@ -601,6 +615,36 @@ fail:
        return NULL;
 }
 
+/*
+  create the listening socket and setup the dispatcher
+*/
+struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
+                                          struct loadparm_context *lp_ctx,
+                                          struct server_id server_id,
+                                          struct tevent_context *ev)
+{
+       bool discard_incoming = false;
+       return imessaging_init_internal(mem_ctx,
+                                       discard_incoming,
+                                       lp_ctx,
+                                       server_id,
+                                       ev);
+}
+
+struct imessaging_context *imessaging_init_discard_incoming(
+                                               TALLOC_CTX *mem_ctx,
+                                               struct loadparm_context *lp_ctx,
+                                               struct server_id server_id,
+                                               struct tevent_context *ev)
+{
+       bool discard_incoming = true;
+       return imessaging_init_internal(mem_ctx,
+                                       discard_incoming,
+                                       lp_ctx,
+                                       server_id,
+                                       ev);
+}
+
 struct imessaging_post_state {
        struct imessaging_context *msg_ctx;
        struct imessaging_post_state **busy_ref;
@@ -697,6 +741,22 @@ static void imessaging_dgm_recv(struct tevent_context *ev,
                return;
        }
 
+       if (msg->num_incoming_listeners == 0) {
+               struct server_id_buf selfbuf;
+
+               message_hdr_get(&msg_type, &src, &dst, buf);
+
+               DBG_DEBUG("not listening - discarding message from "
+                         "src[%s] to dst[%s] (self[%s]) type=0x%x "
+                         "on %s event context\n",
+                          server_id_str_buf(src, &srcbuf),
+                          server_id_str_buf(dst, &dstbuf),
+                          server_id_str_buf(msg->server_id, &selfbuf),
+                          (unsigned)msg_type,
+                          (ev != msg->ev) ? "different" : "main");
+               return;
+       }
+
        if (ev != msg->ev) {
                int ret;
                ret = imessaging_post_self(msg, buf, buf_len);
@@ -760,6 +820,7 @@ struct imessaging_context *imessaging_client_init(TALLOC_CTX *mem_ctx,
 
        return imessaging_init(mem_ctx, lp_ctx, id, ev);
 }
+
 /*
   a list of registered irpc server functions
 */
@@ -975,6 +1036,12 @@ static int irpc_destructor(struct irpc_request *irpc)
 {
        if (irpc->callid != -1) {
                idr_remove(irpc->msg_ctx->idr, irpc->callid);
+               if (irpc->msg_ctx->discard_incoming) {
+                       SMB_ASSERT(irpc->msg_ctx->num_incoming_listeners > 0);
+               } else {
+                       SMB_ASSERT(irpc->msg_ctx->num_incoming_listeners > 1);
+               }
+               irpc->msg_ctx->num_incoming_listeners -= 1;
                irpc->callid = -1;
        }
 
@@ -1168,6 +1235,9 @@ static struct tevent_req *irpc_bh_raw_call_send(TALLOC_CTX *mem_ctx,
        state->irpc->incoming.handler = irpc_bh_raw_call_incoming_handler;
        state->irpc->incoming.private_data = req;
 
+       /* make sure we accept incoming messages */
+       SMB_ASSERT(state->irpc->msg_ctx->num_incoming_listeners < UINT64_MAX);
+       state->irpc->msg_ctx->num_incoming_listeners += 1;
        talloc_set_destructor(state->irpc, irpc_destructor);
 
        /* setup the header */
index 3fd788d1e4209e335792c2721bbb4c3a75818155..e7ae9e8cc463c3dac926f37241ffdb6de83ba92d 100644 (file)
@@ -49,6 +49,11 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
                                           struct loadparm_context *lp_ctx,
                                           struct server_id server_id,
                                           struct tevent_context *ev);
+struct imessaging_context *imessaging_init_discard_incoming(
+                                               TALLOC_CTX *mem_ctx,
+                                               struct loadparm_context *lp_ctx,
+                                               struct server_id server_id,
+                                               struct tevent_context *ev);
 void imessaging_dgm_unref_ev(struct tevent_context *ev);
 NTSTATUS imessaging_reinit_all(void);
 int imessaging_cleanup(struct imessaging_context *msg);
index 5e99734ad60e4d4cdce232b3eae917b48a9455a3..ac254c226316a181c84d7a4a462b18ad51f23ce8 100644 (file)
@@ -33,6 +33,15 @@ struct imessaging_context {
        struct server_id_db *names;
        struct timeval start_time;
        void *msg_dgm_ref;
+       /*
+        * The number of instances waiting for incoming
+        * messages. By default it's always greater than 0.
+        *
+        * If it's 0 we'll discard incoming messages,
+        * see imessaging_init_discard_imcoming().
+        */
+       bool discard_incoming;
+       uint64_t num_incoming_listeners;
 };
 
 NTSTATUS imessaging_register_extra_handlers(struct imessaging_context *msg);