struct messaging_registered_ev {
struct tevent_context *ev;
+ struct tevent_immediate *im;
size_t refcount;
};
struct tevent_context *event_ctx;
struct messaging_callback *callbacks;
+ struct messaging_rec *posted_msgs;
+
struct messaging_registered_ev *event_contexts;
struct tevent_req **new_waiters;
static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
struct messaging_rec *rec);
+static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
+ struct messaging_rec *rec);
static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
struct tevent_context *ev,
struct messaging_rec *rec);
* paranoia
*/
reg->ev = NULL;
+
+ /*
+ * Do not talloc_free(reg->im),
+ * recycle immediates events.
+ */
}
return true;
}
return false;
}
+static void messaging_post_main_event_context(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_data)
+{
+ struct messaging_context *ctx = talloc_get_type_abort(
+ private_data, struct messaging_context);
+
+ while (ctx->posted_msgs != NULL) {
+ struct messaging_rec *rec = ctx->posted_msgs;
+ bool consumed;
+
+ DLIST_REMOVE(ctx->posted_msgs, rec);
+
+ consumed = messaging_dispatch_classic(ctx, rec);
+ if (!consumed) {
+ consumed = messaging_dispatch_waiters(
+ ctx, ctx->event_ctx, rec);
+ }
+
+ if (!consumed) {
+ uint8_t i;
+
+ for (i=0; i<rec->num_fds; i++) {
+ close(rec->fds[i]);
+ }
+ }
+
+ TALLOC_FREE(rec);
+ }
+}
+
+static void messaging_post_sub_event_context(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_data)
+{
+ struct messaging_context *ctx = talloc_get_type_abort(
+ private_data, struct messaging_context);
+ struct messaging_rec *rec, *next;
+
+ for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
+ bool consumed;
+
+ next = rec->next;
+
+ consumed = messaging_dispatch_waiters(ctx, ev, rec);
+ if (consumed) {
+ DLIST_REMOVE(ctx->posted_msgs, rec);
+ TALLOC_FREE(rec);
+ }
+ }
+}
+
+static bool messaging_alert_event_contexts(struct messaging_context *ctx)
+{
+ size_t i, num_event_contexts;
+
+ num_event_contexts = talloc_array_length(ctx->event_contexts);
+
+ for (i=0; i<num_event_contexts; i++) {
+ struct messaging_registered_ev *reg = &ctx->event_contexts[i];
+
+ if (reg->refcount == 0) {
+ continue;
+ }
+
+ if (reg->im == NULL) {
+ reg->im = tevent_create_immediate(
+ ctx->event_contexts);
+ }
+ if (reg->im == NULL) {
+ DBG_WARNING("Could not create immediate\n");
+ continue;
+ }
+
+ /*
+ * We depend on schedule_immediate to work
+ * multiple times. Might be a bit inefficient,
+ * but this needs to be proven in tests. The
+ * alternatively would be to track whether the
+ * immediate has already been scheduled. For
+ * now, avoid that complexity here.
+ */
+
+ if (reg->ev == ctx->event_ctx) {
+ tevent_schedule_immediate(
+ reg->im, reg->ev,
+ messaging_post_main_event_context,
+ ctx);
+ } else {
+ tevent_schedule_immediate(
+ reg->im, reg->ev,
+ messaging_post_sub_event_context,
+ ctx);
+ }
+
+ }
+ return true;
+}
+
static void messaging_recv_cb(struct tevent_context *ev,
const uint8_t *msg, size_t msg_len,
int *fds, size_t num_fds,
}
}
+ /*
+ * The immediates from messaging_alert_event_contexts
+ * reference "ctx". Don't let them outlive the
+ * messaging_context we're destroying here.
+ */
+ TALLOC_FREE(ctx->event_contexts);
+
return 0;
}
return messaging_send(msg_ctx, server, msg_type, &blob);
}
-struct messaging_post_state {
- struct messaging_context *msg_ctx;
- struct messaging_rec *rec;
-};
-
-static void messaging_post_handler(struct tevent_context *ev,
- struct tevent_immediate *ti,
- void *private_data);
-
static int messaging_post_self(struct messaging_context *msg_ctx,
struct server_id src, struct server_id dst,
uint32_t msg_type,
const struct iovec *iov, int iovlen,
const int *fds, size_t num_fds)
{
- struct tevent_immediate *ti;
- struct messaging_post_state *state;
+ struct messaging_rec *rec;
+ bool ok;
- state = talloc(msg_ctx, struct messaging_post_state);
- if (state == NULL) {
+ rec = messaging_rec_create(
+ msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
+ if (rec == NULL) {
return ENOMEM;
}
- state->msg_ctx = msg_ctx;
- ti = tevent_create_immediate(state);
- if (ti == NULL) {
- goto fail;
- }
- state->rec = messaging_rec_create(
- state, src, dst, msg_type, iov, iovlen, fds, num_fds);
- if (state->rec == NULL) {
- goto fail;
+ ok = messaging_alert_event_contexts(msg_ctx);
+ if (!ok) {
+ TALLOC_FREE(rec);
+ return ENOMEM;
}
- tevent_schedule_immediate(ti, msg_ctx->event_ctx,
- messaging_post_handler, state);
- return 0;
+ DLIST_ADD_END(msg_ctx->posted_msgs, rec);
-fail:
- TALLOC_FREE(state);
- return ENOMEM;
-}
-
-static void messaging_post_handler(struct tevent_context *ev,
- struct tevent_immediate *ti,
- void *private_data)
-{
- struct messaging_post_state *state = talloc_get_type_abort(
- private_data, struct messaging_post_state);
- messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
- TALLOC_FREE(state);
+ return 0;
}
int messaging_send_iov_from(struct messaging_context *msg_ctx,