messaging: Broadcast messages to all event contexts
authorVolker Lendecke <vl@samba.org>
Thu, 22 Jun 2017 06:54:56 +0000 (08:54 +0200)
committerRalph Boehme <slow@samba.org>
Tue, 25 Jul 2017 15:43:18 +0000 (17:43 +0200)
We must give all event contexts that might be interested the chance to pick up
the message. If we send a message to ourselves via messaging_send_iov_from,
nested event contexts need to get a chance to see the message. Before this
patch only the main event context in msg_ctx got it.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
source3/lib/messages.c

index 61005abf3177cd7803f656fefc651e9498e3a849..5c10013df1d4633990fbca0b9405387dac656309 100644 (file)
@@ -70,6 +70,7 @@ struct messaging_callback {
 
 struct messaging_registered_ev {
        struct tevent_context *ev;
+       struct tevent_immediate *im;
        size_t refcount;
 };
 
@@ -78,6 +79,8 @@ struct messaging_context {
        struct tevent_context *event_ctx;
        struct messaging_callback *callbacks;
 
+       struct messaging_rec *posted_msgs;
+
        struct messaging_registered_ev *event_contexts;
 
        struct tevent_req **new_waiters;
@@ -94,6 +97,8 @@ struct messaging_context {
 
 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
                                               struct messaging_rec *rec);
+static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
+                                      struct messaging_rec *rec);
 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
                                       struct tevent_context *ev,
                                       struct messaging_rec *rec);
@@ -230,6 +235,11 @@ static bool messaging_deregister_event_context(struct messaging_context *ctx,
                                 * paranoia
                                 */
                                reg->ev = NULL;
+
+                               /*
+                                * Do not talloc_free(reg->im),
+                                * recycle immediates events.
+                                */
                        }
                        return true;
                }
@@ -237,6 +247,105 @@ static bool messaging_deregister_event_context(struct messaging_context *ctx,
        return false;
 }
 
+static void messaging_post_main_event_context(struct tevent_context *ev,
+                                             struct tevent_immediate *im,
+                                             void *private_data)
+{
+       struct messaging_context *ctx = talloc_get_type_abort(
+               private_data, struct messaging_context);
+
+       while (ctx->posted_msgs != NULL) {
+               struct messaging_rec *rec = ctx->posted_msgs;
+               bool consumed;
+
+               DLIST_REMOVE(ctx->posted_msgs, rec);
+
+               consumed = messaging_dispatch_classic(ctx, rec);
+               if (!consumed) {
+                       consumed = messaging_dispatch_waiters(
+                               ctx, ctx->event_ctx, rec);
+               }
+
+               if (!consumed) {
+                       uint8_t i;
+
+                       for (i=0; i<rec->num_fds; i++) {
+                               close(rec->fds[i]);
+                       }
+               }
+
+               TALLOC_FREE(rec);
+       }
+}
+
+static void messaging_post_sub_event_context(struct tevent_context *ev,
+                                            struct tevent_immediate *im,
+                                            void *private_data)
+{
+       struct messaging_context *ctx = talloc_get_type_abort(
+               private_data, struct messaging_context);
+       struct messaging_rec *rec, *next;
+
+       for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
+               bool consumed;
+
+               next = rec->next;
+
+               consumed = messaging_dispatch_waiters(ctx, ev, rec);
+               if (consumed) {
+                       DLIST_REMOVE(ctx->posted_msgs, rec);
+                       TALLOC_FREE(rec);
+               }
+       }
+}
+
+static bool messaging_alert_event_contexts(struct messaging_context *ctx)
+{
+       size_t i, num_event_contexts;
+
+       num_event_contexts = talloc_array_length(ctx->event_contexts);
+
+       for (i=0; i<num_event_contexts; i++) {
+               struct messaging_registered_ev *reg = &ctx->event_contexts[i];
+
+               if (reg->refcount == 0) {
+                       continue;
+               }
+
+               if (reg->im == NULL) {
+                       reg->im = tevent_create_immediate(
+                               ctx->event_contexts);
+               }
+               if (reg->im == NULL) {
+                       DBG_WARNING("Could not create immediate\n");
+                       continue;
+               }
+
+               /*
+                * We depend on schedule_immediate to work
+                * multiple times. Might be a bit inefficient,
+                * but this needs to be proven in tests. The
+                * alternatively would be to track whether the
+                * immediate has already been scheduled. For
+                * now, avoid that complexity here.
+                */
+
+               if (reg->ev == ctx->event_ctx) {
+                       tevent_schedule_immediate(
+                               reg->im, reg->ev,
+                               messaging_post_main_event_context,
+                               ctx);
+               } else {
+                       tevent_schedule_immediate(
+                               reg->im, reg->ev,
+                               messaging_post_sub_event_context,
+                               ctx);
+               }
+
+       }
+       return true;
+}
+
 static void messaging_recv_cb(struct tevent_context *ev,
                              const uint8_t *msg, size_t msg_len,
                              int *fds, size_t num_fds,
@@ -308,6 +417,13 @@ static int messaging_context_destructor(struct messaging_context *ctx)
                }
        }
 
+       /*
+        * The immediates from messaging_alert_event_contexts
+        * reference "ctx". Don't let them outlive the
+        * messaging_context we're destroying here.
+        */
+       TALLOC_FREE(ctx->event_contexts);
+
        return 0;
 }
 
@@ -612,57 +728,30 @@ NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
        return messaging_send(msg_ctx, server, msg_type, &blob);
 }
 
-struct messaging_post_state {
-       struct messaging_context *msg_ctx;
-       struct messaging_rec *rec;
-};
-
-static void messaging_post_handler(struct tevent_context *ev,
-                                  struct tevent_immediate *ti,
-                                  void *private_data);
-
 static int messaging_post_self(struct messaging_context *msg_ctx,
                               struct server_id src, struct server_id dst,
                               uint32_t msg_type,
                               const struct iovec *iov, int iovlen,
                               const int *fds, size_t num_fds)
 {
-       struct tevent_immediate *ti;
-       struct messaging_post_state *state;
+       struct messaging_rec *rec;
+       bool ok;
 
-       state = talloc(msg_ctx, struct messaging_post_state);
-       if (state == NULL) {
+       rec = messaging_rec_create(
+               msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
+       if (rec == NULL) {
                return ENOMEM;
        }
-       state->msg_ctx = msg_ctx;
 
-       ti = tevent_create_immediate(state);
-       if (ti == NULL) {
-               goto fail;
-       }
-       state->rec = messaging_rec_create(
-               state, src, dst, msg_type, iov, iovlen, fds, num_fds);
-       if (state->rec == NULL) {
-               goto fail;
+       ok = messaging_alert_event_contexts(msg_ctx);
+       if (!ok) {
+               TALLOC_FREE(rec);
+               return ENOMEM;
        }
 
-       tevent_schedule_immediate(ti, msg_ctx->event_ctx,
-                                 messaging_post_handler, state);
-       return 0;
+       DLIST_ADD_END(msg_ctx->posted_msgs, rec);
 
-fail:
-       TALLOC_FREE(state);
-       return ENOMEM;
-}
-
-static void messaging_post_handler(struct tevent_context *ev,
-                                  struct tevent_immediate *ti,
-                                  void *private_data)
-{
-       struct messaging_post_state *state = talloc_get_type_abort(
-               private_data, struct messaging_post_state);
-       messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
-       TALLOC_FREE(state);
+       return 0;
 }
 
 int messaging_send_iov_from(struct messaging_context *msg_ctx,