messaging3: Fix messaging_read_send/recv
authorVolker Lendecke <vl@samba.org>
Tue, 29 Apr 2014 12:14:24 +0000 (14:14 +0200)
committerVolker Lendecke <vl@samba.org>
Wed, 30 Apr 2014 12:52:08 +0000 (14:52 +0200)
messaging_read_send/recv was okay for just one handler in the queue. For
multiple handlers it was pretty broken.

A handler that deletes itself as part of the callback (pretty typical use
case...) drops the message for a subsequent handler that responds to the same
message type. In messaging_dispatch_rec we walk the array, however
messaging_read_cleanup has already changed the array. tevent_req_defer_callback
does not help here: It only defers the callback, it does not defer the cleanup
function.

This also happens when a callback deletes a different handler

A handler that re-installs itself in the callback might get a message twice.

This patch changes the code such that only messaging_dispatch_rec adds records
to msg_ctx->waiters, new waiters are put into a staging area first
(msg_ctx->new_waiters). Also messaging_read_cleanup does not move anything
around in msg_ctx->waiters, it only nulls out itself. messaging_dispatch_rec is
changed to cope with this.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/include/messages.h
source3/lib/messages.c

index 1681ec9ede322d1c672b3b66695213c0633fa017..06c174833cd5ddb5933135027c61cf316f78c63e 100644 (file)
@@ -76,6 +76,9 @@ struct messaging_context {
        struct tevent_context *event_ctx;
        struct messaging_callback *callbacks;
 
+       struct tevent_req **new_waiters;
+       unsigned num_new_waiters;
+
        struct tevent_req **waiters;
        unsigned num_waiters;
 
index b0b2bb24fba775e1ef916ee6bd74e022c8cddd3b..9284ac132ab7132313c51f6f5e7d18328c9e8c4e 100644 (file)
@@ -475,7 +475,7 @@ struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 {
        struct tevent_req *req;
        struct messaging_read_state *state;
-       size_t waiters_len;
+       size_t new_waiters_len;
 
        req = tevent_req_create(mem_ctx, &state,
                                struct messaging_read_state);
@@ -486,21 +486,21 @@ struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
        state->msg_ctx = msg_ctx;
        state->msg_type = msg_type;
 
-       waiters_len = talloc_array_length(msg_ctx->waiters);
+       new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
 
-       if (waiters_len == msg_ctx->num_waiters) {
+       if (new_waiters_len == msg_ctx->num_new_waiters) {
                struct tevent_req **tmp;
 
-               tmp = talloc_realloc(msg_ctx, msg_ctx->waiters,
-                                    struct tevent_req *, waiters_len+1);
+               tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
+                                    struct tevent_req *, new_waiters_len+1);
                if (tevent_req_nomem(tmp, req)) {
                        return tevent_req_post(req, ev);
                }
-               msg_ctx->waiters = tmp;
+               msg_ctx->new_waiters = tmp;
        }
 
-       msg_ctx->waiters[msg_ctx->num_waiters] = req;
-       msg_ctx->num_waiters += 1;
+       msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
+       msg_ctx->num_new_waiters += 1;
        tevent_req_set_cleanup_fn(req, messaging_read_cleanup);
 
        return req;
@@ -512,15 +512,20 @@ static void messaging_read_cleanup(struct tevent_req *req,
        struct messaging_read_state *state = tevent_req_data(
                req, struct messaging_read_state);
        struct messaging_context *msg_ctx = state->msg_ctx;
-       struct tevent_req **waiters = msg_ctx->waiters;
        unsigned i;
 
        tevent_req_set_cleanup_fn(req, NULL);
 
        for (i=0; i<msg_ctx->num_waiters; i++) {
-               if (waiters[i] == req) {
-                       waiters[i] = waiters[msg_ctx->num_waiters-1];
-                       msg_ctx->num_waiters -= 1;
+               if (msg_ctx->waiters[i] == req) {
+                       msg_ctx->waiters[i] = NULL;
+                       return;
+               }
+       }
+
+       for (i=0; i<msg_ctx->num_new_waiters; i++) {
+               if (msg_ctx->new_waiters[i] == req) {
+                       msg_ctx->new_waiters[i] = NULL;
                        return;
                }
        }
@@ -556,6 +561,34 @@ int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
        return 0;
 }
 
+static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
+{
+       if (msg_ctx->num_new_waiters == 0) {
+               return true;
+       }
+
+       if (talloc_array_length(msg_ctx->waiters) <
+           (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
+               struct tevent_req **tmp;
+               tmp = talloc_realloc(
+                       msg_ctx, msg_ctx->waiters, struct tevent_req *,
+                       msg_ctx->num_waiters + msg_ctx->num_new_waiters);
+               if (tmp == NULL) {
+                       DEBUG(1, ("%s: talloc failed\n", __func__));
+                       return false;
+               }
+               msg_ctx->waiters = tmp;
+       }
+
+       memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
+              sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
+
+       msg_ctx->num_waiters += msg_ctx->num_new_waiters;
+       msg_ctx->num_new_waiters = 0;
+
+       return true;
+}
+
 /*
   Dispatch one messaging_rec
 */
@@ -578,14 +611,39 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
                }
        }
 
-       for (i=0; i<msg_ctx->num_waiters; i++) {
-               struct tevent_req *req = msg_ctx->waiters[i];
-               struct messaging_read_state *state = tevent_req_data(
-                       req, struct messaging_read_state);
+       if (!messaging_append_new_waiters(msg_ctx)) {
+               return;
+       }
+
+       i = 0;
+       while (i < msg_ctx->num_waiters) {
+               struct tevent_req *req;
+               struct messaging_read_state *state;
+
+               req = msg_ctx->waiters[i];
+               if (req == NULL) {
+                       /*
+                        * This got cleaned up. In the meantime,
+                        * move everything down one. We need
+                        * to keep the order of waiters, as
+                        * other code may depend on this.
+                        */
+                       if (i <  msg_ctx->num_waiters - 1) {
+                               memmove(&msg_ctx->waiters[i],
+                                       &msg_ctx->waiters[i+1],
+                                       sizeof(struct tevent_req *) *
+                                               (msg_ctx->num_waiters - i - 1));
+                       }
+                       msg_ctx->num_waiters -= 1;
+                       continue;
+               }
 
+               state = tevent_req_data(req, struct messaging_read_state);
                if (state->msg_type == rec->msg_type) {
                        messaging_read_done(req, rec);
                }
+
+               i += 1;
        }
        return;
 }